44 std::string computationSpecifier =
node->getComputationName();
54 aggregator->setPageSize(
conf->getShufflePageSize());
58 if (comp->isUsingCombiner()) {
59 combiner = makeObject<SetIdentifier>(
jobId,
node->getOutputName() +
"_combinerData");
73 tupleStageBuilder->setJobStageId(nextStageID++);
74 tupleStageBuilder->setTargetTupleSetName(
node->getInputName());
75 tupleStageBuilder->setTargetComputationName(computationSpecifier);
76 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
77 tupleStageBuilder->setCombiner(combiner);
78 tupleStageBuilder->setSinkContext(aggregator);
79 tupleStageBuilder->setRepartition(
true);
80 tupleStageBuilder->setAllocatorPolicy(comp->getAllocatorPolicy());
81 tupleStageBuilder->setCollectAsMap(agg->isCollectAsMap());
82 tupleStageBuilder->setNumNodesToCollect(agg->getNumNodesToCollect());
85 result->interGlobalSets.push_back(aggregator);
88 result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
94 aggregationBuilder->setJobId(jobId);
95 aggregationBuilder->setJobStageId(nextStageID);
96 aggregationBuilder->setAggComp(agg);
97 aggregationBuilder->setSourceContext(aggregator);
98 aggregationBuilder->setSinkContext(sink);
99 aggregationBuilder->setMaterializeOrNot(
true);
102 result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
105 result->success =
true;
118 std::string computationSpecifier =
node->getComputationName();
125 aggregator->setPageSize(
conf->getShufflePageSize());
129 if (curComp->isUsingCombiner()) {
130 combiner = makeObject<SetIdentifier>(
jobId,
node->getOutputName() +
"_combinerData");
144 tupleStageBuilder->setJobStageId(nextStageID++);
145 tupleStageBuilder->setTargetTupleSetName(
node->getInputName());
146 tupleStageBuilder->setTargetComputationName(computationSpecifier);
147 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
148 tupleStageBuilder->setCombiner(combiner);
149 tupleStageBuilder->setSinkContext(aggregator);
150 tupleStageBuilder->setRepartition(
true);
151 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
152 tupleStageBuilder->setCollectAsMap(agg->isCollectAsMap());
153 tupleStageBuilder->setNumNodesToCollect(agg->getNumNodesToCollect());
156 result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
162 if (curComp->needsMaterializeOutput()) {
163 sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
172 aggregationBuilder->setJobId(jobId);
173 aggregationBuilder->setJobStageId(nextStageID);
174 aggregationBuilder->setAggComp(agg);
175 aggregationBuilder->setSourceContext(aggregator);
176 aggregationBuilder->setSinkContext(sink);
177 aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());
180 result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
183 result->interGlobalSets.push_back(aggregator);
189 result->success =
true;
205 std::string computationSpecifier =
node->getComputationName();
215 if (!curComp->needsMaterializeOutput()) {
218 curComp->setOutput(
jobId,
node->getOutputName());
221 sink = makeObject<SetIdentifier>(
jobId,
node->getOutputName());
222 sink->setPageSize(
conf->getPageSize());
225 result->interGlobalSets.push_back(sink);
228 sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
233 aggregator->setPageSize(
conf->getShufflePageSize());
237 if (curComp->isUsingCombiner()) {
239 combiner = makeObject<SetIdentifier>(
jobId,
node->getOutputName() +
"_combinerData");
249 tupleStageBuilder->setJobStageId(nextStageID++);
250 tupleStageBuilder->setTargetTupleSetName(
node->getInputName());
251 tupleStageBuilder->setTargetComputationName(computationSpecifier);
252 tupleStageBuilder->setOutputTypeName(
"IntermediateData");
253 tupleStageBuilder->setCombiner(combiner);
254 tupleStageBuilder->setSinkContext(aggregator);
255 tupleStageBuilder->setRepartition(
true);
256 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
259 result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
268 aggregationBuilder->setJobId(jobId);
269 aggregationBuilder->setJobStageId(nextStageID);
270 aggregationBuilder->setAggComp(agg);
271 aggregationBuilder->setSourceContext(aggregator);
272 aggregationBuilder->setSinkContext(sink);
273 aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());
276 result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
279 result->interGlobalSets.push_back(aggregator);
285 result->success =
true;
std::shared_ptr< Statistics > StatisticsPtr
PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
LogicalPlanPtr logicalPlan
std::shared_ptr< LogicalPlan > LogicalPlanPtr
PhysicalOptimizerResultPtr analyzeMultipleConsumers(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
SimplePhysicalAggregationNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
SimplePhysicalNodePtr getSimpleNodeHandle()
AtomicComputationPtr node
Handle< SetIdentifier > sourceSetIdentifier
std::shared_ptr< AggregationJobStageBuilder > AggregationJobStageBuilderPtr
PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr