A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalAbstractPipe.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
22 
23 namespace pdb {
24 
26  const Handle<ComputePlan> &computePlan,
27  LogicalPlanPtr &logicalPlan,
28  ConfigurationPtr &conf,
29  vector<AtomicComputationPtr> &pipeComputations,
30  size_t id) : AbstractPhysicalNode(jobId,
31  computePlan,
32  logicalPlan,
33  conf),
34  pipeComputations(pipeComputations),
35  selectedAlgorithm(nullptr),
36  id(id) {
37 
38  // if this node is a scan set we want to create a set identifier for it
39  if(isSource()) {
40 
41  // get the name of the computation
42  auto computationName = pipeComputations.front()->getComputationName();
43 
44  // grab the computation
45  Handle<Computation> comp = logicalPlan->getNode(computationName).getComputationHandle();
46 
47  // create a set identifier from it
49  }
50 }
51 
53 
54 
57 
58  // we start the pipeline with this node; later nodes may be added to it
59  std::vector<AdvancedPhysicalPipelineNodePtr> pipelines = { getAdvancedPhysicalNodeHandle() };
60 
61  // delegate the logic for the pipelining to the next node
62  return consumers.front()->to<AdvancedPhysicalAbstractPipe>()->pipelineMe(nextStageID, pipelines, stats);
63  }
64 
66  if(consumers.empty()) {
67  return selectOutputAlgorithm()->generate(nextStageID, stats);
68  }
69 
71  // TODO for now I assume I have only one consumer
73 
75  if(consumers.size() == 1 && isChainable()) {
76 
77  // chain this thing to the next pipe
78  return consumers.front()->to<AdvancedPhysicalAbstractPipe>()->chainMe(nextStageID,
79  stats,
80  selectedAlgorithm->generate(nextStageID, stats));
81  }
82 
84  return selectedAlgorithm->generate(nextStageID, stats);
85 }
86 
88 
89  // check whether we are joining
90  if(this->isJoining()) {
91 
92  // if for any reason the child of a join is not a join side pipeline, assert
93  assert(node->getType() == JOIN_SIDE);
94 
95  // for each side of the join that is not the one we are coming from we check if it is processed
96  for(auto &producer : producers) {
97 
98  // if one of the producers is not a join side, fail
99  assert(std::dynamic_pointer_cast<AdvancedPhysicalAbstractPipe>(producer)->getType() == JOIN_SIDE);
100 
101  // cast the producer to a pipe node
102  auto casted_producer = std::dynamic_pointer_cast<AdvancedPhysicalAbstractPipe>(producer);
103 
104  // for every producer other than the one we are coming from, this operator is not pipelinable if
105  // A) that producer is not executed or B) that producer was shuffled, so now we have to shuffle this side too
106  if(node != producer && (!casted_producer->isExecuted() || casted_producer->getSelectedAlgorithm()->getType() == JOIN_SHUFFLED_HASHSET_ALGORITHM)) {
107 
108  // ok we can not pipeline this
109  return false;
110  }
111  }
112 
113  // we are fine to pipeline this join
114  return true;
115  }
116 
117  // if the join side is the producer of a pipeline that is not joining something is seriously wrong
118  assert(node->getType() != JOIN_SIDE);
119 
120  // only a straight pipeline can be pipelined
121  return node->getType() == STRAIGHT;
122 }
123 
124 
126  std::vector<AdvancedPhysicalPipelineNodePtr> pipeline,
127  const StatisticsPtr &stats) {
128 
131 
132  // add me to the pipeline
133  pipeline.push_back(getAdvancedPhysicalNodeHandle());
134 
135  // pipeline this node to the consumer
136  return consumers.front()->to<AdvancedPhysicalAbstractPipe>()->pipelineMe(nextStageID, pipeline, stats);
137  }
138 
140  if(consumers.empty()) {
141  return selectOutputAlgorithm()->generatePipelined(nextStageID, stats, pipeline);
142  }
143 
145  // TODO for now I assume I have only one consumer
147 
149  if(consumers.size() == 1 && isChainable()) {
150 
151  // chain this thing to the next pipe
152  return consumers.front()->to<AdvancedPhysicalAbstractPipe>()->chainMe(nextStageID,
153  stats,
154  selectedAlgorithm->generate(nextStageID, stats));
155  }
156 
158  return selectedAlgorithm->generatePipelined(nextStageID, stats, pipeline);
159 }
160 
162  const StatisticsPtr &stats,
163  PhysicalOptimizerResultPtr previous) {
164 
165  // analyze me and get my result; the stage ID is now the old one plus the number of previously created stages
166  auto current = analyze(stats, nextStageID + (int) previous->physicalPlanToOutput.size());
167 
168  // prepend the stages of the previous result to the stages of my result
169  current->physicalPlanToOutput.insert(current->physicalPlanToOutput.begin(),
170  previous->physicalPlanToOutput.begin(),
171  previous->physicalPlanToOutput.end());
172 
173  // a source should always exist
174  assert(sourceSetIdentifier != nullptr);
175 
176  // put the source set identifier at the front of interGlobalSets
177  current->interGlobalSets.emplace_front(sourceSetIdentifier);
178 
179  // both have to be successful
180  current->success = current->success && previous->success;
181 
182  return current;
183 }
184 
186 
187  // currently we only chain the shuffle set algorithm to the join algorithm
188  return selectedAlgorithm->getType() == JOIN_SUFFLE_SET_ALGORITHM;
189 }
190 
192  return selectedAlgorithm != nullptr;
193 }
194 
196  return producers.size() >= 2;
197 }
198 
200 
201  // go through each computation in this pipe
202  for(auto &it : pipeComputations) {
203 
204  // returns true if this computation is an aggregation
205  if(it->getAtomicComputationTypeID() == ApplyAggTypeID){
206  return true;
207  }
208  }
209 
210  // if we could not find one, return false
211  return false;
212 }
213 
215 
216  // do we have statistics, if not just return 0
217  if(stats == nullptr) {
218  return 0;
219  }
220 
221  // check whether the source set identifier exists; if it does not, maybe this pipeline is a join, in which case we
222  // would need to get the source set from one of its children (not done here: we only warn and return 0)
223  if (sourceSetIdentifier == nullptr) {
224  // this might be a problem
225  PDB_COUT << "WARNING: there is no source set for the node " << getNodeIdentifier() << "\n";
226  return 0;
227  }
228 
229  // calculate the cost based on the formula cost = number_of_bytes / 1000000 (integer division, so any fractional part is dropped)
230  double cost = stats->getNumBytes(sourceSetIdentifier->getDatabase(), sourceSetIdentifier->getSetName());
231  return double((size_t) cost / 1000000);
232 }
233 
235  return selectedAlgorithm;
236 }
237 
239  // return the handle to this node
240  return std::dynamic_pointer_cast<AdvancedPhysicalAbstractPipe>(getHandle());
241 }
242 
244  // TODO this needs to be implemented properly
245  return false;
246 }
247 
249  return *sourceSetIdentifier == *set;
250 }
251 
253  return "node_" + to_string(id);
254 }
255 
257 
258  // check whether the first node is a scan set
259  return !pipeComputations.empty() && pipeComputations.front()->getAtomicComputationTypeID() == ScanSetAtomicTypeID;
260 }
261 
263  return this->pipeComputations[idx];
264 }
// Accessor for the atomic computations of this pipe.
// Returns a const reference to the pipeComputations member, so no copy is made.
265 const vector<AtomicComputationPtr> &AdvancedPhysicalAbstractPipe::getPipeComputations() const {
266  return pipeComputations;
267 }
268 
270  return sourceSetIdentifier;
271 }
272 
// Picks one algorithm out of `algorithms` for this pipe.
//  - If isJoining() is true, lhs and rhs are the first and the last producer:
//      * if neither side is executed, a JOIN_BROADCASTED_HASHSET_ALGORITHM candidate is preferred;
//        when there is none, the first candidate that was seen is kept;
//      * otherwise the selected algorithm of the executed side is inspected and, if it is
//        JOIN_SHUFFLED_HASHSET_ALGORITHM, the JOIN_SUFFLE_SET_ALGORITHM candidate is returned;
//      * every other join situation ends in assert(false).
//  - If isJoining() is false, the first candidate is returned.
// NOTE(review): `best` is used below but its declaration is not part of this listing (listing line 275
// is absent); presumably it is an AdvancedPhysicalAbstractAlgorithmPtr initialised to nullptr - confirm.
// NOTE(review): if assert is the standard macro and NDEBUG is defined, the assert(false) path falls
// through to algorithms.front(); calling front() on an empty vector is undefined behaviour, so confirm
// that callers never pass an empty list.
273 AdvancedPhysicalAbstractAlgorithmPtr AdvancedPhysicalAbstractPipe::propose(std::vector<AdvancedPhysicalAbstractAlgorithmPtr> algorithms) {
274 
276 
277  // if we are joining then we have a few extra rules when choosing the algorithm
278  if(isJoining()) {
279 
280  // get the lhs (the first producer)
281  auto lhs = producers.front()->to<AdvancedPhysicalAbstractPipe>();
282 
283  // grab the rhs (the last producer)
284  auto rhs = producers.back()->to<AdvancedPhysicalAbstractPipe>();
285 
286  // if we have executed the right side or the left side with a broadcast join and we are here, something is wrong
287  assert(!(lhs->isExecuted() && lhs->getSelectedAlgorithm()->getType() == JOIN_BROADCASTED_HASHSET_ALGORITHM));
288  assert(!(rhs->isExecuted() && rhs->getSelectedAlgorithm()->getType() == JOIN_BROADCASTED_HASHSET_ALGORITHM));
289 
290  // if both the left and the right side are not executed then we just go through the algorithms;
291  // if we can do a broadcast we do it, otherwise we just select any of them
292  // TODO this is just placeholder logic
293  if(!lhs->isExecuted() && !rhs->isExecuted()) {
294 
295  // go through each algorithm; if we have a broadcast algorithm we always choose it
296  for (const auto &algorithm : algorithms) {
297 
298  // we prefer the broadcast algorithm, but if we have none we are fine we just select any
299  if (algorithm->getType() == JOIN_BROADCASTED_HASHSET_ALGORITHM || best == nullptr) {
300 
301  // select the best algorithm
302  best = algorithm;
303  }
304  }
305 
306  // if this is false there is something seriously wrong with our system
307  assert(best != nullptr);
308 
309  // return the best
310  return best;
311  }
312 
313  // at this point we know that at least one of the sides is executed
314  // TODO this is just placeholder logic
315  auto otherAlgorithm = lhs->isExecuted() ? lhs->getSelectedAlgorithm() : rhs->getSelectedAlgorithm();
316 
317  // there must be an algorithm selected for the other side if it is executed
318  assert(otherAlgorithm != nullptr);
319 
320  // did we shuffle the other side? if we did, then find the join shuffle set algorithm
321  if(otherAlgorithm->getType() == JOIN_SHUFFLED_HASHSET_ALGORITHM) {
322 
323  // go through each algorithm if we find the join shuffle set algorithm return it
324  for (const auto &algorithm : algorithms) {
325  // return the first algorithm whose type is JOIN_SUFFLE_SET_ALGORITHM
326  if (algorithm->getType() == JOIN_SUFFLE_SET_ALGORITHM) {
327 
328  return algorithm;
329  }
330  }
331  }
332 
333  // this should not happen
334  assert(false);
335  }
336 
337  // just pick the first one; at this stage this can only be an aggregation or a pipeline algorithm
338  return algorithms.front();
339 }
340 
343 }
344 
// Collects the hash sets generated by the producers of this pipe.
// Every producer is converted with to<AdvancedPhysicalJoinSidePipe>(); for each one where both
// isExecuted() and hasHashSet() are true, its getGeneratedHashSet() is stored in the returned map
// under the key getPipelineComputationAt(0)->getOutputName().
// NOTE(review): the key does not depend on the producer, so if more than one producer qualifies the
// later entry overwrites the earlier one - confirm that this is intended.
345 std::unordered_map<std::string, std::string> AdvancedPhysicalAbstractPipe::getProbingHashSets() {
346 
347  // the return value
348  std::unordered_map<std::string, std::string> ret;
349 
350  for(const auto &p : producers) {
351 
352  // grab the join side
353  auto joinSide = p->to<AdvancedPhysicalJoinSidePipe>();
354 
355  // only producers that are executed and have a hash set contribute an entry
356  if(joinSide->isExecuted() && joinSide->hasHashSet()) {
357 
358  // add the hash set, keyed by the output name of the first computation of this pipe
359  ret[this->getPipelineComputationAt(0)->getOutputName()] = (p->to<AdvancedPhysicalJoinSidePipe>()->getGeneratedHashSet());
360  }
361  }
362 
363  return ret;
364 }
365 
366 }
AbstractPhysicalNodePtr getHandle()
virtual AdvancedPhysicalAbstractAlgorithmPtr selectOutputAlgorithm()=0
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
virtual PhysicalOptimizerResultPtr chainMe(int nextStageID, const StatisticsPtr &stats, PhysicalOptimizerResultPtr previous)
const vector< AtomicComputationPtr > & getPipeComputations() const
vector< AtomicComputationPtr > pipeComputations
AdvancedPhysicalPipelineNodePtr getAdvancedPhysicalNodeHandle()
AdvancedPhysicalAbstractAlgorithmPtr selectedAlgorithm
virtual AdvancedPhysicalAbstractAlgorithmPtr propose(std::vector< AdvancedPhysicalAbstractAlgorithmPtr > algorithms)
virtual PhysicalOptimizerResultPtr pipelineMe(int nextStageID, std::vector< AdvancedPhysicalPipelineNodePtr > pipeline, const StatisticsPtr &stats)
AdvancedPhysicalAbstractPipe(string &jobId, const Handle< ComputePlan > &computePlan, LogicalPlanPtr &logicalPlan, ConfigurationPtr &conf, vector< AtomicComputationPtr > &pipeComputations, size_t id)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
void setSourceSetIdentifier(const Handle< SetIdentifier > &sourceSetIdentifier)
PhysicalOptimizerResultPtr analyze(const StatisticsPtr &stats, int nextStageID) override
double getCost(const StatisticsPtr &stats) override
std::unordered_map< std::string, std::string > getProbingHashSets()
AtomicComputationPtr getPipelineComputationAt(size_t idx)
bool isConsuming(Handle< SetIdentifier > &set) override
virtual AdvancedPhysicalPipelineTypeID getType()=0
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::list< AbstractPhysicalNodePtr > consumers
virtual std::vector< AdvancedPhysicalAbstractAlgorithmPtr > getPossibleAlgorithms(const StatisticsPtr &stats)=0
Handle< SetIdentifier > getSetIdentifierFromComputation(Handle< Computation > computation)
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
const Handle< SetIdentifier > & getSourceSetIdentifier() const
virtual bool isPipelinable(AdvancedPhysicalPipelineNodePtr node)
std::list< AbstractPhysicalNodeWeakPtr > producers
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr
const AdvancedPhysicalAbstractAlgorithmPtr & getSelectedAlgorithm() const
std::shared_ptr< AdvancedPhysicalAbstractAlgorithm > AdvancedPhysicalAbstractAlgorithmPtr