A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimplePhysicalNode.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #include "SetIdentifier.h"
19 #include "Statistics.h"
22 
23 namespace pdb {
24 
27  const Handle<ComputePlan> &computePlan,
28  LogicalPlanPtr logicalPlan,
29  ConfigurationPtr conf) : AbstractPhysicalNode(jobId, computePlan, logicalPlan, conf),
30  node(node) {
31  // if this node is a scan set we want to create a set identifier for it
32  if(node->getAtomicComputationTypeID() == ScanSetAtomicTypeID) {
33 
34  // grab the computation
35  std::cout << node->getOutputName() << std::endl;
36  Handle<Computation> comp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
37 
38  // create a set identifier from it
40  }
41 }
42 
44 
45  // create a job stage builder
46  pdb::TupleSetJobStageBuilderPtr jobStageBuilder = make_shared<TupleSetJobStageBuilder>();
47 
48  // the input to the pipeline is the output set of the source node
49  jobStageBuilder->setSourceTupleSetName(node->getOutputName());
50 
51  // set the source set identifier
52  jobStageBuilder->setSourceContext(sourceSetIdentifier);
53 
54  // is this source a result of an aggregation
55  jobStageBuilder->setInputAggHashOut(sourceSetIdentifier->isAggregationResult());
56 
57  // set the job id
58  jobStageBuilder->setJobId(jobId);
59 
60  // set the compute plan
61  jobStageBuilder->setComputePlan(computePlan);
62 
63  // this is a source so there is no previous node
64  SimplePhysicalNodePtr prevNode = nullptr;
65 
66  // run the recursive analysis; it will essentially grab the first consumer of the source node
67  // and analyze it as if the source had just one consumer
68  auto result = SimplePhysicalNode::analyzeSingleConsumer(jobStageBuilder, prevNode, stats, nextStageID);
69 
70  // if we failed, we want to avoid processing the same consumer twice; therefore we move it to the back
71  if(!result->success) {
72 
73  // grab the consumer
74  auto tmp = activeConsumers.front();
75 
76  // pop it from the front
77  activeConsumers.pop_front();
78 
79  // push it to the back
80  activeConsumers.push_back(tmp);
81  }
82 
83  return result;
84 }
85 
87  SimplePhysicalNodePtr &prevNode,
88  const StatisticsPtr &stats,
89  int nextStageID) {
90 
91  // depending on the number of consumers this node has we call different methods to analyze the plan
92  switch (consumers.size()){
93  // we are analyzing an output node
94  case 0: return analyzeOutput(jobStageBuilder, prevNode, stats, nextStageID);
95 
96  // we are analyzing a node that has a single consumer
97  case 1: return analyzeSingleConsumer(jobStageBuilder, prevNode, stats, nextStageID);
98 
99  // we are analyzing a node that has multiple consumers
100  default: return analyzeMultipleConsumers(jobStageBuilder, prevNode, stats, nextStageID);
101  }
102 }
103 
105  return node;
106 }
107 
109  return !activeConsumers.empty();
110 }
111 
113  return *sourceSetIdentifier == *set;
114 }
115 
117  // call the consumer
119 
120  // add the consumer to the active consumers
121  activeConsumers.push_back(std::dynamic_pointer_cast<SimplePhysicalNode>(consumer));
122 }
123 
125 
126  // if the set identifier does not exist log that
127  if (source == nullptr) {
128  PDB_COUT << "WARNING: the set provided to the get cost is a nullptr\n";
129  return 0;
130  }
131 
132  // do we have statistics, if not just return 0
133  if(stats == nullptr) {
134  PDB_COUT << "WARNING: there are not stats when looking for the set=" << source->toSourceSetName() << "\n";
135  return 0;
136  }
137 
138  // calculate the cost based on the formula cost = number_of_bytes / 1000000
139  double cost = stats->getNumBytes(source->getDatabase(), source->getSetName());
140  return cost / 1000000.0;
141 }
142 
144  SimplePhysicalNodePtr &prevNode,
145  const StatisticsPtr &stats,
146  int nextStageID) {
147 
148  // add this node to the pipeline
149  tupleStageBuilder->addTupleSetToBuildPipeline(node->getOutputName());
150 
151  // grab the computation associated with this node
152  Handle<Computation> curComp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
153 
154 
155  // set this node as the previous node
157 
158  // go to the next node
159  PhysicalOptimizerResultPtr result = activeConsumers.front()->analyze(tupleStageBuilder,
160  newPrevNode,
161  stats,
162  nextStageID);
163 
164  // remove the consumer we just processed if we succeeded
165  if(result->success) {
166  activeConsumers.pop_front();
167  }
168 
169  return result;
170 }
171 
173  SimplePhysicalNodePtr &prevNode,
174  const StatisticsPtr &stats,
175  int nextStageID) {
176 
177  // grab the computation associated with this node
178  Handle<Computation> curComp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
179 
180  // create a SetIdentifier for the output set
181  Handle<SetIdentifier> sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
182 
183  // set the parameters
184  tupleStageBuilder->setJobStageId(nextStageID);
185  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
186  tupleStageBuilder->setTargetComputationName(node->getComputationName());
187  tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
188  tupleStageBuilder->setSinkContext(sink);
189  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
190 
191  // create the job stage
192  Handle<TupleSetJobStage> jobStage = tupleStageBuilder->build();
193 
194  // create an analyzer result
195  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
196 
197  // add the job stage to the result
198  result->physicalPlanToOutput.emplace_back(jobStage);
199  result->success = true;
200 
201  return result;
202 }
203 
205  SimplePhysicalNodePtr &prevNode,
206  const StatisticsPtr &stats,
207  int nextStageID) {
208 
210  // create an analyzer result
210  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
211 
212  // grab the output of the current node
213  std::string outputName = node->getOutputName();
214 
215  // add this node to the pipeline
216  tupleSetJobStageBuilder->addTupleSetToBuildPipeline(outputName);
217 
218  // grab the computation associated with this node
219  Handle<Computation> curComp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
220 
221  // I am a pipeline breaker because I have more than one consumer
222  Handle<SetIdentifier> sink = nullptr;
223 
224  // in the case that the current computation does not require materialization by default
225  // we have to set an output for it, so that it gets materialized
226  if (!curComp->needsMaterializeOutput()) {
227 
228  // set the output
229  curComp->setOutput(jobId, outputName);
230 
231  // create the sink and set the page size
232  sink = makeObject<SetIdentifier>(jobId, outputName);
233  sink->setPageSize(conf->getPageSize());
234 
235  // add this set to the list of intermediate sets
236  result->interGlobalSets.push_back(sink);
237  } else {
238  // this computation needs materialization either way so just create the sink set identifier
239  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
240  }
241 
242  // set the parameters
243  tupleSetJobStageBuilder->setJobStageId(nextStageID);
244  tupleSetJobStageBuilder->setTargetTupleSetName(outputName);
245  tupleSetJobStageBuilder->setTargetComputationName(node->getComputationName());
246  tupleSetJobStageBuilder->setOutputTypeName(curComp->getOutputType());
247  tupleSetJobStageBuilder->setSinkContext(sink);
248  tupleSetJobStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
249 
250  // create the job stage
251  Handle<TupleSetJobStage> jobStage = tupleSetJobStageBuilder->build();
252 
253  // add the job stage to the result
254  result->physicalPlanToOutput.emplace_back(jobStage);
255  result->success = true;
256  result->createdSourceComputations.push_back(getSimpleNodeHandle());
257 
258  // the new source is now the sink
259  sourceSetIdentifier = sink;
260 
261  return result;
262 }
263 
265 
266  // return the cost of the source set identifier
267  return getCost(sourceSetIdentifier, stats);
268 }
269 
271  return node->getOutputName();
272 }
273 
275  return sourceSetIdentifier;
276 }
277 
279  // return the handle to this node
280  return std::dynamic_pointer_cast<SimplePhysicalNode>(getHandle());
281 }
282 
283 }
284 
AbstractPhysicalNodePtr getHandle()
virtual PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
SimplePhysicalNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
const AtomicComputationPtr & getNode() const
Handle< ComputePlan > computePlan
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
PhysicalOptimizerResultPtr analyze(const StatisticsPtr &stats, int nextStageID) override
void addConsumer(const AbstractPhysicalNodePtr &consumer) override
double getCost(const StatisticsPtr &stats) override
bool isConsuming(Handle< SetIdentifier > &set) override
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
std::string getNodeIdentifier() override
virtual PhysicalOptimizerResultPtr analyzeMultipleConsumers(TupleSetJobStageBuilderPtr &ptr, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
const Handle< SetIdentifier > & getSourceSetIdentifier() const
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
virtual PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
std::list< AbstractPhysicalNodePtr > consumers
std::shared_ptr< AbstractPhysicalNode > AbstractPhysicalNodePtr
Handle< SetIdentifier > getSetIdentifierFromComputation(Handle< Computation > computation)
SimplePhysicalNodePtr getSimpleNodeHandle()
virtual void addConsumer(const pdb::AbstractPhysicalNodePtr &consumer)
AtomicComputationPtr node
Handle< SetIdentifier > sourceSetIdentifier
std::list< SimplePhysicalNodePtr > activeConsumers
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr