A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
EqualsLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef EQUALS_LAM_H
20 #define EQUALS_LAM_H
21 
22 #include <vector>
23 #include "Lambda.h"
24 #include "ComputeExecutor.h"
25 #include "TupleSetMachine.h"
26 #include "TupleSet.h"
27 #include "Ptr.h"
28 #include "PDBMap.h"
29 
30 namespace pdb {
31 
// Only one of these two versions is viable for any given type. They are used to hash
// automatically: on the underlying (pointed-to) object in the case of a Ptr<> type,
// and on the value itself otherwise.
35 template<class MyType>
36 std::enable_if_t<std::is_base_of<PtrBase, MyType>::value, size_t> hashHim(MyType &him) {
37  return Hasher<decltype(*him)>::hash(*him);
38 }
39 
40 template<class MyType>
41 std::enable_if_t<!std::is_base_of<PtrBase, MyType>::value, size_t> hashHim(MyType &him) {
42  return Hasher<MyType>::hash(him);
43 }
44 
45 // only one of these four versions is going to work... used to automatically dereference a Ptr<blah>
46 // type on either the LHS or RHS of an equality check
47 template<class LHS, class RHS>
48 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && std::is_base_of<PtrBase, RHS>::value, bool>
49 checkEquals(LHS &lhs, RHS &rhs) {
50  // std :: cout << "std :: is_base_of <PtrBase, LHS> :: value && std :: is_base_of <PtrBase, RHS>
51  // :: value" << std :: endl;
52  return *lhs == *rhs;
53 }
54 
55 template<class LHS, class RHS>
56 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && !(std::is_base_of<PtrBase, RHS>::value),
57  bool>
58 checkEquals(LHS &lhs, RHS &rhs) {
59  // std :: cout << "std :: is_base_of <PtrBase, LHS> :: value && !(std :: is_base_of <PtrBase,
60  // RHS> :: value) " << std :: endl;
61  return *lhs == rhs;
62 }
63 
64 template<class LHS, class RHS>
65 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && std::is_base_of<PtrBase, RHS>::value,
66  bool>
67 checkEquals(LHS &lhs, RHS &rhs) {
68  // std :: cout << "!(std :: is_base_of <PtrBase, LHS> :: value) && std :: is_base_of <PtrBase,
69  // RHS> :: value" << std :: endl;
70  return lhs == *rhs;
71 }
72 
73 template<class LHS, class RHS>
74 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && !(std::is_base_of<PtrBase, RHS>::value),
75  bool>
76 checkEquals(LHS &lhs, RHS &rhs) {
77  // std :: cout << "!(std :: is_base_of <PtrBase, LHS> :: value) && !(std :: is_base_of <PtrBase,
78  // RHS>" << std :: endl;
79  return lhs == rhs;
80 }
81 
82 template<class LeftType, class RightType>
83 class EqualsLambda : public TypedLambdaObject<bool> {
84 
85  public:
88 
89  public:
91  lhs = lhsIn;
92  rhs = rhsIn;
93  PDB_COUT << "EqualsLambda: LHS index is " << lhs.getInputIndex(0) << std::endl;
94  PDB_COUT << "EqualsLambda: RHS index is " << rhs.getInputIndex(0) << std::endl;
95  this->setInputIndex(0, lhs.getInputIndex(0));
96  this->setInputIndex(1, rhs.getInputIndex(0));
97  }
98 
99  std::string getTypeOfLambda() override {
100  return std::string("==");
101  }
102 
103  std::string toTCAPString(std::vector<std::string> &inputTupleSetNames,
104  std::vector<std::string> &inputColumnNames,
105  std::vector<std::string> &inputColumnsToApply,
106  std::vector<std::string> &childrenLambdaNames,
107  int lambdaLabel,
108  std::string computationName,
109  int computationLabel,
110  std::string &outputTupleSetName,
111  std::vector<std::string> &outputColumns,
112  std::string &outputColumnName,
113  std::string &myLambdaName,
114  MultiInputsBase *multiInputsComp = nullptr,
115  bool amIPartOfJoinPredicate = false,
116  bool amILeftChildOfEqualLambda = false,
117  bool amIRightChildOfEqualLambda = false,
118  std::string parentLambdaName = "",
119  bool isSelfJoin = false) override {
120  std::string tcapString;
121  myLambdaName = getTypeOfLambda() + "_" + std::to_string(lambdaLabel);
122  std::string computationNameWithLabel = computationName + "_" + std::to_string(computationLabel);
123  std::string inputTupleSetName;
124  if (multiInputsComp == nullptr) {
125  inputTupleSetName = inputTupleSetNames[0];
126  outputTupleSetName = "equals_" + std::to_string(lambdaLabel) + "OutFor" + computationName + std::to_string(computationLabel);
127  outputColumnName = "bool_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel);
128  outputColumns.clear();
129  for (const auto &inputColumnName : inputColumnNames) {
130  outputColumns.push_back(inputColumnName);
131  }
132  outputColumns.push_back(outputColumnName);
133 
134  tcapString += "\n/* Apply selection predicate on " + inputColumnsToApply[0] + " and " + inputColumnsToApply[1] + "*/\n";
135  tcapString += this->getTCAPString(inputTupleSetName,
136  inputColumnNames,
137  inputColumnsToApply,
138  outputTupleSetName,
139  outputColumns,
140  outputColumnName,
141  "APPLY",
142  computationNameWithLabel,
143  myLambdaName,
144  getInfo());
145 
146  } else {
147 
148  if (inputColumnNames[inputColumnNames.size() - 2] == inputColumnsToApply[0]) {
149  if (inputColumnNames.size() == 4) {
150  inputColumnNames[2] = inputColumnNames[1];
151  inputColumnNames[1] = inputColumnsToApply[0];
152  } else if (inputColumnNames.size() == 3) {
153  inputColumnNames.push_back(inputColumnNames[2]);
154  inputColumnNames[2] = inputColumnNames[0];
155  } else {
156  std::cout << "Error: right now we can't support such complex join selection "
157  "conditions"
158  << std::endl;
159  exit(1);
160  }
161  }
162  tcapString += "\n/* Join ( " + inputColumnNames[0];
163  for (unsigned int i = 1; i < inputColumnNames.size() - 1; i++) {
164  if (inputColumnNames[i] == inputColumnsToApply[0]) {
165  tcapString += " ) and (";
166  } else {
167  tcapString += " " + inputColumnNames[i];
168  }
169  }
170 
171  tcapString += " ) */\n";
172  outputTupleSetName = "JoinedFor_equals" + std::to_string(lambdaLabel) +
173  computationName + std::to_string(computationLabel);
174  std::string tupleSetNamePrefix = outputTupleSetName;
175  outputColumns.clear();
176 
177  // TODO: push down projection here
178  for (const auto &inputColumnName : inputColumnNames) {
179  auto iter = std::find(
180  inputColumnsToApply.begin(), inputColumnsToApply.end(), inputColumnName);
181  if (iter == inputColumnsToApply.end()) {
182  outputColumns.push_back(inputColumnName);
183  }
184  }
185  outputColumnName = "";
186 
187  tcapString += outputTupleSetName + "(" + outputColumns[0];
188  for (int i = 1; i < outputColumns.size(); i++) {
189  tcapString += ", " + outputColumns[i];
190  }
191  tcapString += ") <= JOIN (" + inputTupleSetNames[0] + "(" + inputColumnsToApply[0] + "), ";
192  tcapString += inputTupleSetNames[0] + "(" + inputColumnNames[0];
193  int end1 = -1;
194  for (int i = 1; i < inputColumnNames.size(); i++) {
195  auto iter = std::find(
196  inputColumnsToApply.begin(), inputColumnsToApply.end(), inputColumnNames[i]);
197  if (iter != inputColumnsToApply.end()) {
198  end1 = i;
199  break;
200  }
201  tcapString += ", " + inputColumnNames[i];
202  }
203  if (end1 + 1 >= inputColumnNames.size()) {
204  std::cout << "Can't generate TCAP for this query graph" << std::endl;
205  exit(1);
206  }
207  tcapString += "), " + inputTupleSetNames[1] + "(" + inputColumnsToApply[1] + "), " +
208  inputTupleSetNames[1] + "(" + inputColumnNames[end1 + 1];
209  for (int i = end1 + 2; i < inputColumnNames.size(); i++) {
210  auto iter = std::find(
211  inputColumnsToApply.begin(), inputColumnsToApply.end(), inputColumnNames[i]);
212  if (iter != inputColumnsToApply.end()) {
213  break;
214  }
215  tcapString += ", " + inputColumnNames[i];
216  }
217 
218  tcapString += "), '" + computationNameWithLabel + "')\n";
219 
220  inputTupleSetName = outputTupleSetName;
221  inputColumnNames.clear();
222  for (const auto &outputColumn : outputColumns) {
223  inputColumnNames.push_back(outputColumn);
224  }
225  inputColumnsToApply.clear();
226  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(lhs.getInputIndex(0)));
227  outputColumnName = "LHSExtractedFor_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel);
228  outputColumns.push_back(outputColumnName);
229  outputTupleSetName = tupleSetNamePrefix + "_WithLHSExtracted";
230 
231  // the additional info about this attribute access lambda
232  std::map<std::string, std::string> info;
233 
234  tcapString += this->getTCAPString(inputTupleSetName,
235  inputColumnNames,
236  inputColumnsToApply,
237  outputTupleSetName,
238  outputColumns,
239  outputColumnName,
240  "APPLY",
241  computationNameWithLabel,
242  childrenLambdaNames[0],
243  getChild(0)->getInfo());
244 
245  inputTupleSetName = outputTupleSetName;
246  inputColumnNames.push_back(outputColumnName);
247  inputColumnsToApply.clear();
248  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(rhs.getInputIndex(0)));
249  outputTupleSetName = tupleSetNamePrefix + "_WithBOTHExtracted";
250  outputColumnName = "RHSExtractedFor_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel);
251  outputColumns.push_back(outputColumnName);
252 
253  // add the tcap string
254  tcapString += this->getTCAPString(inputTupleSetName,
255  inputColumnNames,
256  inputColumnsToApply,
257  outputTupleSetName,
258  outputColumns,
259  outputColumnName,
260  "APPLY",
261  computationNameWithLabel,
262  childrenLambdaNames[1],
263  getChild(1)->getInfo());
264 
265  inputTupleSetName = outputTupleSetName;
266  inputColumnsToApply.clear();
267  inputColumnsToApply.push_back("LHSExtractedFor_" + std::to_string(lambdaLabel) + "_" +
268  std::to_string(computationLabel));
269  inputColumnsToApply.push_back("RHSExtractedFor_" + std::to_string(lambdaLabel) + "_" +
270  std::to_string(computationLabel));
271  inputColumnNames.pop_back();
272  outputColumnName =
273  "bool_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel);
274  outputColumns.pop_back();
275  outputColumns.pop_back();
276  outputColumns.push_back(outputColumnName);
277  outputTupleSetName = tupleSetNamePrefix + "_BOOL";
278 
279  tcapString += this->getTCAPString(inputTupleSetName,
280  inputColumnNames,
281  inputColumnsToApply,
282  outputTupleSetName,
283  outputColumns,
284  outputColumnName,
285  "APPLY",
286  computationNameWithLabel,
287  myLambdaName,
288  getInfo());
289 
290  inputTupleSetName = outputTupleSetName;
291  outputColumnName = "";
292  outputColumns.pop_back();
293  outputTupleSetName = tupleSetNamePrefix + "_FILTERED";
294  tcapString += outputTupleSetName + "(" + outputColumns[0];
295  for (int i = 1; i < outputColumns.size(); i++) {
296  tcapString += ", " + outputColumns[i];
297  }
298  tcapString += ") <= FILTER (" + inputTupleSetName + "(bool_" +
299  std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel) + "), " +
300  inputTupleSetName + "(" + outputColumns[0];
301  for (int i = 1; i < outputColumns.size(); i++) {
302  tcapString += ", " + outputColumns[i];
303  }
304  tcapString += "), '" + computationNameWithLabel + "')\n";
305 
306  if (!isSelfJoin) {
307  for (unsigned int index = 0; index < multiInputsComp->getNumInputs(); index++) {
308  std::string curInput = multiInputsComp->getNameForIthInput(index);
309  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
310  if (iter != outputColumns.end()) {
311  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
312  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
313  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
314  }
315  }
316  }
317  }
318  return tcapString;
319  }
320 
325  std::map<std::string, std::string> getInfo() override {
326 
327  // fill in the info
328  return std::map<std::string, std::string>{
329  std::make_pair ("lambdaType", getTypeOfLambda())
330  };
331  };
332 
333  unsigned int getNumInputs() override {
334  return 2;
335  }
336 
337  int getNumChildren() override {
338  return 2;
339  }
340 
341  GenericLambdaObjectPtr getChild(int which) override {
342  if (which == 0)
343  return lhs.getPtr();
344  if (which == 1)
345  return rhs.getPtr();
346  return nullptr;
347  }
348 
349  /* bool addColumnToTupleSet (std :: string &pleaseCreateThisType, TupleSetPtr input, int outAtt)
350  override {
351  if (pleaseCreateThisType == getTypeName <bool> ()) {
352  std :: vector <bool> *outColumn = new std :: vector <bool>;
353  input->addColumn (outAtt, outColumn, true);
354  return true;
355  }
356  return false;
357  } */
358 
360  TupleSpec &attsToOperateOn,
361  TupleSpec &attsToIncludeInOutput) override {
362 
363  // create the output tuple set
364  TupleSetPtr output = std::make_shared<TupleSet>();
365 
366  // create the machine that is going to setup the output tuple set, using the input tuple set
367  TupleSetSetupMachinePtr myMachine =
368  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
369 
370  // these are the input attributes that we will process
371  std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
372  int firstAtt = inputAtts[0];
373  int secondAtt = inputAtts[1];
374 
375  // this is the output attribute
376  int outAtt = attsToIncludeInOutput.getAtts().size();
377 
378  return std::make_shared<SimpleComputeExecutor>(
379  output,
380  [=](TupleSetPtr input) {
381 
382  // set up the output tuple set
383  myMachine->setup(input, output);
384 
385  // get the columns to operate on
386  std::vector<LeftType> &leftColumn = input->getColumn<LeftType>(firstAtt);
387  std::vector<RightType> &rightColumn = input->getColumn<RightType>(secondAtt);
388 
389  // create the output attribute, if needed
390  if (!output->hasColumn(outAtt)) {
391  std::vector<bool> *outColumn = new std::vector<bool>;
392  output->addColumn(outAtt, outColumn, true);
393  }
394 
395  // get the output column
396  std::vector<bool> &outColumn = output->getColumn<bool>(outAtt);
397 
398  // loop down the columns, setting the output
399  int numTuples = leftColumn.size();
400  outColumn.resize(numTuples);
401  // std :: cout << "numTuples: " << numTuples << std :: endl;
402  for (int i = 0; i < numTuples; i++) {
403  // std :: cout << "processing " << i << std :: endl;
404  bool out = checkEquals(leftColumn[i], rightColumn[i]);
405  // std :: cout << "out is " << out << std :: endl;
406 
407  outColumn[i] = out;
408  }
409  return output;
410  },
411 
412  "equalsLambda");
413  }
414 
416  TupleSpec &attsToOperateOn,
417  TupleSpec &attsToIncludeInOutput) override {
418 
419  // create the output tuple set
420  TupleSetPtr output = std::make_shared<TupleSet>();
421 
422  // create the machine that is going to setup the output tuple set, using the input tuple set
423  TupleSetSetupMachinePtr myMachine =
424  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
425 
426  // these are the input attributes that we will process
427  std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
428  int secondAtt = inputAtts[0];
429 
430  // this is the output attribute
431  int outAtt = attsToIncludeInOutput.getAtts().size();
432 
433  return std::make_shared<SimpleComputeExecutor>(
434  output,
435  [=](TupleSetPtr input) {
436 
437  // set up the output tuple set
438  myMachine->setup(input, output);
439 
440  // get the columns to operate on
441  std::vector<RightType> &rightColumn = input->getColumn<RightType>(secondAtt);
442 
443  // create the output attribute, if needed
444  if (!output->hasColumn(outAtt)) {
445  std::vector<size_t> *outColumn = new std::vector<size_t>;
446  output->addColumn(outAtt, outColumn, true);
447  }
448 
449  // get the output column
450  std::vector<size_t> &outColumn = output->getColumn<size_t>(outAtt);
451 
452  // loop down the columns, setting the output
453  int numTuples = rightColumn.size();
454  outColumn.resize(numTuples);
455  for (int i = 0; i < numTuples; i++) {
456  outColumn[i] = hashHim(rightColumn[i]);
457  }
458  return output;
459  },
460 
461  "rightHasher"
462 
463  );
464  }
465 
467  TupleSpec &attsToOperateOn,
468  TupleSpec &attsToIncludeInOutput) override {
469 
470  // create the output tuple set
471  TupleSetPtr output = std::make_shared<TupleSet>();
472 
473  // create the machine that is going to setup the output tuple set, using the input tuple set
474  TupleSetSetupMachinePtr myMachine =
475  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
476 
477  // these are the input attributes that we will process
478  std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
479  int firstAtt = inputAtts[0];
480 
481  // this is the output attribute
482  int outAtt = attsToIncludeInOutput.getAtts().size();
483 
484  return std::make_shared<SimpleComputeExecutor>(
485  output,
486  [=](TupleSetPtr input) {
487 
488  // set up the output tuple set
489  myMachine->setup(input, output);
490 
491  // get the columns to operate on
492  std::vector<LeftType> &leftColumn = input->getColumn<LeftType>(firstAtt);
493 
494  // create the output attribute, if needed
495  if (!output->hasColumn(outAtt)) {
496  std::vector<size_t> *outColumn = new std::vector<size_t>;
497  output->addColumn(outAtt, outColumn, true);
498  }
499 
500  // get the output column
501  std::vector<size_t> &outColumn = output->getColumn<size_t>(outAtt);
502 
503  // loop down the columns, setting the output
504  int numTuples = leftColumn.size();
505  outColumn.resize(numTuples);
506  for (int i = 0; i < numTuples; i++) {
507  outColumn[i] = hashHim(leftColumn[i]);
508  }
509  return output;
510  },
511 
512  "leftHasher");
513  }
514 };
515 }
516 
517 #endif
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
ComputeExecutorPtr getRightHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
Definition: EqualsLambda.h:415
std::enable_if_t< std::is_base_of< PtrBase, LHS >::value &&std::is_base_of< PtrBase, RHS >::value, bool > checkEquals(LHS &lhs, RHS &rhs)
Definition: EqualsLambda.h:49
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override
Definition: EqualsLambda.h:103
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
std::enable_if_t< std::is_base_of< PtrBase, MyType >::value, size_t > hashHim(MyType &him)
Definition: EqualsLambda.h:36
ComputeExecutorPtr getLeftHasher(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
Definition: EqualsLambda.h:466
EqualsLambda(LambdaTree< LeftType > lhsIn, LambdaTree< RightType > rhsIn)
Definition: EqualsLambda.h:90
int getNumChildren() override
Definition: EqualsLambda.h:337
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
Definition: EqualsLambda.h:359
std::string getTypeOfLambda() override
Definition: EqualsLambda.h:99
GenericLambdaObjectPtr getChild(int which) override
Definition: EqualsLambda.h:341
unsigned int getNumInputs() override
Definition: EqualsLambda.h:333
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
#define PDB_COUT
Definition: PDBDebug.h:31
LambdaTree< LeftType > lhs
Definition: EqualsLambda.h:86
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
Definition: PairArray.cc:85
void setInputIndex(int i, unsigned int index)
LambdaTree< RightType > rhs
Definition: EqualsLambda.h:87
std::map< std::string, std::string > getInfo() override
Definition: EqualsLambda.h:325