A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalPipelineAlgorithm.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
22 
23 namespace pdb {
24 
// Constructor: forwards every argument unchanged to the base-class
// (AdvancedPhysicalAbstractAlgorithm) constructor; the body is empty.
// NOTE(review): this Doxygen dump dropped original lines 25 and 33 — the line
// naming the constructor (and its first parameter, the pipeline-node handle)
// and the line that opens the base-class member-initializer list. The full
// signature appears in the member list at the bottom of this page.
26  const std::string &jobID,
27  bool isProbing,
28  bool isOutput,
29  const Handle<SetIdentifier> &source,
30  const Handle<ComputePlan> &computePlan,
31  const LogicalPlanPtr &logicalPlan,
32  const ConfigurationPtr &conf)
34  jobID,
35  isProbing,
36  isOutput,
37  source,
38  computePlan,
39  logicalPlan,
40  conf) {}
41 
// generate: builds one TupleSetJobStage that runs this straight pipeline from
// its source set to a sink set, records the stage (plus any intermediate sets)
// in a PhysicalOptimizerResult, and advances the optimizer state by promoting
// the pipeline's consumers to new sources.
// NOTE(review): this Doxygen dump dropped original line 42, which carries the
// method name and first parameter; the member list at the bottom of this page
// shows the full signature:
//   PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
43  const StatisticsPtr &stats) {
44 
45  // extract the atomic computations from the pipes for this algorithm
// NOTE(review): the statement performing this extraction (original line 46)
// was dropped by the dump; presumably it fills this->pipelineComputations,
// which is read below — confirm against the repository source.
47 
48  // extract the hash sets we might want to probe
// NOTE(review): the statement performing this extraction (original line 49)
// was dropped by the dump; presumably it fills probingHashSets, which is
// iterated below — confirm against the repository source.
50 
51  // get the final atomic computation (tail of the pipeline)
52  auto finalAtomicComputation = this->pipelineComputations.back();
53 
54  // get the source atomic computation (head of the pipeline)
55  auto sourceAtomicComputation = this->pipelineComputations.front();
56 
57  // create an analyzer result to return
58  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
59 
60  // grab the output of the current node
61  std::string outputName = finalAtomicComputation->getOutputName();
62 
63  // grab the computation associated with this node
64  Handle<Computation> curComp = logicalPlan->getNode(finalAtomicComputation->getComputationName()).getComputationHandle();
65 
66  // the set identifier this pipeline will materialize into
67  Handle<SetIdentifier> sink = nullptr;
68 
69  // in the case that the current computation does not require materialization by default
70  // we have to set an output on it, so it gets materialized
71  if (!curComp->needsMaterializeOutput()) {
72 
73  // set the output
74  curComp->setOutput(jobID, outputName);
75 
76  // create the sink and set the page size
77  sink = makeObject<SetIdentifier>(jobID, outputName);
78  sink->setPageSize(conf->getPageSize());
79 
80  // add this set to the list of intermediate sets (so it can be cleaned up later)
81  result->interGlobalSets.push_back(sink);
82  } else {
83  // this computation needs materialization either way so just create the sink set identifier
84  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
85  }
86 
87  // create a tuple set job stage builder
88  TupleSetJobStageBuilderPtr tupleStageBuilder = make_shared<TupleSetJobStageBuilder>();
89 
90  // the input to the pipeline is the output set of the source node
91  tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
92 
93  // set the source set identifier
94  tupleStageBuilder->setSourceContext(source);
95 
96  // is this source a result of an aggregation
97  tupleStageBuilder->setInputAggHashOut(source->isAggregationResult());
98 
99  // set the job id
100  tupleStageBuilder->setJobId(jobID);
101 
102  // are we probing a hash set in this pipeline
103  tupleStageBuilder->setProbing(isProbing);
104 
105  // set the compute plan
106  tupleStageBuilder->setComputePlan(computePlan);
107 
108  // copy the computation names
109  for(const auto &it : this->pipelineComputations) {
110 
111  // we don't need the output set name... (that is just the way the pipeline building works)
112  if(it->getAtomicComputationTypeID() == WriteSetTypeID) {
113  continue;
114  }
115 
116  // add the set name of the atomic computation to the pipeline
117  tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
118  }
119 
120  // set the parameters
121  tupleStageBuilder->setJobStageId(nextStageID);
122  tupleStageBuilder->setTargetTupleSetName(finalAtomicComputation->getInputName());
123  tupleStageBuilder->setTargetComputationName(finalAtomicComputation->getComputationName());
124  tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
125  tupleStageBuilder->setSinkContext(sink);
126  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
127 
128  // add all the probing hash sets
// NOTE(review): `auto it` copies each pair per iteration; `const auto &it`
// would avoid the copy — left unchanged here since this dump is incomplete.
129  for(auto it : probingHashSets) {
130  tupleStageBuilder->addHashSetToProbe(it.first, it.second);
131  }
132 
133  // create the job stage
134  Handle<TupleSetJobStage> jobStage = tupleStageBuilder->build();
135 
136  // add the job stage to the result and mark the generation as successful
137  result->physicalPlanToOutput.emplace_back(jobStage);
138  result->success = true;
139 
140  // update the source pipes to reflect the state after executing the job stages
141  // if we have a consumer we have a new source pipe since we materialize this result
142  if(pipeline.back()->getNumConsumers() != 0) {
143 
144  // add consumers as new sources
145  for(int i = 0; i < pipeline.back()->getNumConsumers(); ++i) {
146  result->createdSourceComputations.push_back(pipeline.back()->getConsumer(i));
147  }
148  }
149 
150  // the new source is now the sink; propagate the (approximate) result size to consumers
151  updateConsumers(sink, approximateResultSize(stats), stats);
152 
153  // return the result
154  return result;
155 }
156 
158  return SELECTION_ALGORITHM;
159 }
160 
161 
162 
163 
164 }
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
AdvancedPhysicalPipelineAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle, const std::string &jobID, bool isProbing, bool isOutput, const Handle< SetIdentifier > &source, const Handle< ComputePlan > &computePlan, const LogicalPlanPtr &logicalPlan, const ConfigurationPtr &conf)
unordered_map< std::string, std::string > probingHashSets
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr