35 transversed(false), rollbacked(false) {}
43 Handle<Computation> comp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
46 PDB_COUT <<
"Sink Computation Type: " << comp->getComputationType() <<
" are not supported as sink node right now\n";
56 string targetTupleSetName = prevNode ==
nullptr ? node->getInputName() : prevNode->getNode()->getOutputName();
61 shared_ptr<ApplyJoin> joinNode = dynamic_pointer_cast<
ApplyJoin>(node);
67 std::string outputName = node->getOutputName();
70 Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
83 double sourceCost = getCost(tupleStageBuilder->getSourceSetIdentifier(), stats);
87 if (tupleStageBuilder->isPipelineProbing() && (sourceCost <= BROADCAST_JOIN_COST_THRESHOLD) && !rollbacked) {
93 result->success =
false;
94 result->physicalPlanToOutput.clear();
95 result->interGlobalSets.clear();
105 else if (sourceCost > BROADCAST_JOIN_COST_THRESHOLD) {
108 joinNode->setPartitioningLHS(
true);
112 join = unsafeCast<JoinComp<Object, Object, Object>,
Computation>(curComp);
119 sink = makeObject<SetIdentifier>(jobId, outputName +
"_repartitionData");
120 sink->setPageSize(conf->getBroadcastPageSize());
123 tupleStageBuilder->setJobStageId(nextStageID++);
124 tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
125 tupleStageBuilder->setTargetComputationName(computationSpecifier);
126 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
127 tupleStageBuilder->setSinkContext(sink);
128 tupleStageBuilder->setRepartition(
true);
129 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
130 tupleStageBuilder->setRepartitionJoin(
true);
136 result->physicalPlanToOutput.emplace_back(joinPrepStage);
139 result->interGlobalSets.push_back(sink);
142 hashSetName = sink->toSourceSetName();
146 hashBuilder = make_shared<HashPartitionedJoinBuildHTJobStageBuilder>();
149 hashBuilder->setJobId(jobId);
150 hashBuilder->setJobStageId(nextStageID++);
151 hashBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
152 hashBuilder->setTargetTupleSetName(targetTupleSetName);
153 hashBuilder->setTargetComputationName(computationSpecifier);
154 hashBuilder->setSourceContext(sink);
155 hashBuilder->setHashSetName(hashSetName);
156 hashBuilder->setComputePlan(computePlan);
162 result->physicalPlanToOutput.emplace_back(joinPartitionStage);
172 sink = makeObject<SetIdentifier>(jobId, outputName +
"_broadcastData");
173 sink->setPageSize(conf->getBroadcastPageSize());
176 tupleStageBuilder->setJobStageId(nextStageID);
177 tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
178 tupleStageBuilder->setTargetComputationName(computationSpecifier);
179 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
180 tupleStageBuilder->setSinkContext(sink);
181 tupleStageBuilder->setBroadcasting(
true);
182 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
188 result->physicalPlanToOutput.emplace_back(joinPrepStage);
191 result->interGlobalSets.push_back(sink);
194 hashSetName = sink->toSourceSetName();
200 broadcastBuilder->setJobId(jobId);
201 broadcastBuilder->setJobStageId(nextStageID);
202 broadcastBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
203 broadcastBuilder->setTargetTupleSetName(targetTupleSetName);
204 broadcastBuilder->setTargetComputationName(computationSpecifier);
205 broadcastBuilder->setSourceContext(sink);
206 broadcastBuilder->setHashSetName(hashSetName);
207 broadcastBuilder->setComputePlan(computePlan);
213 result->physicalPlanToOutput.emplace_back(joinBroadcastStage);
221 result->success =
true;
233 if (joinNode->isPartitioningLHS()) {
239 sink = makeObject<SetIdentifier>(jobId, outputName +
"_repartitionData");
240 sink->setPageSize(conf->getBroadcastPageSize());
243 tupleStageBuilder->setJobStageId(nextStageID++);
244 tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
245 tupleStageBuilder->setTargetComputationName(computationSpecifier);
246 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
247 tupleStageBuilder->setSinkContext(sink);
248 tupleStageBuilder->setRepartition(
true);
249 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
250 tupleStageBuilder->setRepartitionJoin(
true);
261 probingStageBuilder->setJobId(joinPrepStage->getJobId());
264 probingStageBuilder->setSourceTupleSetName(tupleStageBuilder->getSourceTupleSetName());
267 string lastOne = tupleStageBuilder->getLastSetThatBuildsPipeline();
270 probingStageBuilder->addTupleSetToBuildPipeline(lastOne);
271 probingStageBuilder->addTupleSetToBuildPipeline(node->getOutputName());
274 probingStageBuilder->addHashSetToProbe(outputName, hashSetName);
275 probingStageBuilder->setProbing(
true);
276 probingStageBuilder->setSourceContext(sink);
277 probingStageBuilder->setSourceTupleSetName(tupleStageBuilder->getSourceTupleSetName());
278 probingStageBuilder->setComputePlan(computePlan);
284 stats->addSetAlias(tupleStageBuilder->getSourceSetIdentifier()->getDatabase(),
285 tupleStageBuilder->getSourceSetIdentifier()->getSetName(),
296 stats->removeSet(sink->getDatabase(), sink->getSetName());
299 result->physicalPlanToOutput.emplace_front(joinPrepStage);
302 result->interGlobalSets.emplace_front(sink);
311 tupleStageBuilder->addTupleSetToBuildPipeline(node->getOutputName());
312 tupleStageBuilder->addHashSetToProbe(outputName, hashSetName);
313 tupleStageBuilder->setProbing(
true);
319 return activeConsumers.front()->analyze(tupleStageBuilder, newPrevNode, stats, nextStageID);
PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< Statistics > StatisticsPtr
std::string & getComputationName()
std::shared_ptr< LogicalPlan > LogicalPlanPtr
std::shared_ptr< HashPartitionedJoinBuildHTJobStageBuilder > HashPartitionedJoinBuildHTJobStageBuilderPtr
PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &ptr, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< BroadcastJoinBuildHTJobStageBuilder > BroadcastJoinBuildHTJobStageBuilderPtr
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
SimplePhysicalJoinNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr