A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
GenericLambdaObject.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef LAMBDA_HELPER_H
20 #define LAMBDA_HELPER_H
21 
22 #include <memory>
23 #include <vector>
24 #include <functional>
25 #include <mustache.h>
26 #include <mustache_helper.h>
27 #include "Object.h"
28 #include "Handle.h"
29 #include "Ptr.h"
30 #include "TupleSpec.h"
31 #include "ComputeExecutor.h"
32 #include "SimpleComputeExecutor.h"
33 #include "ComputeInfo.h"
34 #include "MultiInputsBase.h"
35 #include "TupleSetMachine.h"
36 #include "LambdaTree.h"
37 
42 namespace pdb {
43 
49 typedef std::shared_ptr<GenericLambdaObject> GenericLambdaObjectPtr;
50 
55 
56  private:
57 
// Per-input index table: written by setInputIndex() and read by getInputIndex().
61  std::vector<unsigned int> inputIndexes;
62 
63  public:
64 
// Virtual destructor with the compiler-generated body.
65  virtual ~GenericLambdaObject() = default;
66 
72  void setInputIndex(int i, unsigned int index) {
73  size_t numInputs = this->getNumInputs();
74  if (numInputs == 0) {
75  numInputs = 1;
76  }
77  if (inputIndexes.size() != numInputs) {
78  inputIndexes.resize(numInputs);
79  }
80  if (i < numInputs) {
81  this->inputIndexes[i] = index;
82  }
83  }
84 
90  virtual unsigned int getInputIndex(int i) {
91  if (i >= this->getNumInputs()) {
92  return (unsigned int) (-1);
93  }
94  return inputIndexes[i];
95  }
96 
// Number of inputs this lambda takes; setInputIndex() and getInputIndex() use it as the
// upper bound for input positions.
101  virtual unsigned int getNumInputs() = 0;
102 
// Pure virtual: returns the ComputeExecutorPtr for this lambda.
// NOTE(review): the roles of the three TupleSpec arguments are not visible here; going by their
// names they are the input schema, the attributes to operate on and the attributes to carry
// into the output - confirm against the implementers.
110  virtual ComputeExecutorPtr getExecutor(TupleSpec &inputSchema,
111  TupleSpec &attsToOperateOn,
112  TupleSpec &attsToIncludeInOutput) = 0;
113 
124  TupleSpec &attsToOperateOn,
125  TupleSpec &attsToIncludeInOutput,
126  ComputeInfoPtr) {
127  return getExecutor(inputSchema, attsToOperateOn, attsToIncludeInOutput);
128  }
129 
138  TupleSpec &attsToOperateOn,
139  TupleSpec &attsToIncludeInOutput) {
140  std::cout << "getLeftHasher not implemented for this type!!\n";
141  exit(1);
142  }
143 
152  TupleSpec &attsToOperateOn,
153  TupleSpec &attsToIncludeInOutput,
154  ComputeInfoPtr) {
155  return getLeftHasher(inputSchema, attsToOperateOn, attsToIncludeInOutput);
156  }
157 
166  TupleSpec &attsToOperateOn,
167  TupleSpec &attsToIncludeInOutput) {
168  std::cout << "getRightHasher not implemented for this type!!\n";
169  exit(1);
170  }
171 
180  TupleSpec &attsToOperateOn,
181  TupleSpec &attsToIncludeInOutput,
182  ComputeInfoPtr) {
183  return getRightHasher(inputSchema, attsToOperateOn, attsToIncludeInOutput);
184  }
185 
// Name of this lambda's kind. toTCAPString() searches it for "==", "&&" and "native_lambda"
// and uses it as the prefix of the generated lambda name.
190  virtual std::string getTypeOfLambda() = 0;
191 
// Number of child lambdas.
// NOTE(review): presumably the exclusive upper bound for getChild() - confirm in the implementers.
215  virtual int getNumChildren() = 0;
216 
// Returns one child lambda as a shared pointer.
// NOTE(review): presumably `which` must lie in [0, getNumChildren()) - confirm in the implementers.
222  virtual GenericLambdaObjectPtr getChild(int which) = 0;
223 
240  std::string getTCAPString(const std::string &inputTupleSetName,
241  const std::vector<std::string> &inputColumnNames,
242  const std::vector<std::string> &inputColumnsToApply,
243  const std::string &outputTupleSetName,
244  const std::vector<std::string> &outputColumns,
245  const std::string &outputColumnName,
246  const std::string &tcapOperation,
247  const std::string &computationNameAndLabel,
248  const std::string &lambdaNameAndLabel,
249  const std::map<std::string, std::string> &info) {
250 
251  mustache::mustache outputTupleSetNameTemplate{"{{outputTupleSetName}}({{#outputColumns}}{{value}}{{^isLast}},{{/isLast}}{{/outputColumns}}) <= "
252  "{{tcapOperation}} ({{inputTupleSetName}}({{#inputColumnsToApply}}{{value}}{{^isLast}},{{/isLast}}{{/inputColumnsToApply}}), "
253  "{{inputTupleSetName}}({{#hasColumnNames}}{{#inputColumnNames}}{{value}}{{^isLast}},{{/isLast}}{{/inputColumnNames}}{{/hasColumnNames}}), "
254  "'{{computationNameAndLabel}}', "
255  "{{#hasLambdaNameAndLabel}}'{{lambdaNameAndLabel}}', {{/hasLambdaNameAndLabel}}"
256  "[{{#info}}('{{key}}', '{{value}}'){{^isLast}}, {{/isLast}}{{/info}}])\n"};
257 
258  // create the data for the output columns
259  mustache::data outputColumnData = mustache::from_vector<std::string>(outputColumns);
260 
261  // create the data for the input columns to apply
262  mustache::data inputColumnsToApplyData = mustache::from_vector<std::string>(inputColumnsToApply);
263 
264  // create the data for the input columns to apply
265  mustache::data inputColumnNamesData = mustache::from_vector<std::string>(inputColumnNames);
266 
267  // create the info data
268  mustache::data infoData = mustache::from_map(info);
269 
270  // create the data for the lambda
271  mustache::data lambdaData;
272 
273  lambdaData.set("outputTupleSetName", outputTupleSetName);
274  lambdaData.set("outputColumns", outputColumnData);
275  lambdaData.set("tcapOperation", tcapOperation);
276  lambdaData.set("inputTupleSetName", inputTupleSetName);
277  lambdaData.set("inputColumnsToApply", inputColumnsToApplyData);
278  lambdaData.set("hasColumnNames", !inputColumnNames.empty());
279  lambdaData.set("inputColumnNames", inputColumnNamesData);
280  lambdaData.set("inputTupleSetName", inputTupleSetName);
281  lambdaData.set("computationNameAndLabel", computationNameAndLabel);
282  lambdaData.set("hasLambdaNameAndLabel", !lambdaNameAndLabel.empty());
283  lambdaData.set("lambdaNameAndLabel", lambdaNameAndLabel);
284  lambdaData.set("info", infoData);
285 
286  return outputTupleSetNameTemplate.render(lambdaData);
287  }
288 
301  virtual std::string toTCAPStringForCartesianJoin(int lambdaLabel,
302  std::string computationName,
303  int computationLabel,
304  std::string &outputTupleSetName,
305  std::vector<std::string> &outputColumns,
306  std::string &outputColumnName,
307  std::string &myLambdaName,
308  MultiInputsBase *multiInputsComp) {
309  std::cout << "toTCAPStringForCartesianJoin() should not be implemented here!" << std::endl;
310  exit(1);
311  }
312 
339  virtual std::string toTCAPString(std::vector<std::string> &inputTupleSetNames,
340  std::vector<std::string> &inputColumnNames,
341  std::vector<std::string> &inputColumnsToApply,
342  std::vector<std::string> &childrenLambdaNames,
343  int lambdaLabel,
344  std::string computationName,
345  int computationLabel,
346  std::string &outputTupleSetName,
347  std::vector<std::string> &outputColumns,
348  std::string &outputColumnName,
349  std::string &myLambdaName,
350  MultiInputsBase *multiInputsComp = nullptr,
351  bool amIPartOfJoinPredicate = false,
352  bool amILeftChildOfEqualLambda = false,
353  bool amIRightChildOfEqualLambda = false,
354  std::string parentLambdaName = "",
355  bool isSelfJoin = false) {
356  std::string tcapString;
357  std::string lambdaType = getTypeOfLambda();
358  if ((lambdaType.find("==") != std::string::npos) ||
359  (lambdaType.find("&&") != std::string::npos)) {
360  return "";
361  }
362 
363  if ((lambdaType.find("native_lambda") != std::string::npos) && (multiInputsComp != nullptr)
364  && amIPartOfJoinPredicate &&
365  !amIRightChildOfEqualLambda
366  && ((parentLambdaName.empty()) || (parentLambdaName.find("&&") != std::string::npos))) {
367 
368  return toTCAPStringForCartesianJoin(lambdaLabel,
369  computationName,
370  computationLabel,
371  outputTupleSetName,
372  outputColumns,
373  outputColumnName,
374  myLambdaName,
375  multiInputsComp);
376  }
377 
378  std::string computationNameWithLabel = computationName + "_" + std::to_string(computationLabel);
379  myLambdaName = getTypeOfLambda() + "_" + std::to_string(lambdaLabel);
380  std::string inputTupleSetName = inputTupleSetNames[0];
381  std::string tupleSetMidTag = "OutFor";
382 
383  std::vector<std::string> originalInputColumnsToApply;
384 
385  int myIndex;
386  if (multiInputsComp != nullptr) {
387  if (amILeftChildOfEqualLambda || amIRightChildOfEqualLambda) {
388  tupleSetMidTag = "Extracted";
389  }
390  myIndex = this->getInputIndex(0);
391  PDB_COUT << myLambdaName + ": myIndex=" << myIndex << std::endl;
392  inputTupleSetName = multiInputsComp->getTupleSetNameForIthInput(myIndex);
393  PDB_COUT << "inputTupleSetName=" << inputTupleSetName << std::endl;
394  inputColumnNames = multiInputsComp->getInputColumnsForIthInput(myIndex);
395 
396  inputColumnsToApply.clear();
397 
398  if (this->getNumInputs() == 1) {
399  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(myIndex));
400  originalInputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(myIndex));
401  } else {
402  for (int i = 0; i < this->getNumInputs(); i++) {
403  int index = this->getInputIndex(i);
404  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(index));
405  originalInputColumnsToApply.push_back(
406  multiInputsComp->getNameForIthInput(myIndex));
407  }
408  }
409  multiInputsComp->setLambdasForIthInputAndPredicate(
410  myIndex, parentLambdaName, myLambdaName);
411  }
412 
413  PDB_COUT << "input columns to apply: " << std::endl;
414  for (const auto &i : originalInputColumnsToApply) {
415  PDB_COUT << i << std::endl;
416  }
417 
418  outputTupleSetName = lambdaType.substr(0, 5) + "_" + std::to_string(lambdaLabel) + tupleSetMidTag + computationName
419  + std::to_string(computationLabel);
420  outputColumnName =
421  lambdaType.substr(0, 5) + "_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel)
422  + tupleSetMidTag;
423 
424  outputColumns.clear();
425  for (const auto &inputColumnName : inputColumnNames) {
426  outputColumns.push_back(inputColumnName);
427  }
428  outputColumns.push_back(outputColumnName);
429 
430  // the additional info about this attribute access lambda
431  std::map<std::string, std::string> info;
432 
433  // fill in the info
434  info["lambdaType"] = getTypeOfLambda();
435 
436  tcapString += getTCAPString(inputTupleSetName,
437  inputColumnNames,
438  inputColumnsToApply,
439  outputTupleSetName,
440  outputColumns,
441  outputColumnName,
442  "APPLY",
443  computationNameWithLabel,
444  myLambdaName,
445  info);
446 
447  if (multiInputsComp != nullptr) {
448  if (amILeftChildOfEqualLambda || amIRightChildOfEqualLambda) {
449  inputTupleSetName = outputTupleSetName;
450  inputColumnNames.clear();
451  for (const auto &outputColumn : outputColumns) {
452  // we want to remove the extracted value column from here
453  if (outputColumn != outputColumnName) {
454  inputColumnNames.push_back(outputColumn);
455  }
456  }
457  inputColumnsToApply.clear();
458  inputColumnsToApply.push_back(outputColumnName);
459 
460  std::string hashOperator;
461  if (amILeftChildOfEqualLambda) {
462  hashOperator = "HASHLEFT";
463  } else {
464  hashOperator = "HASHRIGHT";
465  }
466  outputTupleSetName = outputTupleSetName + "_hashed";
467  outputColumnName = outputColumnName + "_hash";
468  outputColumns.clear();
469 
470  for (const auto &inputColumnName : inputColumnNames) {
471  outputColumns.push_back(inputColumnName);
472  }
473  outputColumns.push_back(outputColumnName);
474 
475  tcapString += getTCAPString(inputTupleSetName,
476  inputColumnNames,
477  inputColumnsToApply,
478  outputTupleSetName,
479  outputColumns,
480  outputColumnName,
481  hashOperator,
482  computationNameWithLabel,
483  parentLambdaName,
484  std::map<std::string, std::string>());
485  }
486  if (!isSelfJoin) {
487  for (unsigned int index = 0; index < multiInputsComp->getNumInputs(); index++) {
488  std::string curInput = multiInputsComp->getNameForIthInput(index);
489  PDB_COUT << "curInput is " << curInput << std::endl;
490  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
491  if (iter != outputColumns.end()) {
492  PDB_COUT << "MultiInputsBase with index=" << index << " is updated."
493  << std::endl;
494  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
495  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
496  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
497  }
498  PDB_COUT << std::endl;
499  auto iter1 = std::find(originalInputColumnsToApply.begin(),
500  originalInputColumnsToApply.end(),
501  curInput);
502  if (iter1 != originalInputColumnsToApply.end()) {
503  PDB_COUT << "MultiInputsBase with index=" << index << " is updated."
504  << std::endl;
505  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
506  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
507  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
508  }
509  }
510  } else {
511  // only update myIndex
512  multiInputsComp->setTupleSetNameForIthInput(myIndex, outputTupleSetName);
513  multiInputsComp->setInputColumnsForIthInput(myIndex, outputColumns);
514  multiInputsComp->setInputColumnsToApplyForIthInput(myIndex, outputColumnName);
515  }
516  }
517  return tcapString;
518  }
519 
// Returns string key/value pairs describing this lambda.
// NOTE(review): nothing in this class consumes the result, so the expected keys are not
// visible here - confirm against the implementers and callers.
524  virtual std::map<std::string, std::string> getInfo() = 0;
525 
// Name of the type this lambda produces; TypedLambdaObject<Out> implements it with getTypeName<Out>().
530  virtual std::string getOutputType() = 0;
531 };
532 
537 template<typename Out>
539 
540  public:
541 
// Virtual destructor with the compiler-generated body.
542  virtual ~TypedLambdaObject() = default;
543 
544  std::string getOutputType() override {
545  return getTypeName<Out>();
546  }
547 };
548 }
549 
550 #endif
virtual ComputeExecutorPtr getLeftHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput)
std::string getOutputType() override
virtual ComputeExecutorPtr getRightHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, ComputeInfoPtr)
virtual ComputeExecutorPtr getLeftHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, ComputeInfoPtr)
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
virtual GenericLambdaObjectPtr getChild(int which)=0
std::shared_ptr< ComputeInfo > ComputeInfoPtr
Definition: ComputeInfo.h:33
virtual ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput)=0
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
virtual std::string getTypeOfLambda()=0
std::vector< unsigned int > inputIndexes
virtual unsigned int getNumInputs()=0
virtual ~TypedLambdaObject()=default
#define PDB_COUT
Definition: PDBDebug.h:31
virtual unsigned int getInputIndex(int i)
virtual std::string getOutputType()=0
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
virtual std::string toTCAPStringForCartesianJoin(int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp)
virtual ~GenericLambdaObject()=default
virtual ComputeExecutorPtr getRightHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput)
virtual int getNumChildren()=0
void setInputIndex(int i, unsigned int index)
virtual std::map< std::string, std::string > getInfo()=0
virtual ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, ComputeInfoPtr)
virtual std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false)