25 const std::string &jobID,
64 std::string computationSpecifier = joinAtomicComputation->getComputationName();
70 std::string outputName = joinAtomicComputation->getOutputName();
73 sink = makeObject<SetIdentifier>(
jobID, outputName +
"_broadcastData");
74 sink->setPageSize(
conf->getBroadcastPageSize());
88 tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
92 tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
93 tupleStageBuilder->setSourceContext(
source);
94 tupleStageBuilder->setInputAggHashOut(
source->isAggregationResult());
95 tupleStageBuilder->setJobId(jobID);
98 tupleStageBuilder->setJobStageId(nextStageID);
99 tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
100 tupleStageBuilder->setTargetComputationName(computationSpecifier);
101 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
102 tupleStageBuilder->setSinkContext(
sink);
103 tupleStageBuilder->setBroadcasting(
true);
104 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
108 tupleStageBuilder->addHashSetToProbe(it.first, it.second);
115 result->physicalPlanToOutput.emplace_back(joinPrepStage);
118 result->interGlobalSets.push_back(
sink);
121 std::string hashSetName =
sink->toSourceSetName();
127 broadcastBuilder->setJobId(jobID);
128 broadcastBuilder->setJobStageId(nextStageID);
129 broadcastBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
130 broadcastBuilder->setTargetTupleSetName(finalAtomicComputationName);
131 broadcastBuilder->setTargetComputationName(computationSpecifier);
132 broadcastBuilder->setSourceContext(
sink);
133 broadcastBuilder->setHashSetName(hashSetName);
143 result->physicalPlanToOutput.emplace_back(joinBroadcastStage);
146 result->success =
true;
Handle< SetIdentifier > source
void extractAtomicComputations()
std::shared_ptr< Statistics > StatisticsPtr
Handle< ComputePlan > computePlan
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
void extractHashSetsToProbe()
std::shared_ptr< LogicalPlan > LogicalPlanPtr
list< AtomicComputationPtr > pipelineComputations
std::shared_ptr< BroadcastJoinBuildHTJobStageBuilder > BroadcastJoinBuildHTJobStageBuilderPtr
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
AdvancedPhysicalAbstractAlgorithmTypeID
shared_ptr< Configuration > ConfigurationPtr
LogicalPlanPtr logicalPlan
unordered_map< std::string, std::string > probingHashSets
Handle< SetIdentifier > sink
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr