26 const std::string &jobID,
60 std::string outputName = finalAtomicComputation->getOutputName();
63 std::string computationSpecifier = finalAtomicComputation->getComputationName();
73 if (!curComp->needsMaterializeOutput() &&
pipeline.back()->getNumConsumers() == 1) {
75 sink = makeObject<SetIdentifier>(
jobID,
76 finalAtomicComputation->getOutputName() +
"_aggregationResult",
80 else if(!curComp->needsMaterializeOutput()){
83 curComp->setOutput(
jobID, finalAtomicComputation->getOutputName());
86 sink = makeObject<SetIdentifier>(
jobID, finalAtomicComputation->getOutputName());
87 sink->setPageSize(
conf->getPageSize());
90 result->interGlobalSets.push_back(sink);
94 sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
98 Handle<SetIdentifier> aggregator = makeObject<SetIdentifier>(
jobID, finalAtomicComputation->getOutputName() +
"_aggregationData");
99 aggregator->setPageSize(
conf->getShufflePageSize());
103 if (curComp->isUsingCombiner()) {
105 combiner = makeObject<SetIdentifier>(
jobID, finalAtomicComputation->getOutputName() +
"_combinerData");
112 tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
115 tupleStageBuilder->setSourceContext(
source);
118 tupleStageBuilder->setInputAggHashOut(
source->isAggregationResult());
121 tupleStageBuilder->setJobId(jobID);
124 tupleStageBuilder->setProbing(
isProbing);
143 tupleStageBuilder->addTupleSetToBuildPipeline(comp->getOutputName());
153 tupleStageBuilder->setJobStageId(nextStageID++);
154 tupleStageBuilder->setTargetTupleSetName(finalAtomicComputation->getInputName());
155 tupleStageBuilder->setTargetComputationName(computationSpecifier);
156 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
157 tupleStageBuilder->setCombiner(combiner);
158 tupleStageBuilder->setSinkContext(aggregator);
159 tupleStageBuilder->setRepartition(
true);
160 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
164 tupleStageBuilder->addHashSetToProbe(it.first, it.second);
168 result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
177 aggregationBuilder->setJobId(jobID);
178 aggregationBuilder->setJobStageId(nextStageID);
179 aggregationBuilder->setAggComp(agg);
180 aggregationBuilder->setSourceContext(aggregator);
181 aggregationBuilder->setSinkContext(sink);
182 aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());
185 result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
188 result->interGlobalSets.push_back(aggregator);
192 if(
pipeline.back()->getNumConsumers() != 0) {
195 for(
int i = 0; i <
pipeline.back()->getNumConsumers(); ++i) {
196 result->createdSourceComputations.push_back(
pipeline.back()->getConsumer(i));
201 result->success =
true;
Handle< SetIdentifier > source
void extractAtomicComputations()
std::shared_ptr< Statistics > StatisticsPtr
Handle< ComputePlan > computePlan
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
void extractHashSetsToProbe()
std::shared_ptr< LogicalPlan > LogicalPlanPtr
list< AtomicComputationPtr > pipelineComputations
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
AdvancedPhysicalAggregationPipelineAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle, const std::string &jobID, bool isProbing, bool isOutput, const Handle< SetIdentifier > &source, const Handle< ComputePlan > &computePlan, const LogicalPlanPtr &logicalPlan, const ConfigurationPtr &conf)
AdvancedPhysicalAbstractAlgorithmTypeID
shared_ptr< Configuration > ConfigurationPtr
LogicalPlanPtr logicalPlan
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
std::shared_ptr< AggregationJobStageBuilder > AggregationJobStageBuilderPtr
unordered_map< std::string, std::string > probingHashSets
Handle< SetIdentifier > sink
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr