26 const std::string &jobID,
66 std::string computationSpecifier = joinAtomicComputation->getComputationName();
69 std::string outputName = joinAtomicComputation->getOutputName();
82 sink = makeObject<SetIdentifier>(
jobID, outputName +
"_repartitionData");
83 sink->setPageSize(
conf->getBroadcastPageSize());
97 tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
101 tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
102 tupleStageBuilder->setSourceContext(
source);
103 tupleStageBuilder->setInputAggHashOut(
source->isAggregationResult());
104 tupleStageBuilder->setJobId(jobID);
105 tupleStageBuilder->setProbing(
isProbing);
107 tupleStageBuilder->setJobStageId(nextStageID++);
108 tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
109 tupleStageBuilder->setTargetComputationName(computationSpecifier);
110 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
111 tupleStageBuilder->setSinkContext(sink);
112 tupleStageBuilder->setRepartition(
true);
113 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
114 tupleStageBuilder->setRepartitionJoin(
true);
118 tupleStageBuilder->addHashSetToProbe(it.first, it.second);
125 result->physicalPlanToOutput.emplace_back(joinPrepStage);
128 result->interGlobalSets.push_back(sink);
131 std::string hashSetName = sink->toSourceSetName();
137 hashBuilder->setJobId(jobID);
138 hashBuilder->setJobStageId(nextStageID);
139 hashBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
140 hashBuilder->setTargetTupleSetName(finalAtomicComputationName);
141 hashBuilder->setTargetComputationName(computationSpecifier);
142 hashBuilder->setSourceContext(sink);
143 hashBuilder->setHashSetName(hashSetName);
153 result->physicalPlanToOutput.emplace_back(joinPartitionStage);
156 result->success =
true;
Handle< SetIdentifier > source
void extractAtomicComputations()
std::shared_ptr< Statistics > StatisticsPtr
Handle< ComputePlan > computePlan
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
void extractHashSetsToProbe()
std::shared_ptr< LogicalPlan > LogicalPlanPtr
list< AtomicComputationPtr > pipelineComputations
std::shared_ptr< HashPartitionedJoinBuildHTJobStageBuilder > HashPartitionedJoinBuildHTJobStageBuilderPtr
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
AdvancedPhysicalAbstractAlgorithmTypeID
shared_ptr< Configuration > ConfigurationPtr
LogicalPlanPtr logicalPlan
unordered_map< std::string, std::string > probingHashSets
Handle< SetIdentifier > sink
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr
AdvancedPhysicalAbstractAlgorithmTypeID getType() override