26 const std::string &jobID,
61 std::string outputName = finalAtomicComputation->getOutputName();
71 if (!curComp->needsMaterializeOutput()) {
74 curComp->setOutput(
jobID, outputName);
77 sink = makeObject<SetIdentifier>(
jobID, outputName);
78 sink->setPageSize(
conf->getPageSize());
81 result->interGlobalSets.push_back(sink);
84 sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
91 tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
94 tupleStageBuilder->setSourceContext(
source);
97 tupleStageBuilder->setInputAggHashOut(
source->isAggregationResult());
100 tupleStageBuilder->setJobId(
jobID);
103 tupleStageBuilder->setProbing(
isProbing);
117 tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
121 tupleStageBuilder->setJobStageId(nextStageID);
122 tupleStageBuilder->setTargetTupleSetName(finalAtomicComputation->getInputName());
123 tupleStageBuilder->setTargetComputationName(finalAtomicComputation->getComputationName());
124 tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
125 tupleStageBuilder->setSinkContext(sink);
126 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
130 tupleStageBuilder->addHashSetToProbe(it.first, it.second);
137 result->physicalPlanToOutput.emplace_back(jobStage);
138 result->success =
true;
142 if(
pipeline.back()->getNumConsumers() != 0) {
145 for(
int i = 0; i <
pipeline.back()->getNumConsumers(); ++i) {
146 result->createdSourceComputations.push_back(
pipeline.back()->getConsumer(i));
Handle< SetIdentifier > source
void extractAtomicComputations()
std::shared_ptr< Statistics > StatisticsPtr
Handle< ComputePlan > computePlan
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
void extractHashSetsToProbe()
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
std::shared_ptr< LogicalPlan > LogicalPlanPtr
list< AtomicComputationPtr > pipelineComputations
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
AdvancedPhysicalAbstractAlgorithmTypeID
shared_ptr< Configuration > ConfigurationPtr
LogicalPlanPtr logicalPlan
AdvancedPhysicalPipelineAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle, const std::string &jobID, bool isProbing, bool isOutput, const Handle< SetIdentifier > &source, const Handle< ComputePlan > &computePlan, const LogicalPlanPtr &logicalPlan, const ConfigurationPtr &conf)
unordered_map< std::string, std::string > probingHashSets
Handle< SetIdentifier > sink
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr