A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CPlusPlusLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef C_PLUS_PLUS_LAM_CC
20 #define C_PLUS_PLUS_LAM_CC
21 
22 #include <memory>
23 #include <iostream>
24 #include <vector>
25 
// Yields the element at position `which` of input column number WHICH. The macro reinterprets
// `args` as an array of pointers to std::vector<Handle<TYPENAME>>, dereferences entry WHICH and
// indexes the resulting vector with `which`. It is not hygienic: identifiers named `args` and
// `which` must be in scope wherever it is expanded.
26 #define CAST(TYPENAME, WHICH) ((*(((std::vector<Handle<TYPENAME>>**)args)[WHICH]))[which])
27 
28 namespace pdb {
29 
30 template <typename F,
31  typename ReturnType,
32  typename ParamOne,
33  typename ParamTwo,
34  typename ParamThree,
35  typename ParamFour,
36  typename ParamFive>
37 typename std::enable_if<
38  !std::is_base_of<Nothing, ParamOne>::value && std::is_base_of<Nothing, ParamTwo>::value &&
39  std::is_base_of<Nothing, ParamThree>::value && std::is_base_of<Nothing, ParamFour>::value &&
40  std::is_base_of<Nothing, ParamFive>::value,
41  void>::type
42 callLambda(F& func, std::vector<ReturnType>& assignToMe, int which, void** args) {
43  assignToMe[which] = func(CAST(ParamOne, 0));
44 }
45 
46 template <typename F,
47  typename ReturnType,
48  typename ParamOne,
49  typename ParamTwo,
50  typename ParamThree,
51  typename ParamFour,
52  typename ParamFive>
53 typename std::enable_if<
54  !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
55  std::is_base_of<Nothing, ParamThree>::value && std::is_base_of<Nothing, ParamFour>::value &&
56  std::is_base_of<Nothing, ParamFive>::value,
57  void>::type
58 callLambda(F& func, std::vector<ReturnType>& assignToMe, int which, void** args) {
59  assignToMe[which] = func(CAST(ParamOne, 0), CAST(ParamTwo, 1));
60 }
61 
62 template <typename F,
63  typename ReturnType,
64  typename ParamOne,
65  typename ParamTwo,
66  typename ParamThree,
67  typename ParamFour,
68  typename ParamFive>
69 typename std::enable_if<
70  !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
71  !std::is_base_of<Nothing, ParamThree>::value &&
72  std::is_base_of<Nothing, ParamFour>::value && std::is_base_of<Nothing, ParamFive>::value,
73  void>::type
74 callLambda(F& func, std::vector<ReturnType>& assignToMe, int which, void** args) {
75  assignToMe[which] = func(CAST(ParamOne, 0), CAST(ParamTwo, 1), CAST(ParamThree, 2));
76 }
77 
78 template <typename F,
79  typename ReturnType,
80  typename ParamOne,
81  typename ParamTwo,
82  typename ParamThree,
83  typename ParamFour,
84  typename ParamFive>
85 typename std::enable_if<
86  !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
87  !std::is_base_of<Nothing, ParamThree>::value &&
88  !std::is_base_of<Nothing, ParamFour>::value && std::is_base_of<Nothing, ParamFive>::value,
89  void>::type
90 callLambda(F& func, std::vector<ReturnType>& assignToMe, int which, void** args) {
91  assignToMe[which] =
92  func(CAST(ParamOne, 0), CAST(ParamTwo, 1), CAST(ParamThree, 2), CAST(ParamFour, 3));
93 }
94 
95 template <typename F,
96  typename ReturnType,
97  typename ParamOne,
98  typename ParamTwo,
99  typename ParamThree,
100  typename ParamFour,
101  typename ParamFive>
102 typename std::enable_if<
103  !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
104  !std::is_base_of<Nothing, ParamThree>::value &&
105  !std::is_base_of<Nothing, ParamFour>::value && !std::is_base_of<Nothing, ParamFive>::value,
106  void>::type
107 callLambda(F& func, std::vector<ReturnType>& assignToMe, int which, void** args) {
108  assignToMe[which] = func(CAST(ParamOne, 0),
109  CAST(ParamTwo, 1),
110  CAST(ParamThree, 2),
111  CAST(ParamFour, 3),
112  CAST(ParamFive, 4));
113 }
114 
115 template <typename F,
116  typename ReturnType,
117  typename ParamOne = Nothing,
118  typename ParamTwo = Nothing,
119  typename ParamThree = Nothing,
120  typename ParamFour = Nothing,
121  typename ParamFive = Nothing>
122 class CPlusPlusLambda : public TypedLambdaObject<ReturnType> {
123 
124 private:
126  int numInputs = 0;
127 
128 public:
129  // JiaNote: I changed CPlusPlusLambda constructor interface to obtain input information for
130  // query graph analysis.
132  Handle<ParamOne>& input1,
133  Handle<ParamTwo>& input2,
134  Handle<ParamThree>& input3,
135  Handle<ParamFour>& input4,
136  Handle<ParamFive>& input5)
137  : myFunc(arg) {
138 
139  if (getTypeName<ParamOne>() != "pdb::Nothing") {
140  this->numInputs++;
141  this->setInputIndex(0, -((input1.getExactTypeInfoValue() + 1)));
142  }
143  if (getTypeName<ParamTwo>() != "pdb::Nothing") {
144  this->numInputs++;
145  this->setInputIndex(1, -((input2.getExactTypeInfoValue() + 1)));
146  }
147  if (getTypeName<ParamThree>() != "pdb::Nothing") {
148  this->numInputs++;
149  this->setInputIndex(2, -((input3.getExactTypeInfoValue() + 1)));
150  }
151  if (getTypeName<ParamFour>() != "pdb::Nothing") {
152  this->numInputs++;
153  this->setInputIndex(3, -((input4.getExactTypeInfoValue() + 1)));
154  }
155  if (getTypeName<ParamFive>() != "pdb::Nothing") {
156  this->numInputs++;
157  this->setInputIndex(4, -((input5.getExactTypeInfoValue() + 1)));
158  }
159  }
160 
161 
162  unsigned int getNumInputs() override {
163  return this->numInputs;
164  }
165 
166  std::string getTypeOfLambda() override {
167  return std::string("native_lambda");
168  }
169 
170  GenericLambdaObjectPtr getChild(int which) override {
171  return nullptr;
172  }
173 
174  int getNumChildren() override {
175  return 0;
176  }
177 
179 
180 
182  TupleSpec& attsToOperateOn,
183  TupleSpec& attsToIncludeInOutput) override {
184 
185  // create the output tuple set
186  TupleSetPtr output = std::make_shared<TupleSet>();
187 
188  // create the machine that is going to setup the output tuple set, using the input tuple set
189  TupleSetSetupMachinePtr myMachine =
190  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
191 
192  // this is the list of input attributes that we need to match on
193  std::vector<int> matches = myMachine->match(attsToOperateOn);
194 
195  // fix this!! Use a smart pointer
196  std::shared_ptr<std::vector<void*>> inputAtts = std::make_shared<std::vector<void*>>();
197  for (int i = 0; i < matches.size(); i++) {
198  inputAtts->push_back(nullptr);
199  }
200 
201  // this is the output attribute
202  int outAtt = attsToIncludeInOutput.getAtts().size();
203 
204  return std::make_shared<SimpleComputeExecutor>(
205  output,
206  [=](TupleSetPtr input) {
207 
208  // set up the output tuple set
209  myMachine->setup(input, output);
210 
211  // get the columns to operate on
212  int numAtts = matches.size();
213  void** inAtts = inputAtts->data();
214  for (int i = 0; i < numAtts; i++) {
215  inAtts[i] = &(input->getColumn<int>(matches[i]));
216  }
217 
218  // setup the output column, if it is not already set up
219  if (!output->hasColumn(outAtt)) {
220  std::vector<ReturnType>* outputCol = new std::vector<ReturnType>;
221  output->addColumn(outAtt, outputCol, true);
222  }
223 
224  // get the output column
225  std::vector<ReturnType>& outColumn = output->getColumn<ReturnType>(outAtt);
226 
227  // loop down the columns, setting the output
228  int numTuples = ((std::vector<Handle<ParamOne>>*)inAtts[0])->size();
229  outColumn.resize(numTuples);
230  for (int i = 0; i < numTuples; i++) {
231  callLambda<F, ReturnType, ParamOne, ParamTwo, ParamThree, ParamFour, ParamFive>(
232  myFunc, outColumn, i, inAtts);
233  }
234 
235  return output;
236  },
237  "nativeLambda");
238  }
239 
240  // JiaNote: we need this to generate TCAP for a cartesian join
241  std::string toTCAPStringForCartesianJoin(int lambdaLabel,
242  std::string computationName,
243  int computationLabel,
244  std::string& outputTupleSetName,
245  std::vector<std::string>& outputColumns,
246  std::string& outputColumnName,
247  std::string& myLambdaName,
248  MultiInputsBase* multiInputsComp) override {
249 
250  std::string tcapString = "";
251  myLambdaName = "native_lambda_" + std::to_string(lambdaLabel);
252  std::string myComputationName = computationName + "_" + std::to_string(computationLabel);
253  if (multiInputsComp == nullptr) {
254  return tcapString;
255  }
256 
257 
258  // Step 1. for each input get its current tupleset name;
259  unsigned int numInputs = this->getNumInputs();
260  std::vector<std::string> inputTupleSetNames;
261  std::map<std::string, std::vector<unsigned int>> inputPartitions;
262  std::vector<std::vector<std::string>> inputColumnNames;
263  std::vector<std::vector<std::string>> inputColumnsToApply;
264  for (unsigned int i = 0; i < numInputs; i++) {
265  unsigned int index = this->getInputIndex(i);
266  std::string curTupleSetName = multiInputsComp->getTupleSetNameForIthInput(index);
267  auto iter =
268  std::find(inputTupleSetNames.begin(), inputTupleSetNames.end(), curTupleSetName);
269  if (iter == inputTupleSetNames.end()) {
270  inputTupleSetNames.push_back(curTupleSetName);
271  inputColumnNames.push_back(multiInputsComp->getInputColumnsForIthInput(index));
272  inputColumnsToApply.push_back(
273  multiInputsComp->getInputColumnsToApplyForIthInput(index));
274  }
275  inputPartitions[curTupleSetName].push_back(index);
276  }
277 
278  for (auto curTupleSetName : inputTupleSetNames) {
279  std::vector<unsigned int> curVec = inputPartitions[curTupleSetName];
280  }
281 
282 
283  std::string curLeftTupleSetName;
284  std::vector<std::string> curLeftColumnsToKeep;
285  std::string curLeftHashColumnName;
286  std::vector<unsigned int> curLeftIndexes;
287  // Step 2. if there are more than one input tuplesets, we need to do cartesian join for all
288  // of them
289  if (inputTupleSetNames.size() > 1) {
290  for (unsigned int i = 0; i < inputTupleSetNames.size() - 1; i++) {
291  if (i == 0) {
292  // HashOne for the 0-th tupleset
293  std::string curLeftInputTupleSetName = inputTupleSetNames[0];
294  curLeftIndexes = inputPartitions[curLeftInputTupleSetName];
295  curLeftColumnsToKeep = inputColumnNames[0];
296  std::vector<std::string> curInputColumnsToApply = inputColumnsToApply[0];
297  std::string curPrefix = "hashOneFor_" + std::to_string(computationLabel) + "_" +
298  std::to_string(lambdaLabel);
299  if (curLeftIndexes.size() > 1) {
300  curPrefix += "Joined";
301  }
302  curLeftTupleSetName =
303  curPrefix + "_" + multiInputsComp->getNameForIthInput(curLeftIndexes[0]);
304  for (unsigned int j = 1; j < curLeftIndexes.size(); j++) {
305  curLeftTupleSetName +=
306  "_" + multiInputsComp->getNameForIthInput(curLeftIndexes[j]);
307  }
308  curLeftHashColumnName = "OneFor_0_" + std::to_string(computationLabel) + "_" +
309  std::to_string(lambdaLabel);
310  std::vector<std::string> curOutputColumnNames;
311  for (const auto &j : curLeftColumnsToKeep) {
312  curOutputColumnNames.push_back(j);
313  }
314  curOutputColumnNames.push_back(curLeftHashColumnName);
315  tcapString += this->getTCAPString(curLeftInputTupleSetName,
316  curLeftColumnsToKeep,
317  curInputColumnsToApply,
318  curLeftTupleSetName,
319  curOutputColumnNames,
320  curLeftHashColumnName,
321  "HASHONE",
322  myComputationName,
323  "",
324  std::map<std::string, std::string>());
325  }
326 
327  // HashOne for the (i+1)-th table
328  std::string curInputTupleSetName = inputTupleSetNames[i + 1];
329  std::vector<std::string> curInputColumnNames = inputColumnNames[i + 1];
330  std::vector<std::string> curInputColumnsToApply = inputColumnsToApply[i + 1];
331  std::vector<unsigned int> curIndexes = inputPartitions[curInputTupleSetName];
332  std::string curPrefix = "hashOneFor_" + std::to_string(computationLabel) + "_" +
333  std::to_string(lambdaLabel);
334  if (curIndexes.size() > 1) {
335  curPrefix += "Joined";
336  }
337  std::string curOutputTupleSetName =
338  curPrefix + "_" + multiInputsComp->getNameForIthInput(curIndexes[0]);
339  for (unsigned int j = 1; j < curIndexes.size(); j++) {
340  curOutputTupleSetName +=
341  "_" + multiInputsComp->getNameForIthInput(curIndexes[j]);
342  }
343  std::string curOutputColumnName = "OneFor_" + std::to_string(i + 1) + "_" +
344  std::to_string(computationLabel) + "_" + std::to_string(lambdaLabel);
345  std::vector<std::string> curOutputColumnNames;
346  for (const auto &curInputColumnName : curInputColumnNames) {
347  curOutputColumnNames.push_back(curInputColumnName);
348  }
349  curOutputColumnNames.push_back(curOutputColumnName);
350  tcapString += this->getTCAPString(curInputTupleSetName,
351  curInputColumnNames,
352  curInputColumnsToApply,
353  curOutputTupleSetName,
354  curOutputColumnNames,
355  curOutputColumnName,
356  "HASHONE",
357  myComputationName,
358  "",
359  std::map<std::string, std::string>());
360 
361 
362  // Join the two tables
363  tcapString += "\n/* CartesianJoin ( " +
364  multiInputsComp->getNameForIthInput(curLeftIndexes[0]);
365  std::string outputTupleSetName =
366  "CartesianJoined__" + multiInputsComp->getNameForIthInput(curLeftIndexes[0]);
367  for (unsigned int j = 1; j < curLeftIndexes.size(); j++) {
368  outputTupleSetName +=
369  "_" + multiInputsComp->getNameForIthInput(curLeftIndexes[j]);
370  tcapString += " " + multiInputsComp->getNameForIthInput(curLeftIndexes[j]);
371  }
372  outputTupleSetName += "___" + multiInputsComp->getNameForIthInput(curIndexes[0]);
373  tcapString += " ) and ( " + multiInputsComp->getNameForIthInput(curIndexes[0]);
374  for (unsigned int j = 1; j < curIndexes.size(); j++) {
375  outputTupleSetName += "_" + multiInputsComp->getNameForIthInput(curIndexes[j]);
376  tcapString += " " + multiInputsComp->getNameForIthInput(curIndexes[j]);
377  }
378  outputTupleSetName += "_";
379  tcapString += " ) */\n";
380  // push down projection here
381  tcapString += outputTupleSetName + "(" + curLeftColumnsToKeep[0];
382  for (unsigned int j = 1; j < curLeftColumnsToKeep.size(); j++) {
383  tcapString += ", " + curLeftColumnsToKeep[j];
384  }
385  for (unsigned int j = 0; j < curInputColumnNames.size(); j++) {
386  tcapString += ", " + curInputColumnNames[j];
387  }
388  if (i + 1 < inputTupleSetNames.size() - 1) {
389  tcapString += ", " + curOutputColumnName;
390  }
391  tcapString +=
392  ") <= JOIN (" + curLeftTupleSetName + "(" + curLeftHashColumnName + "), ";
393  tcapString += curLeftTupleSetName + "(" + curLeftColumnsToKeep[0];
394  for (unsigned int j = 1; j < curLeftColumnsToKeep.size(); j++) {
395  tcapString += ", " + curLeftColumnsToKeep[j];
396  }
397  tcapString += "), ";
398  tcapString += curOutputTupleSetName + "(" + curOutputColumnName + "), ";
399  tcapString += curOutputTupleSetName + "(" + curInputColumnNames[0];
400  for (unsigned int j = 1; j < curInputColumnNames.size(); j++) {
401  tcapString += ", " + curInputColumnNames[j];
402  }
403  if (i + 1 < inputTupleSetNames.size() - 1) {
404  tcapString += ", " + curOutputColumnName;
405  }
406  tcapString += "), '" + myComputationName + "')\n";
407 
408 
409  // update counters
410  curLeftTupleSetName = outputTupleSetName;
411  for (unsigned int j = 0; j < curInputColumnNames.size(); j++) {
412  curLeftColumnsToKeep.push_back(curInputColumnNames[j]);
413  }
414  for (unsigned int j = 0; j < curIndexes.size(); j++) {
415  curLeftIndexes.push_back(curIndexes[j]);
416  }
417  curLeftHashColumnName = curOutputColumnName;
418  }
419  } else {
420 
421  curLeftTupleSetName = inputTupleSetNames[0];
422  curLeftColumnsToKeep = inputColumnNames[0];
423  }
424 
425  // Step 3. do an apply to add a boolean column
426  std::vector<std::string> curInputColumnsToApply;
427  for (int i = 0; i < numInputs; i++) {
428  unsigned int index = this->getInputIndex(i);
429  curInputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(index));
430  }
431  std::string curOutputTupleSetName = "nativOutFor_" + myLambdaName + "_" + myComputationName;
432  std::string curOutputColumnName =
433  "nativOut_" + std::to_string(lambdaLabel) + "_" + std::to_string(computationLabel);
434  std::vector<std::string> curOutputColumnNames;
435  for (const auto &i : curLeftColumnsToKeep) {
436  curOutputColumnNames.push_back(i);
437  }
438  curOutputColumnNames.push_back(curOutputColumnName);
439 
440  tcapString += this->getTCAPString(curLeftTupleSetName,
441  curLeftColumnsToKeep,
442  curInputColumnsToApply,
443  curOutputTupleSetName,
444  curOutputColumnNames,
445  curOutputColumnName,
446  "APPLY",
447  myComputationName,
448  myLambdaName,
449  getInfo());
450 
451  // Step 4. do a filter to remove false rows
452  outputColumns.clear();
453  outputTupleSetName = "filtedOutFor_" + myLambdaName + "_" + myComputationName;
454  tcapString += outputTupleSetName + "(" + curLeftColumnsToKeep[0];
455  outputColumns.push_back(curLeftColumnsToKeep[0]);
456  for (unsigned int i = 1; i < curLeftColumnsToKeep.size(); i++) {
457  tcapString += ", " + curLeftColumnsToKeep[i];
458  outputColumns.push_back(curLeftColumnsToKeep[0]);
459  }
460  tcapString += ") <= FILTER (" + curOutputTupleSetName + "(" + curOutputColumnName + "), ";
461  tcapString += curOutputTupleSetName + "(" + curLeftColumnsToKeep[0];
462  for (unsigned int i = 1; i < curLeftColumnsToKeep.size(); i++) {
463  tcapString += ", " + curLeftColumnsToKeep[i];
464  }
465  tcapString += "), '" + myComputationName + "')\n";
466 
467  // Step 5. update tupleset names, columns and columns to apply in multiInputsComp
468  for (unsigned int i = 0; i < multiInputsComp->getNumInputs(); i++) {
469  std::string curInput = multiInputsComp->getNameForIthInput(i);
470  auto iter =
471  std::find(curLeftColumnsToKeep.begin(), curLeftColumnsToKeep.end(), curInput);
472  if (iter != curLeftColumnsToKeep.end()) {
473  multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
474  multiInputsComp->setInputColumnsForIthInput(i, curLeftColumnsToKeep);
475  multiInputsComp->setInputColumnsToApplyForIthInput(i, curInputColumnsToApply);
476  }
477  }
478  return tcapString;
479  }
480 
485  std::map<std::string, std::string> getInfo() override {
486 
487  // fill in the info
488  return std::map<std::string, std::string>{
489 
490  std::make_pair ("lambdaType", getTypeOfLambda()),
491  };
492  };
493 };
494 }
495 
496 #endif
int getNumChildren() override
#define CAST(TYPENAME, WHICH)
void setInputColumnsForIthInput(int i, std::vector< std::string > &columns)
std::enable_if< !std::is_base_of< Nothing, ParamOne >::value &&std::is_base_of< Nothing, ParamTwo >::value &&std::is_base_of< Nothing, ParamThree >::value &&std::is_base_of< Nothing, ParamFour >::value &&std::is_base_of< Nothing, ParamFive >::value, void >::type callLambda(F &func, std::vector< ReturnType > &assignToMe, int which, void **args)
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
std::string getTupleSetNameForIthInput(int i)
GenericLambdaObjectPtr getChild(int which) override
void setInputColumnsToApplyForIthInput(int i, std::vector< std::string > &columnsToApply)
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
void setTupleSetNameForIthInput(int i, std::string name)
std::vector< std::string > getInputColumnsForIthInput(int i)
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
CPlusPlusLambda(F arg, Handle< ParamOne > &input1, Handle< ParamTwo > &input2, Handle< ParamThree > &input3, Handle< ParamFour > &input4, Handle< ParamFive > &input5)
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
std::string toTCAPStringForCartesianJoin(int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp) override
virtual unsigned int getInputIndex(int i)
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
unsigned int getNumInputs() override
void setInputIndex(int i, unsigned int index)
std::vector< std::string > getInputColumnsToApplyForIthInput(int i)
std::map< std::string, std::string > getInfo() override
std::string getNameForIthInput(int i)
std::string getTypeOfLambda() override