25 const std::string &jobID,
58 std::string computationSpecifier = joinAtomicComputation->getComputationName();
61 std::string outputName = joinAtomicComputation->getOutputName();
67 sink = makeObject<SetIdentifier>(
jobID, outputName +
"_repartitionData");
68 sink->setPageSize(
conf->getBroadcastPageSize());
82 tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
86 tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
87 tupleStageBuilder->setSourceContext(
source);
88 tupleStageBuilder->setInputAggHashOut(
source->isAggregationResult());
89 tupleStageBuilder->setJobId(jobID);
92 tupleStageBuilder->setJobStageId(nextStageID++);
93 tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
94 tupleStageBuilder->setTargetComputationName(computationSpecifier);
95 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
96 tupleStageBuilder->setSinkContext(
sink);
97 tupleStageBuilder->setRepartition(
true);
98 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
99 tupleStageBuilder->setRepartitionJoin(
true);
103 tupleStageBuilder->addHashSetToProbe(it.first, it.second);
117 result->physicalPlanToOutput.emplace_front(joinPrepStage);
120 result->interGlobalSets.emplace_front(
sink);
123 result->success =
true;
127 if(
pipeline.back()->getNumConsumers() != 0) {
130 for(
int i = 0; i <
pipeline.back()->getNumConsumers(); ++i) {
131 result->createdSourceComputations.push_back(
pipeline.back()->getConsumer(i));
Handle< SetIdentifier > source
void extractAtomicComputations()
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
std::shared_ptr< Statistics > StatisticsPtr
Handle< ComputePlan > computePlan
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
void extractHashSetsToProbe()
AdvancedPhysicalShuffleSetAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle, const std::string &jobID, bool isProbing, bool isOutput, const Handle< SetIdentifier > &source, const Handle< ComputePlan > &computePlan, const LogicalPlanPtr &logicalPlan, const ConfigurationPtr &conf)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
list< AtomicComputationPtr > pipelineComputations
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
AdvancedPhysicalAbstractAlgorithmTypeID
shared_ptr< Configuration > ConfigurationPtr
LogicalPlanPtr logicalPlan
unordered_map< std::string, std::string > probingHashSets
Handle< SetIdentifier > sink
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr