A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Lambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef LAMBDA_H
20 #define LAMBDA_H
21 
22 #include <memory>
23 #include <vector>
24 #include <functional>
25 #include "Object.h"
26 #include "Handle.h"
27 #include "Ptr.h"
28 #include "TupleSpec.h"
29 #include "ComputeExecutor.h"
30 #include "GenericLambdaObject.h"
31 #include "DereferenceLambda.h"
32 #include "MultiInputsBase.h"
33 
34 namespace pdb {
35 
40 template<class ReturnType>
41 class Lambda {
42  public:
43 
49  Lambda(LambdaTree<Ptr<ReturnType>> treeWithPointer) {
50 
51  // a problem is that consumers of this lambda will not be able to deal with a
52  // Ptr<ReturnType>...
53  // so we need to add an additional operation that dereferences the pointer
54  std::shared_ptr<DereferenceLambda<ReturnType>> newRoot = std::make_shared<DereferenceLambda<ReturnType>>(treeWithPointer);
55  tree = newRoot;
56  }
57 
62  Lambda(LambdaTree<ReturnType> tree) : tree(tree.getPtr()) {}
63 
64  unsigned int getInputIndex() {
65  return tree->getInputIndex();
66  }
67 
73  void toMap(std::map<std::string, GenericLambdaObjectPtr> &returnVal, int &suffix) {
74  traverse(returnVal, tree, suffix);
75  }
76 
77  std::vector<std::string> getAllInputs(MultiInputsBase *multiInputsBase) {
78  std::vector<std::string> ret;
79  this->getInputs(ret, tree, multiInputsBase);
80  return ret;
81  }
82 
101  std::string toTCAPString(std::string inputTupleSetName,
102  std::vector<std::string> &inputColumnNames,
103  std::vector<std::string> &inputColumnsToApply,
104  std::vector<std::string> &childrenLambdaNames,
105  int &lambdaLabel,
106  std::string computationName,
107  int computationLabel,
108  std::string &outputTupleSetName,
109  std::vector<std::string> &outputColumnNames,
110  std::string &addedOutputColumnName,
111  std::string &myLambdaName,
112  bool whetherToRemoveUnusedOutputColumns,
113  MultiInputsBase *multiInputsComp = nullptr,
114  bool amIPartOfJoinPredicate = false) {
115  std::vector<std::string> tcapStrings;
116  std::string outputTCAPString;
117  std::vector<std::string> inputTupleSetNames;
118  inputTupleSetNames.push_back(inputTupleSetName);
119  std::vector<std::string> columnNames;
120 
121  for (const auto &inputColumnName : inputColumnNames) {
122  columnNames.push_back(inputColumnName);
123  }
124 
125  std::vector<std::string> columnsToApply;
126  for (const auto &i : inputColumnsToApply) {
127  columnsToApply.push_back(i);
128  }
129 
130  std::vector<std::string> childrenLambdas;
131  for (const auto &childrenLambdaName : childrenLambdaNames) {
132  childrenLambdas.push_back(childrenLambdaName);
133  }
134 
135  getTCAPString(tcapStrings,
136  inputTupleSetNames,
137  columnNames,
138  columnsToApply,
139  childrenLambdas,
140  tree,
141  lambdaLabel,
142  computationName,
143  computationLabel,
144  addedOutputColumnName,
145  myLambdaName,
146  outputTupleSetName,
147  multiInputsComp,
148  amIPartOfJoinPredicate);
149  PDB_COUT << "Lambda: lambdaLabel=" << lambdaLabel << std::endl;
150  bool isOutputInInput = false;
151  outputColumnNames.clear();
152 
153  if (!whetherToRemoveUnusedOutputColumns) {
154 
155  for (const auto &columnName : columnNames) {
156  outputColumnNames.push_back(columnName);
157  if (addedOutputColumnName == columnName) {
158  isOutputInInput = true;
159  }
160  }
161 
162  if (!isOutputInInput) {
163  outputColumnNames.push_back(addedOutputColumnName);
164  }
165 
166  } else {
167  outputColumnNames.push_back(addedOutputColumnName);
168  }
169 
170  // TODO this is very dirty and should not be done like that! For now I'm going to patch it!
171  if (whetherToRemoveUnusedOutputColumns) {
172 
173  // get the last tcap string
174  unsigned long last = tcapStrings.size() - 1;
175 
176  PDB_COUT << "tcapStrings[" << last << "]=" << tcapStrings[last] << std::endl;
177  std::string right = tcapStrings[last].substr(tcapStrings[last].find("<="));
178 
179  // by default the end is an empty string
180  std::string end;
181 
182  // check if we have an info dictionary if we have chop off the end and store it in the end variable
183  if (right.find('[') != std::string::npos) {
184  end = right.substr(right.find('['));
185  right = right.substr(0, right.find('['));
186  }
187 
188  // find the positions of the last brackets ()
189  unsigned long pos1 = right.find_last_of('(');
190  unsigned long pos2 = right.rfind("),");
191 
192  // empty out anything between the brackets
193  right.replace(pos1 + 1, pos2 - 1 - (pos1 + 1) + 1, "");
194 
195  // combine the string and replace it
196  tcapStrings[last] = outputTupleSetName + " (" + addedOutputColumnName + ") " + right + end;
197  }
198 
199  // combine all the tcap strings
200  for (const auto &tcapString : tcapStrings) {
201  outputTCAPString.append(tcapString);
202  }
203 
204  return outputTCAPString;
205  }
206 
207  private:
208 
/** Root of the lambda tree held by this object; both constructors set it and the member functions read the tree through it. */
212  std::shared_ptr<TypedLambdaObject<ReturnType>> tree;
213 
222  static void traverse(std::map<std::string, GenericLambdaObjectPtr> &fillMe,
224  int &startLabel) {
225 
226  for (int i = 0; i < root->getNumChildren(); i++) {
227  GenericLambdaObjectPtr child = root->getChild(i);
228  traverse(fillMe, child, startLabel);
229  }
230 
231  std::string myName = root->getTypeOfLambda();
232  myName = myName + "_" + std::to_string(startLabel);
233  startLabel++;
234  fillMe[myName] = root;
235  }
236 
243  void getInputs(std::vector<std::string> &allInputs, GenericLambdaObjectPtr root, MultiInputsBase *multiInputsBase) {
244 
245  for (int i = 0; i < root->getNumChildren(); i++) {
246 
247  GenericLambdaObjectPtr child = root->getChild(i);
248  getInputs(allInputs, child, multiInputsBase);
249  }
250 
251  if (root->getNumChildren() == 0) {
252  for (int i = 0; i < root->getNumInputs(); i++) {
253  std::string myName = multiInputsBase->getNameForIthInput(root->getInputIndex(i));
254  auto iter = std::find(allInputs.begin(), allInputs.end(), myName);
255 
256  if (iter == allInputs.end()) {
257  allInputs.push_back(myName);
258  }
259  }
260  }
261  }
262 
284  static void getTCAPString(std::vector<std::string> &tcapStrings,
285  std::vector<std::string> &inputTupleSetNames,
286  std::vector<std::string> &inputColumnNames,
287  std::vector<std::string> &inputColumnsToApply,
288  std::vector<std::string> &childrenLambdaNames,
290  int &lambdaLabel,
291  std::string computationName,
292  int computationLabel,
293  std::string &addedOutputColumnName,
294  std::string &myLambdaName,
295  std::string &outputTupleSetName,
296  MultiInputsBase *multiInputsComp = nullptr,
297  bool amIPartOfJoinPredicate = false,
298  bool amILeftChildOfEqualLambda = false,
299  bool amIRightChildOfEqualLambda = false,
300  std::string parentLambdaName = "",
301  bool isSelfJoin = false) {
302 
303  std::vector<std::string> columnsToApply;
304  std::vector<std::string> childrenLambdas;
305  std::vector<std::string> inputNames;
306  std::vector<std::string> inputColumns;
307 
308  if (root->getNumChildren() > 0) {
309 
310  for (const auto &i : inputColumnsToApply) {
311  columnsToApply.push_back(i);
312  }
313 
314  inputColumnsToApply.clear();
315 
316  for (const auto &childrenLambdaName : childrenLambdaNames) {
317  childrenLambdas.push_back(childrenLambdaName);
318  }
319 
320  childrenLambdaNames.clear();
321 
322  for (const auto &inputTupleSetName : inputTupleSetNames) {
323  auto iter = std::find(inputNames.begin(), inputNames.end(), inputTupleSetName);
324  if (iter == inputNames.end()) {
325  inputNames.push_back(inputTupleSetName);
326  }
327  }
328 
329  inputTupleSetNames.clear();
330 
331  for (const auto &inputColumnName : inputColumnNames) {
332  inputColumns.push_back(inputColumnName);
333  }
334 
335  inputColumnNames.clear();
336  }
337 
338  std::string myTypeName = root->getTypeOfLambda();
339  PDB_COUT << "\tExtracted lambda named: " << myTypeName << "\n";
340  std::string myName = myTypeName + "_" + std::to_string(lambdaLabel + root->getNumChildren());
341 
342  bool isLeftChildOfEqualLambda = false;
343  bool isRightChildOfEqualLambda = false;
344  bool isChildSelfJoin = false;
345 
346  GenericLambdaObjectPtr nextChild = nullptr;
347  for (int i = 0; i < root->getNumChildren(); i++) {
348  GenericLambdaObjectPtr child = root->getChild(i);
349 
350  if ((i + 1) < root->getNumChildren()) {
351  nextChild = root->getChild(i + 1);
352  }
353 
354  if (myTypeName == "==") {
355 
356  if (i == 0) {
357  isLeftChildOfEqualLambda = true;
358  }
359 
360  if (i == 1) {
361  isRightChildOfEqualLambda = true;
362  }
363 
364  }
365 
366  if ((isLeftChildOfEqualLambda || isRightChildOfEqualLambda) && (multiInputsComp != nullptr)) {
367 
368  std::string nextInputName;
369 
370  if (nextChild != nullptr) {
371  nextInputName = multiInputsComp->getNameForIthInput(nextChild->getInputIndex(0));
372  }
373 
374  std::string myInputName = multiInputsComp->getNameForIthInput(child->getInputIndex(0));
375 
376  if (nextInputName == myInputName) {
377  isChildSelfJoin = true;
378  }
379  }
380 
381  getTCAPString(tcapStrings,
382  inputNames,
383  inputColumns,
384  columnsToApply,
385  childrenLambdas,
386  child,
387  lambdaLabel,
388  computationName,
389  computationLabel,
390  addedOutputColumnName,
391  myLambdaName,
392  outputTupleSetName,
393  multiInputsComp,
394  amIPartOfJoinPredicate,
395  isLeftChildOfEqualLambda,
396  isRightChildOfEqualLambda,
397  myName,
398  isChildSelfJoin);
399 
400  inputColumnsToApply.push_back(addedOutputColumnName);
401  childrenLambdaNames.push_back(myLambdaName);
402 
403  if (multiInputsComp != nullptr) {
404  auto iter = std::find(inputTupleSetNames.begin(), inputTupleSetNames.end(), outputTupleSetName);
405 
406  if (iter == inputTupleSetNames.end()) {
407  inputTupleSetNames.push_back(outputTupleSetName);
408  }
409 
410  } else {
411 
412  inputTupleSetNames.clear();
413  inputTupleSetNames.push_back(outputTupleSetName);
414  inputColumnNames.clear();
415  }
416 
417  for (const auto &inputColumn : inputColumns) {
418  auto iter =
419  std::find(inputColumnNames.begin(), inputColumnNames.end(), inputColumn);
420  if (iter == inputColumnNames.end()) {
421  inputColumnNames.push_back(inputColumn);
422  }
423  }
424 
425  isLeftChildOfEqualLambda = false;
426  isRightChildOfEqualLambda = false;
427  isChildSelfJoin = false;
428  nextChild = nullptr;
429  }
430 
431  std::vector<std::string> outputColumns;
432  std::string tcapString = root->toTCAPString(inputTupleSetNames,
433  inputColumnNames,
434  inputColumnsToApply,
435  childrenLambdaNames,
436  lambdaLabel,
437  computationName,
438  computationLabel,
439  outputTupleSetName,
440  outputColumns,
441  addedOutputColumnName,
442  myLambdaName,
443  multiInputsComp,
444  amIPartOfJoinPredicate,
445  amILeftChildOfEqualLambda,
446  amIRightChildOfEqualLambda,
447  parentLambdaName,
448  isSelfJoin);
449 
450  tcapStrings.push_back(tcapString);
451  lambdaLabel++;
452 
453  if (multiInputsComp == nullptr) {
454  inputTupleSetNames.clear();
455  inputTupleSetNames.push_back(outputTupleSetName);
456  }
457 
458  inputColumnNames.clear();
459  for (const auto &outputColumn : outputColumns) {
460  inputColumnNames.push_back(outputColumn);
461  }
462 
463  }
464 };
465 }
466 
467 #endif
unsigned int getInputIndex()
Definition: Lambda.h:64
std::shared_ptr< TypedLambdaObject< ReturnType > > tree
Definition: Lambda.h:212
static void getTCAPString(std::vector< std::string > &tcapStrings, std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, GenericLambdaObjectPtr root, int &lambdaLabel, std::string computationName, int computationLabel, std::string &addedOutputColumnName, std::string &myLambdaName, std::string &outputTupleSetName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false)
Definition: Lambda.h:284
void getInputs(std::vector< std::string > &allInputs, GenericLambdaObjectPtr root, MultiInputsBase *multiInputsBase)
Definition: Lambda.h:243
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
static void traverse(std::map< std::string, GenericLambdaObjectPtr > &fillMe, GenericLambdaObjectPtr root, int &startLabel)
Definition: Lambda.h:222
#define PDB_COUT
Definition: PDBDebug.h:31
std::vector< std::string > getAllInputs(MultiInputsBase *multiInputsBase)
Definition: Lambda.h:77
Lambda(LambdaTree< ReturnType > tree)
Definition: Lambda.h:62
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
Definition: Ptr.h:32
Lambda(LambdaTree< Ptr< ReturnType >> treeWithPointer)
Definition: Lambda.h:49
std::string getNameForIthInput(int i)