A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. This page documents the API for v0.5.0, released on June 13, 2018.
AdvancedPhysicalAggregationPipelineAlgorithm.cc
/*****************************************************************************
 *                                                                           *
 *  Copyright 2018 Rice University                                           *
 *                                                                           *
 *  Licensed under the Apache License, Version 2.0 (the "License");          *
 *  you may not use this file except in compliance with the License.         *
 *  You may obtain a copy of the License at                                  *
 *                                                                           *
 *      http://www.apache.org/licenses/LICENSE-2.0                           *
 *                                                                           *
 *  Unless required by applicable law or agreed to in writing, software     *
 *  distributed under the License is distributed on an "AS IS" BASIS,       *
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.*
 *  See the License for the specific language governing permissions and    *
 *  limitations under the License.                                           *
 *                                                                           *
 *****************************************************************************/

#include "AdvancedPhysicalAggregationPipelineAlgorithm.h" // assumed include; the original include lines were not recoverable

namespace pdb {

AdvancedPhysicalAggregationPipelineAlgorithm::AdvancedPhysicalAggregationPipelineAlgorithm(
    const AdvancedPhysicalPipelineNodePtr &handle,
    const std::string &jobID,
    bool isProbing,
    bool isOutput,
    const Handle<SetIdentifier> &source,
    const Handle<ComputePlan> &computePlan,
    const LogicalPlanPtr &logicalPlan,
    const ConfigurationPtr &conf)
    : AdvancedPhysicalAbstractAlgorithm(handle, // base-class name reconstructed from the class hierarchy
                                        jobID,
                                        isProbing,
                                        isOutput,
                                        source,
                                        computePlan,
                                        logicalPlan,
                                        conf) {}

PhysicalOptimizerResultPtr AdvancedPhysicalAggregationPipelineAlgorithm::generate(int nextStageID,
                                                                                  const StatisticsPtr &stats) {

  // extract the atomic computations from the pipes for this algorithm
  extractAtomicComputations(); // call reconstructed; populates this->pipelineComputations

  // extract the hash sets we might want to probe
  extractHashSetsToProbe(); // call reconstructed; populates probingHashSets

  // get the final atomic computation
  auto finalAtomicComputation = this->pipelineComputations.back();

  // get the source atomic computation
  auto sourceAtomicComputation = this->pipelineComputations.front();

  // create an analyzer result
  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();

  // grab the output of the current node
  std::string outputName = finalAtomicComputation->getOutputName();

  // the computation specifier of this aggregation
  std::string computationSpecifier = finalAtomicComputation->getComputationName();

  // grab the computation associated with this node
  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();

  // I am a pipeline breaker because I have more than one consumer
  Handle<SetIdentifier> sink = nullptr;

  // if the current computation does not require materialization by default,
  // we have to set an output for it so that it gets materialized
  if (!curComp->needsMaterializeOutput() && pipeline.back()->getNumConsumers() == 1) {

    sink = makeObject<SetIdentifier>(jobID,
                                     finalAtomicComputation->getOutputName() + "_aggregationResult",
                                     PartitionedHashSetType, // argument reconstructed; it was lost in the extracted listing
                                     true);
  }
  else if (!curComp->needsMaterializeOutput()) {

    // set the output
    curComp->setOutput(jobID, finalAtomicComputation->getOutputName());

    // create the sink and set the page size
    sink = makeObject<SetIdentifier>(jobID, finalAtomicComputation->getOutputName());
    sink->setPageSize(conf->getPageSize());

    // add this set to the list of intermediate sets
    result->interGlobalSets.push_back(sink);

  } else {
    // this computation needs materialization either way, so just create the sink set identifier
    sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
  }

  // create the set identifier where we store the data to be aggregated after the TupleSetJobStage
  Handle<SetIdentifier> aggregator = makeObject<SetIdentifier>(jobID,
                                                               finalAtomicComputation->getOutputName() + "_aggregationData");
  aggregator->setPageSize(conf->getShufflePageSize());

  // are we using a combiner (the thing that combines the records by key before sending them to the right node)?
  Handle<SetIdentifier> combiner = nullptr;
  if (curComp->isUsingCombiner()) {
    // create a set identifier for the combiner
    combiner = makeObject<SetIdentifier>(jobID, finalAtomicComputation->getOutputName() + "_combinerData");
  }

  // create a tuple set job stage builder
  TupleSetJobStageBuilderPtr tupleStageBuilder = make_shared<TupleSetJobStageBuilder>();

  // the input to the pipeline is the output set of the source node
  tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());

  // set the source set identifier
  tupleStageBuilder->setSourceContext(source);

  // is this source a result of an aggregation?
  tupleStageBuilder->setInputAggHashOut(source->isAggregationResult());

  // set the job id
  tupleStageBuilder->setJobId(jobID);

  // are we probing a hash set in this pipeline?
  tupleStageBuilder->setProbing(isProbing);

  // set the compute plan
  tupleStageBuilder->setComputePlan(computePlan);

  // copy the computation names
  for (auto it = this->pipelineComputations.begin(); it != this->pipelineComputations.end(); ++it) {

    // grab the computation
    auto &comp = *it;

    // we don't need the output set name... (that is just the way the pipeline building works);
    // the aggregation is not applied here, it is applied in the next job stage
    if (comp->getAtomicComputationTypeID() == WriteSetTypeID ||
        (it != this->pipelineComputations.begin() && comp->getAtomicComputationTypeID() == ApplyAggTypeID)) {
      continue;
    }

    // add the set name of the atomic computation to the pipeline
    tupleStageBuilder->addTupleSetToBuildPipeline(comp->getOutputName());
  }

  // create the tuple set job stage to run the pipeline with a shuffle sink
  // here is what we are doing:
  // the input to the stage is either the output of the join or the source node we started from;
  // the repartitioning flag is set to true, so that we run a pipeline with a shuffle sink;
  // the pipeline up to the combiner will apply all the computations to the source set
  // and put them on a page partitioned into multiple maps; the combiner will then read the maps that belong to
  // the partitions of a certain node and combine them by key; the output pages of the combiner will then be sent
  // to the appropriate nodes (a toy illustration of this combine-then-shuffle flow follows the listing)
  tupleStageBuilder->setJobStageId(nextStageID++);
  tupleStageBuilder->setTargetTupleSetName(finalAtomicComputation->getInputName());
  tupleStageBuilder->setTargetComputationName(computationSpecifier);
  tupleStageBuilder->setOutputTypeName("IntermediateData");
  tupleStageBuilder->setCombiner(combiner);
  tupleStageBuilder->setSinkContext(aggregator);
  tupleStageBuilder->setRepartition(true);
  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());

  // add all the probing hash sets
  for (auto &it : probingHashSets) {
    tupleStageBuilder->addHashSetToProbe(it.first, it.second);
  }

  // add the created tuple set job stage to the physical plan
  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());

  // cast the computation to AbstractAggregateComp to create the consuming job stage for the aggregation
  Handle<AbstractAggregateComp> agg = unsafeCast<AbstractAggregateComp, Computation>(curComp);

  // we need an aggregation stage after this to aggregate the results from the tuple stage we previously created;
  // the data will be aggregated in the sink set
  AggregationJobStageBuilderPtr aggregationBuilder = make_shared<AggregationJobStageBuilder>();

  aggregationBuilder->setJobId(jobID);
  aggregationBuilder->setJobStageId(nextStageID);
  aggregationBuilder->setAggComp(agg);
  aggregationBuilder->setSourceContext(aggregator);
  aggregationBuilder->setSinkContext(sink);
  aggregationBuilder->setMaterializeOrNot(curComp->needsMaterializeOutput());

  // push back the aggregation stage
  result->physicalPlanToOutput.emplace_back(aggregationBuilder->build());

  // push back the aggregator set
  result->interGlobalSets.push_back(aggregator);

  // update the source pipes to reflect the state after executing the job stages;
  // if we have a consumer we have a new source pipe, since we materialize this result
  if (pipeline.back()->getNumConsumers() != 0) {

    // add consumers as new sources
    for (int i = 0; i < pipeline.back()->getNumConsumers(); ++i) {
      result->createdSourceComputations.push_back(pipeline.back()->getConsumer(i));
    }
  }

  // we succeeded
  result->success = true;

  // the new source is the sink
  updateConsumers(sink, approximateResultSize(stats), stats);

  return result;
}

// return the type identifier of this algorithm (signature reconstructed)
AdvancedPhysicalAbstractAlgorithmTypeID AdvancedPhysicalAggregationPipelineAlgorithm::getType() {
  return AGGREGATION_ALGORITHM;
}

}
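For orientation, here is a minimal sketch of how an optimizer could drive this algorithm. The helper name `planAggregation` and the include path are illustrative assumptions; the constructor and `generate(nextStageID, stats)` signatures are taken from the listing above.

    // hypothetical driver, not part of this file
    #include "AdvancedPhysicalAggregationPipelineAlgorithm.h" // assumed path

    using namespace pdb;

    PhysicalOptimizerResultPtr planAggregation(const AdvancedPhysicalPipelineNodePtr &node, // hypothetical helper
                                               const std::string &jobID,
                                               const Handle<SetIdentifier> &source,
                                               const Handle<ComputePlan> &computePlan,
                                               const LogicalPlanPtr &logicalPlan,
                                               const ConfigurationPtr &conf,
                                               int nextStageID,
                                               const StatisticsPtr &stats) {

      // build the algorithm for a pipeline that neither probes a hash set nor writes user output
      AdvancedPhysicalAggregationPipelineAlgorithm algorithm(node, jobID,
                                                             /* isProbing */ false,
                                                             /* isOutput  */ false,
                                                             source, computePlan, logicalPlan, conf);

      // on success, result->physicalPlanToOutput holds two consecutive stages:
      // a TupleSetJobStage that shuffles into "<output>_aggregationData", then an
      // AggregationJobStage that aggregates that set into the sink
      return algorithm.generate(nextStageID, stats);
    }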
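The comment before the TupleSetJobStage configuration explains that the pipeline writes pages partitioned into per-node maps, which the combiner folds by key before anything crosses the network. This self-contained toy (plain standard C++, not PDB's API; summation stands in for the aggregation's combine function) shows the idea:

    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    int main() {
      // toy (key, value) records produced by a local pipeline
      std::vector<std::pair<std::string, int>> pageOfRecords = {
          {"a", 1}, {"b", 2}, {"a", 3}, {"b", 4}, {"a", 5}};
      const std::size_t numNodes = 2;

      // combine step: fold records by key locally, keeping one map per target node,
      // so each key crosses the network once per node instead of once per record
      std::vector<std::unordered_map<std::string, int>> perNodeMaps(numNodes);
      std::hash<std::string> hashKey;
      for (const auto &[key, value] : pageOfRecords) {
        perNodeMaps[hashKey(key) % numNodes][key] += value;
      }

      // shuffle step (stand-in): each map would now be shipped to the node that
      // owns its partition, where the aggregation stage merges maps from all senders
      for (std::size_t node = 0; node < numNodes; ++node) {
        for (const auto &[key, sum] : perNodeMaps[node]) {
          std::cout << "to node " << node << ": " << key << " -> " << sum << "\n";
        }
      }
      return 0;
    }

Without the combine step, all five records would be shuffled; with it, at most one entry per key reaches each node.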