A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimplePhysicalAggregationNode.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
22 
23 namespace pdb {
24 
// NOTE(review): the opening line(s) of this constructor signature are missing from this listing;
// per the member index at the end of the page the leading parameters are presumably
// (string jobId, AtomicComputationPtr node, ...) - confirm against the real source.
//
// Constructor: hands every argument on to the SimplePhysicalNode base class.
// jobId, node and conf are moved into the base; computePlan and logicalPlan are passed as they are.
27  const Handle<ComputePlan> &computePlan,
28  LogicalPlanPtr logicalPlan,
29  ConfigurationPtr conf) : SimplePhysicalNode(std::move(jobId),
30  std::move(node),
31  computePlan,
32  logicalPlan,
33  std::move(conf)) {} // the constructor body itself is empty
34 
36  SimplePhysicalNodePtr &prevNode,
37  const StatisticsPtr &stats,
38  int nextStageID) {
// NOTE(review): the first line of this signature is missing from this listing. Judging by the
// member index at the end of the page and by the body, this is presumably analyzeOutput, with
// TupleSetJobStageBuilderPtr &tupleStageBuilder as the leading parameter - confirm against the source.
//
// Plans this aggregation as two job stages and returns them in a PhysicalOptimizerResult:
//   1. a tuple set job stage, completed through tupleStageBuilder, with the repartition flag set and
//      the "<outputName>_aggregationData" set as its sink context;
//   2. an aggregation job stage that uses that set as its source context and the set identified by the
//      computation's database name / set name as its sink context, with materialization switched on.
//
// prevNode and stats are not referenced in this body.
// nextStageID is used as the id of the tuple stage and, after the increment, of the aggregation stage.
// Returns the result with its success flag set to true.
39 
40  // create an analyzer result
41  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
42 
43  // the computation specifier of this aggregation
44  std::string computationSpecifier = node->getComputationName();
45 
46  // grab the computation associated with this node
47  Handle<Computation> comp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
48 
49  // create a SetIdentifier for the output set (named by the computation's database and set name)
50  Handle<SetIdentifier> sink = makeObject<SetIdentifier>(comp->getDatabaseName(), comp->getSetName());
51 
52  // create the set identifier where we store the data to be aggregated after the TupleSetJobStage;
53  Handle<SetIdentifier> aggregator = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_aggregationData");
54  aggregator->setPageSize(conf->getShufflePageSize()); // it uses the configured shuffle page size
55 
56  Handle<SetIdentifier> combiner = nullptr; // stays nullptr when the computation does not use a combiner
57  // are we using a combiner (the thing that combines the records by key before sending them to the right node)
58  if (comp->isUsingCombiner()) {
59  combiner = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_combinerData");
60  }
61 
62  // cast the computation to AbstractAggregateComp to create the consuming job stage for aggregation
63  Handle<AbstractAggregateComp> agg = unsafeCast<AbstractAggregateComp, Computation>(comp);
64 
65  // create the tuple set job stage to run the pipeline with a shuffle sink
66  // here is what we are doing:
67  // the input to the stage is either the output of the join or the source node we started from;
68  // the repartitioning flag is set to true, so that we run a pipeline with a shuffle sink.
69  // The pipeline until the combiner will apply all the computations to the source set
70  // and put them on a page partitioned into multiple maps. The combiner will then read the maps that belong to
71  // the partitions of a certain node and combine them by key. The output pages of the combiner will then be sent
72  // to the appropriate nodes depending on the parameters isCollectAsMap and getNumNodesToCollect
73  tupleStageBuilder->setJobStageId(nextStageID++);
74  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
75  tupleStageBuilder->setTargetComputationName(computationSpecifier);
76  tupleStageBuilder->setOutputTypeName("IntermediateData");
77  tupleStageBuilder->setCombiner(combiner);
78  tupleStageBuilder->setSinkContext(aggregator);
79  tupleStageBuilder->setRepartition(true);
80  tupleStageBuilder->setAllocatorPolicy(comp->getAllocatorPolicy());
81  tupleStageBuilder->setCollectAsMap(agg->isCollectAsMap());
82  tupleStageBuilder->setNumNodesToCollect(agg->getNumNodesToCollect());
83 
84  // record the aggregator set in the result's interGlobalSets list
85  result->interGlobalSets.push_back(aggregator);
86 
87  // build the tuple set job stage and append it to the physical plan of the result
88  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
89 
90  // create the builder for the aggregation job stage
91  AggregationJobStageBuilderPtr aggregationBuilder = make_shared<AggregationJobStageBuilder>();
92 
93  // configure the consuming job stage for aggregation: it reads the aggregator set and writes to the sink
94  aggregationBuilder->setJobId(jobId);
95  aggregationBuilder->setJobStageId(nextStageID);
96  aggregationBuilder->setAggComp(agg);
97  aggregationBuilder->setSourceContext(aggregator);
98  aggregationBuilder->setSinkContext(sink);
99  aggregationBuilder->setMaterializeOrNot(true);
100 
101  // build the aggregation stage and append it after the tuple stage
102  result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
103 
104  // we succeeded
105  result->success = true;
106 
107  return result;
108 }
109 
111  SimplePhysicalNodePtr &prevNode,
112  const StatisticsPtr &stats,
113  int nextStageID) {
// NOTE(review): the first line of this signature is missing from this listing. Judging by the
// member index at the end of the page and by the body, this is presumably analyzeSingleConsumer, with
// TupleSetJobStageBuilderPtr &tupleStageBuilder as the leading parameter - confirm against the source.
//
// Plans this aggregation as a tuple set job stage followed by an aggregation job stage, records this
// node as a created source computation, and makes the aggregation's sink the new sourceSetIdentifier.
// The sink is the computation's own database/set when needsMaterializeOutput() is true, otherwise a
// "<outputName>_aggregationResult" set created with PartitionedHashSetType.
//
// prevNode and stats are not referenced in this body.
// nextStageID is used as the id of the tuple stage and, after the increment, of the aggregation stage.
// Returns the result with its success flag set to true.
114  // create an analyzer result
115  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
116 
117  // the computation specifier of this aggregation
118  std::string computationSpecifier = node->getComputationName();
119 
120  // grab the computation associated with this node
121  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
122 
123  // create the set the producing job stage writes the data to be aggregated into, and set its page size
124  Handle<SetIdentifier> aggregator = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_aggregationData");
125  aggregator->setPageSize(conf->getShufflePageSize());
126 
127  // are we using a combiner (the thing that combines the records by key before sending them to the right node)
128  Handle<SetIdentifier> combiner = nullptr; // stays nullptr when the computation does not use a combiner
129  if (curComp->isUsingCombiner()) {
130  combiner = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_combinerData");
131  }
132 
133  // cast the computation to AbstractAggregateComp to create the consuming job stage for aggregation
134  Handle<AbstractAggregateComp> agg = unsafeCast<AbstractAggregateComp, Computation>(curComp);
135 
136  // create the tuple set job stage to run the pipeline with a shuffle sink
137  // here is what we are doing:
138  // the input to the stage is either the output of the join or the source node we started from;
139  // the repartitioning flag is set to true, so that we run a pipeline with a shuffle sink.
140  // The pipeline until the combiner will apply all the computations to the source set
141  // and put them on a page partitioned into multiple maps. The combiner will then read the maps that belong to
142  // the partitions of a certain node and combine them by key. The output pages of the combiner will then be sent
143  // to the appropriate nodes depending on the parameters isCollectAsMap and getNumNodesToCollect
144  tupleStageBuilder->setJobStageId(nextStageID++);
145  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
146  tupleStageBuilder->setTargetComputationName(computationSpecifier);
147  tupleStageBuilder->setOutputTypeName("IntermediateData");
148  tupleStageBuilder->setCombiner(combiner);
149  tupleStageBuilder->setSinkContext(aggregator);
150  tupleStageBuilder->setRepartition(true);
151  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
152  tupleStageBuilder->setCollectAsMap(agg->isCollectAsMap());
153  tupleStageBuilder->setNumNodesToCollect(agg->getNumNodesToCollect());
154 
155  // build the tuple set job stage and append it to the physical plan of the result
156  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
157 
158  // choose the sink set for the consuming job stage for aggregation
// NOTE(review): source line 159 is missing from this listing; it presumably declares `sink`
// (a Handle<SetIdentifier>, as assigned below) - confirm against the real source.
160 
161  // does the current computation already need materialization
162  if (curComp->needsMaterializeOutput()) {
163  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
164  } else {
165  sink = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_aggregationResult", PartitionedHashSetType, true);
166  }
167 
168  // create the builder for the aggregation job stage
169  AggregationJobStageBuilderPtr aggregationBuilder = make_shared<AggregationJobStageBuilder>();
170 
171  // create an aggregation job that reads the aggregator set and writes to the sink chosen above
172  aggregationBuilder->setJobId(jobId);
173  aggregationBuilder->setJobStageId(nextStageID);
174  aggregationBuilder->setAggComp(agg);
175  aggregationBuilder->setSourceContext(aggregator);
176  aggregationBuilder->setSinkContext(sink);
177  aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());
178 
179  // build the aggregation stage and append it after the tuple stage
180  result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
181 
182  // record the aggregator set in the result's interGlobalSets list
183  result->interGlobalSets.push_back(aggregator);
184 
185  // record this node in the result's list of created source computations
186  result->createdSourceComputations.push_back(getSimpleNodeHandle());
187 
188  // we succeeded
189  result->success = true;
190 
191  // the new source is the sink
192  sourceSetIdentifier = sink;
193 
194  return result;
195 }
196 
198  SimplePhysicalNodePtr &prevNode,
199  const StatisticsPtr &stats,
200  int nextStageID) {
// NOTE(review): the first line of this signature is missing from this listing. Judging by the
// member index at the end of the page and by the body, this is presumably analyzeMultipleConsumers, with
// TupleSetJobStageBuilderPtr &tupleStageBuilder as the leading parameter - confirm against the source.
//
// Plans this aggregation as a tuple set job stage followed by an aggregation job stage, records this
// node as a created source computation, and makes the aggregation's sink the new sourceSetIdentifier.
// When needsMaterializeOutput() is false the computation is given an output (jobId, output name) and the
// matching sink set is also recorded in interGlobalSets; otherwise the sink is the computation's own
// database/set.
//
// prevNode and stats are not referenced in this body.
// nextStageID is used as the id of the tuple stage and, after the increment, of the aggregation stage.
// Returns the result with its success flag set to true.
201  // create an analyzer result
202  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
203 
204  // the computation specifier of this aggregation
205  std::string computationSpecifier = node->getComputationName();
206 
207  // grab the computation associated with this node
208  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
209 
210  // I am a pipeline breaker because I have more than one consumer
211  Handle<SetIdentifier> sink = nullptr;
212 
213  // in the case that the current computation does not require materialization by default
214  // we have to set an output for it, so that it gets materialized
215  if (!curComp->needsMaterializeOutput()) {
216 
217  // set the output
218  curComp->setOutput(jobId, node->getOutputName());
219 
220  // create the sink and set the page size
221  sink = makeObject<SetIdentifier>(jobId, node->getOutputName());
222  sink->setPageSize(conf->getPageSize());
223 
224  // add this set to the list of intermediate sets
225  result->interGlobalSets.push_back(sink);
226  } else {
227  // this computation needs materialization either way so just create the sink set identifier
228  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
229  }
230 
231  // create the set identifier where we store the data to be aggregated after the TupleSetJobStage
232  Handle<SetIdentifier> aggregator = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_aggregationData");
233  aggregator->setPageSize(conf->getShufflePageSize());
234 
235  // are we using a combiner (the thing that combines the records by key before sending them to the right node)
236  Handle<SetIdentifier> combiner = nullptr; // stays nullptr when the computation does not use a combiner
237  if (curComp->isUsingCombiner()) {
238  // create a set identifier for the combiner
239  combiner = makeObject<SetIdentifier>(jobId, node->getOutputName() + "_combinerData");
240  }
241 
242  // create the tuple set job stage to run the pipeline with a shuffle sink
243  // here is what we are doing:
244  // the input to the stage is either the output of the join or the source node we started from;
245  // the repartitioning flag is set to true, so that we run a pipeline with a shuffle sink.
246  // The pipeline until the combiner will apply all the computations to the source set
247  // and put them on a page partitioned into multiple maps. The combiner will then read the maps that belong to
248  // the partitions of a certain node and combine them by key. The output pages of the combiner will then be sent on.
// NOTE(review): unlike the other two analyze* bodies in this file, setCollectAsMap and
// setNumNodesToCollect are not called on tupleStageBuilder here - confirm that this is intentional.
249  tupleStageBuilder->setJobStageId(nextStageID++);
250  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
251  tupleStageBuilder->setTargetComputationName(computationSpecifier);
252  tupleStageBuilder->setOutputTypeName("IntermediateData");
253  tupleStageBuilder->setCombiner(combiner);
254  tupleStageBuilder->setSinkContext(aggregator);
255  tupleStageBuilder->setRepartition(true);
256  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
257 
258  // add the created tuple job stage to the physical plan of the result
259  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
260 
261  // cast the computation to AbstractAggregateComp to create the consuming job stage for aggregation
262  Handle<AbstractAggregateComp> agg = unsafeCast<AbstractAggregateComp, Computation>(curComp);
263 
264  // we need an aggregation stage after this to aggregate the results from the tuple stage we previously created
265  // the data will be aggregated in the sink set
266  AggregationJobStageBuilderPtr aggregationBuilder = make_shared<AggregationJobStageBuilder>();
267 
268  aggregationBuilder->setJobId(jobId);
269  aggregationBuilder->setJobStageId(nextStageID);
270  aggregationBuilder->setAggComp(agg);
271  aggregationBuilder->setSourceContext(aggregator);
272  aggregationBuilder->setSinkContext(sink);
273  aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());
274 
275  // build the aggregation stage and append it after the tuple stage
276  result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());
277 
278  // record the aggregator set in the result's interGlobalSets list
279  result->interGlobalSets.push_back(aggregator);
280 
281  // record this node in the result's list of created source computations
282  result->createdSourceComputations.push_back(getSimpleNodeHandle());
283 
284  // we succeeded
285  result->success = true;
286 
287  // the new source is the sink
288  sourceSetIdentifier = sink;
289 
290  return result;
291 }
292 
293 }
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
PhysicalOptimizerResultPtr analyzeMultipleConsumers(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
SimplePhysicalAggregationNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
SimplePhysicalNodePtr getSimpleNodeHandle()
AtomicComputationPtr node
Handle< SetIdentifier > sourceSetIdentifier
std::shared_ptr< AggregationJobStageBuilder > AggregationJobStageBuilderPtr
PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr