A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ComputePlan.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef COMPUTE_PLAN_CC
20 #define COMPUTE_PLAN_CC
21 
22 #include "JoinCompBase.h"
23 #include "ComputePlan.h"
24 #include "FilterExecutor.h"
25 #include "HashOneExecutor.h"
26 #include "FlattenExecutor.h"
28 #include "EqualsLambda.h"
29 #include "AbstractJoinComp.h"
30 #include "Lexer.h"
31 #include "Parser.h"
32 
33 extern int yydebug;
34 
35 namespace pdb {
36 
38 
40 
41  // if we already have the plan, then just return it
42  if (myPlan != nullptr)
43  return myPlan;
44 
45  // get the string to compile
46  std::string myLogicalPlan = TCAPComputation;
47  myLogicalPlan.push_back('\0');
48 
49  // where the result of the parse goes
50  AtomicComputationList* myResult;
51 
52  // now, do the compilation
53  yyscan_t scanner;
54  LexerExtra extra{""};
55  yylex_init_extra(&extra, &scanner);
56  const YY_BUFFER_STATE buffer{yy_scan_string(myLogicalPlan.data(), scanner)};
57  const int parseFailed{yyparse(scanner, &myResult)};
58  yy_delete_buffer(buffer, scanner);
59  yylex_destroy(scanner);
60 
61  // if it didn't parse, get outta here
62  if (parseFailed) {
63  std::cout << "Parse error when compiling TCAP: " << extra.errorMessage;
64  exit(1);
65  }
66 
67  // this is the logical plan to return
68  myPlan = std::make_shared<LogicalPlan>(*myResult, allComputations);
69  delete myResult;
70 
71  // and now we are outta here
72  return myPlan;
73 }
74 
76  myPlan = nullptr;
77 }
78 
79 // this does a DFS, trying to find a list of computations that lead to the specified computation
81  std::vector<AtomicComputationPtr>& listSoFar,
82  std::string& targetTupleSetName) {
83 
84  // see if the guy at the end of the list is indeed the target
85  if (listSoFar.back()->getOutputName() == targetTupleSetName) {
86 
87  // in this case, we have the complete list of computations
88  return true;
89  }
90 
91  // get all of the guys who consume the dude on the end of the list
92  std::vector<AtomicComputationPtr>& nextOnes =
93  myPlan->getComputations().getConsumingAtomicComputations(listSoFar.back()->getOutputName());
94 
95  // and try to put each of the next computations on the end of the list, and recursively search
96  for (auto& a : nextOnes) {
97 
98  // see if the next computation was on the path to the target
99  listSoFar.push_back(a);
100  if (recurse(myPlan, listSoFar, targetTupleSetName)) {
101 
102  // it was! So we are done
103  return true;
104  }
105 
106  // we couldn't find the target
107  listSoFar.pop_back();
108  }
109 
110  // if we made it here, we could not find the target
111  return false;
112 }
113 
114 
115 inline std::string ComputePlan::getProducingComputationName(std::string sourceTupleSetName) {
116 
117  if (myPlan == nullptr) {
118  getPlan();
119  }
120 
121  AtomicComputationList& allComps = myPlan->getComputations();
122 
123  return allComps.getProducingAtomicComputation(sourceTupleSetName)->getComputationName();
124 }
125 
126 
127 // JiaNote: ToDo: reuse code to obtain consumer, attsToOperateOn and projection, and encapsulate
128 // it into a separate function.
129 
130 // JiaNote: implemented following method to provide merger for broadcast join
131 inline SinkMergerPtr ComputePlan::getMerger(std::string sourceTupleSetName,
132  std::string targetTupleSetName,
133  std::string targetComputationName) {
134 
135  if (targetComputationName.find("JoinComp") == std::string::npos) {
136  return nullptr;
137  }
138 
139  // build the plan if it is not already done
140  if (myPlan == nullptr)
141  getPlan();
142 
143  // get all of the computations
144  AtomicComputationList& allComps = myPlan->getComputations();
145 
146 
147  // and get the schema for the output TupleSet objects that it is supposed to produce
148  TupleSpec& targetSpec = allComps.getProducingAtomicComputation(targetTupleSetName)->getOutput();
149 
150  // and get the projection for this guy
151  std::vector<AtomicComputationPtr>& consumers =
152  allComps.getConsumingAtomicComputations(targetSpec.getSetName());
153  // JiaNote: change the reference into a new variable based on Chris' Join code
154  // TupleSpec &targetProjection = targetSpec;
155  TupleSpec targetProjection;
156  TupleSpec targetAttsToOpOn;
157  for (auto& a : consumers) {
158  if (a->getComputationName() == targetComputationName) {
159 
160  // we found the consuming computation
161  if (targetSpec == a->getInput()) {
162  targetProjection = a->getProjection();
163  targetAttsToOpOn = a->getInput();
164  break;
165  }
166 
167  // the only way that the input to this guy does not match targetSpec is if he is a join,
168  // which has two inputs
169  if (a->getAtomicComputationType() != std::string("JoinSets")) {
170  std::cout << "This is bad... is the target computation name correct??";
171  std::cout << "Didn't find a JoinSets, target was " << targetSpec.getSetName()
172  << "\n";
173  exit(1);
174  }
175 
176  // get the join and make sure it matches
177  ApplyJoin* myGuy = (ApplyJoin*)a.get();
178  if (!(myGuy->getRightInput() == targetSpec)) {
179  std::cout << "This is bad... is the target computation name correct??";
180  std::cout << "Find a JoinSets, target was " << targetSpec.getSetName() << "\n";
181  exit(1);
182  }
183 
184  targetProjection = myGuy->getRightProjection();
185  targetAttsToOpOn = myGuy->getRightInput();
186  }
187  }
188 
189  // now we have the list of computations, and so it is time to get the sink merger
190  SinkMergerPtr sinkMerger =
191  myPlan->getNode(targetComputationName)
192  .getComputation()
193  .getSinkMerger(targetSpec, targetAttsToOpOn, targetProjection, *this);
194  return sinkMerger;
195 }
196 
197 // JiaNote: implemented following method to provide shuffler for hash partitioned join
198 inline SinkShufflerPtr ComputePlan::getShuffler(std::string sourceTupleSetName,
199  std::string targetTupleSetName,
200  std::string targetComputationName) {
201 
202  if (targetComputationName.find("JoinComp") == std::string::npos) {
203  return nullptr;
204  }
205  // build the plan if it is not already done
206  if (myPlan == nullptr)
207  getPlan();
208 
209  // get all of the computations
210  AtomicComputationList& allComps = myPlan->getComputations();
211 
212 
213  // and get the schema for the output TupleSet objects that it is supposed to produce
214  TupleSpec& targetSpec = allComps.getProducingAtomicComputation(targetTupleSetName)->getOutput();
215 
216  // and get the projection for this guy
217  std::vector<AtomicComputationPtr>& consumers =
218  allComps.getConsumingAtomicComputations(targetSpec.getSetName());
219  // JiaNote: change the reference into a new variable based on Chris' Join code
220  // TupleSpec &targetProjection = targetSpec;
221  TupleSpec targetProjection;
222  TupleSpec targetAttsToOpOn;
223  for (auto& a : consumers) {
224  if (a->getComputationName() == targetComputationName) {
225 
226  // we found the consuming computation
227  if (targetSpec == a->getInput()) {
228  targetProjection = a->getProjection();
229  targetAttsToOpOn = a->getInput();
230  break;
231  }
232 
233  // the only way that the input to this guy does not match targetSpec is if he is a join,
234  // which has two inputs
235  if (a->getAtomicComputationType() != std::string("JoinSets")) {
236  std::cout << "This is bad... is the target computation name correct??";
237  std::cout << "Didn't find a JoinSets, target was " << targetSpec.getSetName()
238  << "\n";
239  exit(1);
240  }
241 
242  // get the join and make sure it matches
243  ApplyJoin* myGuy = (ApplyJoin*)a.get();
244  if (!(myGuy->getRightInput() == targetSpec)) {
245  std::cout << "This is bad... is the target computation name correct??";
246  std::cout << "Find a JoinSets, target was " << targetSpec.getSetName() << "\n";
247  exit(1);
248  }
249 
250  targetProjection = myGuy->getRightProjection();
251  targetAttsToOpOn = myGuy->getRightInput();
252  }
253  }
254 
255  // now we have the list of computations, and so it is time to get the sink merger
256  SinkShufflerPtr sinkShuffler =
257  myPlan->getNode(targetComputationName)
258  .getComputation()
259  .getSinkShuffler(targetSpec, targetAttsToOpOn, targetProjection, *this);
260  return sinkShuffler;
261 }
262 
263 
264 // JiaNote: add a new buildPipeline method to avoid ambiguity
265 inline PipelinePtr ComputePlan::buildPipeline(std::vector<std::string> buildTheseTupleSets,
266  std::string sourceTupleSetName,
267  std::string targetComputationName,
268  std::function<std::pair<void*, size_t>()> getPage,
269  std::function<void(void*)> discardTempPage,
270  std::function<void(void*)> writeBackPage) {
271 
272  std::map<std::string, ComputeInfoPtr> params;
273  return buildPipeline(buildTheseTupleSets,
274  sourceTupleSetName,
275  targetComputationName,
276  getPage,
277  discardTempPage,
278  writeBackPage,
279  params);
280 }
281 
282 
283 inline PipelinePtr ComputePlan::buildPipeline(std::string sourceTupleSetName,
284  std::string targetTupleSetName,
285  std::string targetComputationName,
286  std::function<std::pair<void*, size_t>()> getPage,
287  std::function<void(void*)> discardTempPage,
288  std::function<void(void*)> writeBackPage) {
289 
290  std::map<std::string, ComputeInfoPtr> params;
291  return buildPipeline(sourceTupleSetName,
292  targetTupleSetName,
293  targetComputationName,
294  getPage,
295  discardTempPage,
296  writeBackPage,
297  params);
298 }
299 
300 
301 // JiaNote: add below method to make sure the pipeline to build is unique, and no ambiguity.
302 inline PipelinePtr ComputePlan::buildPipeline(std::vector<std::string> buildTheseTupleSets,
303  std::string sourceTupleSetName,
304  std::string targetComputationName,
305  std::function<std::pair<void*, size_t>()> getPage,
306  std::function<void(void*)> discardTempPage,
307  std::function<void(void*)> writeBackPage,
308  std::map<std::string, ComputeInfoPtr>& params) {
309 
310 
311  // build the plan if it is not already done
312  if (myPlan == nullptr)
313  getPlan();
314 
315  // get all of the computations
316  AtomicComputationList& allComps = myPlan->getComputations();
317 
318 
319  // to get compute source
320  // now we get the name of the actual computation object that corresponds to the producer of this
321  // tuple set
322  size_t numTupleSets = buildTheseTupleSets.size();
323  if (numTupleSets == 0) {
324  std::cout << "ERROR: there is no tuple sets to build pipeline" << std::endl;
325  return nullptr;
326  }
327 
328 
329  std::string producerName =
330  allComps.getProducingAtomicComputation(buildTheseTupleSets[0])->getComputationName();
331  std::cout << "producerName = " << producerName << std::endl;
332 
333  // and get the schema for the output TupleSet objects that it is supposed to produce
334  TupleSpec& origSpec =
335  allComps.getProducingAtomicComputation(buildTheseTupleSets[0])->getOutput();
336 
337  std::cout << "origSpec: " << origSpec << std::endl;
338 
339  // now we are going to ask that particular node for the compute source
340  ComputeSourcePtr computeSource =
341  myPlan->getNode(producerName).getComputation().getComputeSource(origSpec, *this);
342 
343 
344  // to get compute sink
345  std::string targetTupleSetName = buildTheseTupleSets[numTupleSets - 1];
346  TupleSpec& targetSpec = allComps.getProducingAtomicComputation(targetTupleSetName)->getOutput();
347  TupleSpec targetProjection;
348  TupleSpec targetAttsToOpOn;
349 
350  if (targetComputationName.find("SelectionComp") == std::string::npos) {
351 
352  // and get the schema for the output TupleSet objects that it is supposed to produce
353  if ((allComps.getConsumingAtomicComputations(targetTupleSetName)).size() > 1) {
354  targetProjection = targetSpec;
355  targetAttsToOpOn = targetSpec;
356  } else {
357 
358  // JiaNote: change the reference into a new variable based on Chris' Join code
359  // TupleSpec &targetProjection = targetSpec;
360 
361  auto a = (allComps.getConsumingAtomicComputations(targetTupleSetName))[0];
362 
363  // we found the consuming computation
364  if (targetSpec == a->getInput()) {
365  targetProjection = a->getProjection();
366 
367  // added following to merge join code
368  if (targetComputationName.find("JoinComp") == std::string::npos) {
369  targetSpec = targetProjection;
370  }
371 
372  targetAttsToOpOn = a->getInput();
373 
374  }
375 
376  // the only way that the input to this guy does not match targetSpec is if he is a join,
377  // which has two inputs
378  else if (a->getAtomicComputationType() != std::string("JoinSets")) {
379  std::cout << "This is bad... is the target computation name correct??";
380  std::cout << "Didn't find a JoinSets, target was " << targetSpec.getSetName()
381  << "\n";
382  exit(1);
383  } else {
384  // get the join and make sure it matches
385  ApplyJoin* myGuy = (ApplyJoin*)a.get();
386  if (!(myGuy->getRightInput() == targetSpec)) {
387  std::cout << "This is bad... is the target computation name correct??";
388  std::cout << "Find a JoinSets, target was " << targetSpec.getSetName() << "\n";
389  exit(1);
390  } else {
391  targetProjection = myGuy->getRightProjection();
392  targetAttsToOpOn = myGuy->getRightInput();
393  }
394  }
395  }
396  } else {
397  targetProjection = targetSpec;
398  targetAttsToOpOn = targetSpec;
399  }
400  // now we have the list of computations, and so it is time to build the pipeline... start by
401  // building a compute sink
402  std::cout << "to get compute sink for " << targetComputationName << " using targetSpec=" <<
403  targetSpec << ", targetAttsToOpOn=" << targetAttsToOpOn << ", and targetProjection=" <<
404  targetProjection << std::endl;
405  ComputeSinkPtr computeSink =
406  myPlan->getNode(targetComputationName)
407  .getComputation()
408  .getComputeSink(targetSpec, targetAttsToOpOn, targetProjection, *this);
409 
410  // make the pipeline
411  PipelinePtr returnVal = std::make_shared<Pipeline>(
412  getPage, discardTempPage, writeBackPage, computeSource, computeSink);
413 
414  // add the operations to the pipeline
415  AtomicComputationPtr lastOne =
416  myPlan->getComputations().getProducingAtomicComputation(buildTheseTupleSets[0]);
417 
418  for (int i = 1; i < buildTheseTupleSets.size(); i++) {
419 
420 
422  myPlan->getComputations().getProducingAtomicComputation(buildTheseTupleSets[i]);
423 
424  if (a == nullptr) {
425 
426  std::cout << "ERROR: We can't get producing computation and stop building" << std::endl;
427  return nullptr;
428  }
429 
430 
431  // if we have a filter, then just go ahead and create it
432  if (a->getAtomicComputationType() == "Filter") {
433  if (params.count(a->getOutput().getSetName()) == 0) {
434  returnVal->addStage(std::make_shared<FilterExecutor>(
435  lastOne->getOutput(), a->getInput(), a->getProjection()));
436  } else {
437 
438  returnVal->addStage(
439  std::make_shared<FilterExecutor>(lastOne->getOutput(),
440  a->getInput(),
441  a->getProjection(),
442  params[a->getOutput().getSetName()]));
443  }
444  // if we had an apply, go ahead and find it and add it to the pipeline
445  } else if (a->getAtomicComputationType() == "Apply") {
446 
447  // if we have an available parameter, send it
448  if (params.count(a->getOutput().getSetName()) == 0) {
449  returnVal->addStage(
450  myPlan->getNode(a->getComputationName())
451  .getLambda(((ApplyLambda*)a.get())->getLambdaToApply())
452  ->getExecutor(lastOne->getOutput(), a->getInput(), a->getProjection()));
453  } else {
454  returnVal->addStage(myPlan->getNode(a->getComputationName())
455  .getLambda(((ApplyLambda*)a.get())->getLambdaToApply())
456  ->getExecutor(lastOne->getOutput(),
457  a->getInput(),
458  a->getProjection(),
459  params[a->getOutput().getSetName()]));
460  }
461 
462  } else if (a->getAtomicComputationType() == "HashLeft") {
463 
464  // if we have an available parameter, send it
465  if (params.count(a->getOutput().getSetName()) == 0)
466  returnVal->addStage(
467  myPlan->getNode(a->getComputationName())
468  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
469  ->getLeftHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
470  else
471  returnVal->addStage(myPlan->getNode(a->getComputationName())
472  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
473  ->getLeftHasher(lastOne->getOutput(),
474  a->getInput(),
475  a->getProjection(),
476  params[a->getOutput().getSetName()]));
477 
478  } else if (a->getAtomicComputationType() == "HashRight") {
479 
480  // if we have an available parameter, send it
481  if (params.count(a->getOutput().getSetName()) == 0)
482  returnVal->addStage(
483  myPlan->getNode(a->getComputationName())
484  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
485  ->getRightHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
486  else
487  returnVal->addStage(myPlan->getNode(a->getComputationName())
488  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
489  ->getRightHasher(lastOne->getOutput(),
490  a->getInput(),
491  a->getProjection(),
492  params[a->getOutput().getSetName()]));
493 
494  } else if (a->getAtomicComputationType() == "HashOne") {
495  if (params.count(a->getOutput().getSetName()) == 0) {
496  returnVal->addStage(std::make_shared<HashOneExecutor>(
497  lastOne->getOutput(), a->getInput(), a->getProjection()));
498  } else {
499 
500  returnVal->addStage(
501  std::make_shared<HashOneExecutor>(lastOne->getOutput(),
502  a->getInput(),
503  a->getProjection(),
504  params[a->getOutput().getSetName()]));
505  }
506  } else if (a->getAtomicComputationType() == "Flatten") {
507  if (params.count(a->getOutput().getSetName()) == 0) {
508  returnVal->addStage(std::make_shared<FlattenExecutor>(
509  lastOne->getOutput(), a->getInput(), a->getProjection()));
510  } else {
511 
512  returnVal->addStage(
513  std::make_shared<FlattenExecutor>(lastOne->getOutput(),
514  a->getInput(),
515  a->getProjection(),
516  params[a->getOutput().getSetName()]));
517  }
518 
519  } else if (a->getAtomicComputationType() == "JoinSets") {
520 
521  // join is weird, because there are two inputs...
522  AbstractJoinComp& myComp = (AbstractJoinComp&)myPlan->getNode(a->getComputationName()).getComputation();
523  ApplyJoin* myJoin = (ApplyJoin*)(a.get());
524 
525  // check if we are pipelinining the right input
526  if (lastOne->getOutput().getSetName() == myJoin->getRightInput().getSetName()) {
527 
528 
529  // if we are pipelining the right input, then we don't need to switch left and right
530  // inputs
531  if (params.count(a->getOutput().getSetName()) == 0) {
532  returnVal->addStage(myComp.getExecutor(true,
533  myJoin->getProjection(),
534  lastOne->getOutput(),
535  myJoin->getRightInput(),
536  myJoin->getRightProjection()));
537  } else {
538  returnVal->addStage(myComp.getExecutor(true,
539  myJoin->getProjection(),
540  lastOne->getOutput(),
541  myJoin->getRightInput(),
542  myJoin->getRightProjection(),
543  params[a->getOutput().getSetName()]));
544  }
545 
546  } else {
547 
548  // std :: cout << "We are pipelining the left input...\n";
549 
550  // if we are pipelining the right input, then we don't need to switch left and right
551  // inputs
552  if (params.count(a->getOutput().getSetName()) == 0) {
553  returnVal->addStage(myComp.getExecutor(false,
554  myJoin->getRightProjection(),
555  lastOne->getOutput(),
556  myJoin->getInput(),
557  myJoin->getProjection()));
558  } else {
559  returnVal->addStage(myComp.getExecutor(false,
560  myJoin->getRightProjection(),
561  lastOne->getOutput(),
562  myJoin->getInput(),
563  myJoin->getProjection(),
564  params[a->getOutput().getSetName()]));
565  }
566  }
567 
568 
569  } else {
570  std::cout << "This is bad... found an unexpected computation type ("
571  << a->getComputationName() << ") inside of a pipeline.\n";
572  }
573 
574  lastOne = a;
575  }
576  // std :: cout << "Sink: " << targetSpec << " [" << targetProjection << "]\n";
577  return returnVal;
578 }
579 
580 inline PipelinePtr ComputePlan::buildPipeline(std::string sourceTupleSetName,
581  std::string targetTupleSetName,
582  std::string targetComputationName,
583  std::function<std::pair<void*, size_t>()> getPage,
584  std::function<void(void*)> discardTempPage,
585  std::function<void(void*)> writeBackPage,
586  std::map<std::string, ComputeInfoPtr>& params) {
587 
588  // build the plan if it is not already done
589  if (myPlan == nullptr)
590  getPlan();
591 
592  // get all of the computations
593  AtomicComputationList& allComps = myPlan->getComputations();
594 
595  // std :: cout << "print computations:" << std :: endl;
596  // std :: cout << allComps << std :: endl;
597 
598  // now we get the name of the actual computation object that corresponds to the producer of this
599  // tuple set
600  std::string producerName =
601  allComps.getProducingAtomicComputation(sourceTupleSetName)->getComputationName();
602 
603  // std :: cout << "producerName = " << producerName << std :: endl;
604 
605  // and get the schema for the output TupleSet objects that it is supposed to produce
606  TupleSpec& origSpec = allComps.getProducingAtomicComputation(sourceTupleSetName)->getOutput();
607 
608  // now we are going to ask that particular node for the compute source
609  ComputeSourcePtr computeSource =
610  myPlan->getNode(producerName).getComputation().getComputeSource(origSpec, *this);
611 
612  // std :: cout << "\nBUILDING PIPELINE\n";
613  // std :: cout << "Source: " << origSpec << "\n";
614  // now we have to do a DFS. This vector will store all of the computations we've found so far
615  std::vector<AtomicComputationPtr> listSoFar;
616 
617  // and this list stores the computations that we still need to process
618  std::vector<AtomicComputationPtr>& nextOnes =
619  myPlan->getComputations().getConsumingAtomicComputations(origSpec.getSetName());
620 
621  // now, see if each of the next guys can get us to the target tuple set
622  bool gotIt = false;
623  for (auto& a : nextOnes) {
624  listSoFar.push_back(a);
625 
626  // see if the next computation was on the path to the target
627  if (recurse(myPlan, listSoFar, targetTupleSetName)) {
628  gotIt = true;
629  break;
630  }
631 
632  // we couldn't find the target
633  listSoFar.pop_back();
634  }
635 
636  // see if we could not find a path
637  if (!gotIt) {
638  std::cerr
639  << "This is bad. Could not find a path from source computation to sink computation.\n";
640  exit(1);
641  }
642 
643  // and get the schema for the output TupleSet objects that it is supposed to produce
644  TupleSpec& targetSpec = allComps.getProducingAtomicComputation(targetTupleSetName)->getOutput();
645  // std :: cout << "The target is " << targetSpec << "\n";
646 
647 
648  // and get the projection for this guy
649  std::vector<AtomicComputationPtr>& consumers =
650  allComps.getConsumingAtomicComputations(targetSpec.getSetName());
651  // JiaNote: change the reference into a new variable based on Chris' Join code
652  // TupleSpec &targetProjection = targetSpec;
653  TupleSpec targetProjection;
654  TupleSpec targetAttsToOpOn;
655  for (auto& a : consumers) {
656  if (a->getComputationName() == targetComputationName) {
657 
658  // std :: cout << "targetComputationName was " << targetComputationName << "\n";
659 
660  // we found the consuming computation
661  if (targetSpec == a->getInput()) {
662  targetProjection = a->getProjection();
663 
664  // added following to merge join code
665  if (targetComputationName.find("JoinComp") == std::string::npos) {
666  targetSpec = targetProjection;
667  }
668 
669  targetAttsToOpOn = a->getInput();
670  break;
671  }
672 
673  // the only way that the input to this guy does not match targetSpec is if he is a join,
674  // which has two inputs
675  if (a->getAtomicComputationType() != std::string("JoinSets")) {
676  std::cout << "This is bad... is the target computation name correct??";
677  std::cout << "Didn't find a JoinSets, target was " << targetSpec.getSetName()
678  << "\n";
679  exit(1);
680  }
681 
682  // get the join and make sure it matches
683  ApplyJoin* myGuy = (ApplyJoin*)a.get();
684  if (!(myGuy->getRightInput() == targetSpec)) {
685  std::cout << "This is bad... is the target computation name correct??";
686  std::cout << "Find a JoinSets, target was " << targetSpec.getSetName() << "\n";
687  exit(1);
688  }
689 
690  // std :: cout << "Building sink for: " << targetSpec << " " <<
691  // myGuy->getRightProjection () << " " << myGuy->getRightInput () << "\n";
692  targetProjection = myGuy->getRightProjection();
693  targetAttsToOpOn = myGuy->getRightInput();
694  // std :: cout << "Building sink for: " << targetSpec << " " << targetAttsToOpOn << " "
695  // << targetProjection << "\n";
696  }
697  }
698 
699  // now we have the list of computations, and so it is time to build the pipeline... start by
700  // building a compute sink
701  ComputeSinkPtr computeSink =
702  myPlan->getNode(targetComputationName)
703  .getComputation()
704  .getComputeSink(targetSpec, targetAttsToOpOn, targetProjection, *this);
705 
706  // make the pipeline
707  PipelinePtr returnVal = std::make_shared<Pipeline>(
708  getPage, discardTempPage, writeBackPage, computeSource, computeSink);
709 
710  // add the operations to the pipeline
711  AtomicComputationPtr lastOne =
712  myPlan->getComputations().getProducingAtomicComputation(sourceTupleSetName);
713  for (auto& a : listSoFar) {
714 
715  // if we have a filter, then just go ahead and create it
716  if (a->getAtomicComputationType() == "Filter") {
717  // std :: cout << "Adding: " << a->getProjection () << " + filter [" << a->getInput ()
718  // << "] => " << a->getOutput () << "\n";
719  if (params.count(a->getOutput().getSetName()) == 0) {
720  returnVal->addStage(std::make_shared<FilterExecutor>(
721  lastOne->getOutput(), a->getInput(), a->getProjection()));
722  } else {
723 
724  returnVal->addStage(
725  std::make_shared<FilterExecutor>(lastOne->getOutput(),
726  a->getInput(),
727  a->getProjection(),
728  params[a->getOutput().getSetName()]));
729  }
730  // if we had an apply, go ahead and find it and add it to the pipeline
731  } else if (a->getAtomicComputationType() == "Apply") {
732  // std :: cout << "Adding: " << a->getProjection () << " + apply [" << a->getInput () <<
733  // "] => " << a->getOutput () << "\n";
734 
735  // if we have an available parameter, send it
736  if (params.count(a->getOutput().getSetName()) == 0) {
737  returnVal->addStage(
738  myPlan->getNode(a->getComputationName())
739  .getLambda(((ApplyLambda*)a.get())->getLambdaToApply())
740  ->getExecutor(lastOne->getOutput(), a->getInput(), a->getProjection()));
741  } else {
742  returnVal->addStage(myPlan->getNode(a->getComputationName())
743  .getLambda(((ApplyLambda*)a.get())->getLambdaToApply())
744  ->getExecutor(lastOne->getOutput(),
745  a->getInput(),
746  a->getProjection(),
747  params[a->getOutput().getSetName()]));
748  }
749 
750  } else if (a->getAtomicComputationType() == "HashLeft") {
751  // std :: cout << "Adding: " << a->getProjection () << " + hashleft [" << a->getInput ()
752  // << "] => " << a->getOutput () << "\n";
753 
754  // if we have an available parameter, send it
755  if (params.count(a->getOutput().getSetName()) == 0)
756  returnVal->addStage(
757  myPlan->getNode(a->getComputationName())
758  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
759  ->getLeftHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
760  else
761  returnVal->addStage(myPlan->getNode(a->getComputationName())
762  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
763  ->getLeftHasher(lastOne->getOutput(),
764  a->getInput(),
765  a->getProjection(),
766  params[a->getOutput().getSetName()]));
767 
768  } else if (a->getAtomicComputationType() == "HashRight") {
769  // std :: cout << "Adding: " << a->getProjection () << " + hashright [" << a->getInput
770  // () << "] => " << a->getOutput () << "\n";
771 
772  // if we have an available parameter, send it
773  if (params.count(a->getOutput().getSetName()) == 0)
774  returnVal->addStage(
775  myPlan->getNode(a->getComputationName())
776  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
777  ->getRightHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
778  else
779  returnVal->addStage(myPlan->getNode(a->getComputationName())
780  .getLambda(((HashLeft*)a.get())->getLambdaToApply())
781  ->getRightHasher(lastOne->getOutput(),
782  a->getInput(),
783  a->getProjection(),
784  params[a->getOutput().getSetName()]));
785 
786  } else if (a->getAtomicComputationType() == "HashOne") {
787  // std :: cout << "Adding: " << a->getProjection () << " + hashone [" << a->getInput ()
788  // << "] => " << a->getOutput () << "\n";
789  if (params.count(a->getOutput().getSetName()) == 0) {
790  returnVal->addStage(std::make_shared<HashOneExecutor>(
791  lastOne->getOutput(), a->getInput(), a->getProjection()));
792  } else {
793 
794  returnVal->addStage(
795  std::make_shared<HashOneExecutor>(lastOne->getOutput(),
796  a->getInput(),
797  a->getProjection(),
798  params[a->getOutput().getSetName()]));
799  }
800 
801  } else if (a->getAtomicComputationType() == "Flatten") {
802  // std :: cout << "Adding: " << a->getProjection () << " + flatten [" << a->getInput ()
803  // << "] => " << a->getOutput () << "\n";
804  if (params.count(a->getOutput().getSetName()) == 0) {
805  returnVal->addStage(std::make_shared<FlattenExecutor>(
806  lastOne->getOutput(), a->getInput(), a->getProjection()));
807  } else {
808 
809  returnVal->addStage(
810  std::make_shared<FlattenExecutor>(lastOne->getOutput(),
811  a->getInput(),
812  a->getProjection(),
813  params[a->getOutput().getSetName()]));
814  }
815  } else if (a->getAtomicComputationType() == "JoinSets") {
816  // std :: cout << "Adding: " << a->getProjection () << " + join [" << a->getInput () <<
817  // "] => " << a->getOutput () << "\n";
818 
819  // join is weird, because there are two inputs...
820  AbstractJoinComp& myComp = (AbstractJoinComp&)myPlan->getNode(a->getComputationName()).getComputation();
821  ApplyJoin* myJoin = (ApplyJoin*)(a.get());
822 
823  // check if we are pipelinining the right input
824  if (lastOne->getOutput().getSetName() == myJoin->getRightInput().getSetName()) {
825 
826  // std :: cout << "We are pipelining the right input...\n";
827 
828  // if we are pipelining the right input, then we don't need to switch left and right
829  // inputs
830  if (params.count(a->getOutput().getSetName()) == 0) {
831  returnVal->addStage(myComp.getExecutor(true,
832  myJoin->getProjection(),
833  lastOne->getOutput(),
834  myJoin->getRightInput(),
835  myJoin->getRightProjection()));
836  } else {
837  returnVal->addStage(myComp.getExecutor(true,
838  myJoin->getProjection(),
839  lastOne->getOutput(),
840  myJoin->getRightInput(),
841  myJoin->getRightProjection(),
842  params[a->getOutput().getSetName()]));
843  }
844 
845  } else {
846 
847  // std :: cout << "We are pipelining the left input...\n";
848 
849  // if we are pipelining the right input, then we don't need to switch left and right
850  // inputs
851  if (params.count(a->getOutput().getSetName()) == 0) {
852  returnVal->addStage(myComp.getExecutor(false,
853  myJoin->getRightProjection(),
854  lastOne->getOutput(),
855  myJoin->getInput(),
856  myJoin->getProjection()));
857  } else {
858  returnVal->addStage(myComp.getExecutor(false,
859  myJoin->getRightProjection(),
860  lastOne->getOutput(),
861  myJoin->getInput(),
862  myJoin->getProjection(),
863  params[a->getOutput().getSetName()]));
864  }
865  }
866 
867 
868  } else {
869  std::cout << "This is bad... found an unexpected computation type ("
870  << a->getComputationName() << ") inside of a pipeline.\n";
871  }
872 
873  lastOne = a;
874  }
875 
876  // std :: cout << "Sink: " << targetSpec << " [" << targetProjection << "]\n";
877  return returnVal;
878 }
879 
882  : TCAPComputation(TCAPComputation), allComputations(allComputations) {}
883 }
884 
885 #endif
int yyparse(yyscan_t, struct AtomicComputationList **)
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
LogicalPlanPtr getPlan()
Definition: ComputePlan.cc:39
PipelinePtr buildPipeline(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName, std::function< std::pair< void *, size_t >()> getPage, std::function< void(void *)> discardTempPage, std::function< void(void *)> writeBackPage, std::map< std::string, ComputeInfoPtr > &params)
Definition: ComputePlan.cc:580
void * yyscan_t
Definition: Lexer.h:31
TupleSpec & getRightInput()
std::string & getComputationName()
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Definition: SinkShuffler.h:31
struct yy_buffer_state * YY_BUFFER_STATE
Definition: Lexer.h:35
SinkShufflerPtr getShuffler(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName)
Definition: ComputePlan.cc:198
TupleSpec & getInput()
int yydebug
std::shared_ptr< SinkMerger > SinkMergerPtr
Definition: SinkMerger.h:30
LogicalPlanPtr myPlan
Definition: ComputePlan.h:50
std::shared_ptr< Pipeline > PipelinePtr
Definition: Pipeline.h:314
int yylex_init_extra(struct LexerExtra *, yyscan_t *)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
std::string & getSetName()
Definition: TupleSpec.h:56
Vector< Handle< Computation > > allComputations
Definition: ComputePlan.h:46
ENABLE_DEEP_COPY ComputePlan()
Definition: ComputePlan.cc:37
AtomicComputationPtr getProducingAtomicComputation(std::string outputName)
std::vector< AtomicComputationPtr > & getConsumingAtomicComputations(std::string inputName)
String TCAPComputation
Definition: ComputePlan.h:43
std::string getProducingComputationName(std::string sourceTupleSetName)
Definition: ComputePlan.cc:115
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
TupleSpec & getProjection()
int yylex_destroy(yyscan_t)
virtual ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput, ComputeInfoPtr arg)=0
bool recurse(LogicalPlanPtr myPlan, std::vector< AtomicComputationPtr > &listSoFar, std::string &targetTupleSetName)
Definition: ComputePlan.cc:80
void nullifyPlanPointer()
Definition: ComputePlan.cc:75
TupleSpec & getRightProjection()
void yy_delete_buffer(YY_BUFFER_STATE, yyscan_t)
YY_BUFFER_STATE yy_scan_string(const char *, yyscan_t)
SinkMergerPtr getMerger(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName)
Definition: ComputePlan.cc:131