A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
MethodCallLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef METHOD_CALL_LAM_H
20 #define METHOD_CALL_LAM_H
21 
22 #include <vector>
23 #include "Lambda.h"
24 #include "ComputeExecutor.h"
25 #include "mustache.h"
26 
27 namespace pdb {
28 
29 template<class Out, class ClassType>
30 class MethodCallLambda : public TypedLambdaObject<Out> {
31 
32  public:
33  std::function<ComputeExecutorPtr(TupleSpec &, TupleSpec &, TupleSpec &)> getExecutorFunc;
34  std::function<bool(std::string &, TupleSetPtr, int)> columnBuilder;
35  std::string inputTypeName;
36  std::string methodName;
37  std::string returnTypeName;
38 
39  public:
40  // create an att access lambda; offset is the position in the input object where we are going to
41  // find the input att
43  std::string inputTypeName,
44  std::string methodName,
45  std::string returnTypeName,
46  Handle<ClassType> &input,
47  std::function<bool(std::string &, TupleSetPtr, int)> columnBuilder,
51  inputTypeName(inputTypeName),
52  methodName(methodName),
53  returnTypeName(returnTypeName) {
54 
55  PDB_COUT << "MethodCallLambda: input type code is " << input.getExactTypeInfoValue()
56  << std::endl;
57  this->setInputIndex(0, -(input.getExactTypeInfoValue() + 1));
58  }
59 
60  /* bool addColumnToTupleSet (std :: string &typeToMatch, TupleSetPtr addToMe, int posToAddTo)
61  override {
62  return columnBuilder (typeToMatch, addToMe, posToAddTo);
63  } */
64 
65  std::string getTypeOfLambda() override {
66  return std::string("methodCall");
67  }
68 
69  std::string whichMethodWeCall() {
70  return methodName;
71  }
72 
73  unsigned int getNumInputs() override {
74  return 1;
75  }
76 
77  std::string getInputType() {
78  return inputTypeName;
79  }
80 
81  std::string getOutputType() override {
82  return returnTypeName;
83  }
84 
85  std::string toTCAPString(std::vector<std::string> &inputTupleSetNames,
86  std::vector<std::string> &inputColumnNames,
87  std::vector<std::string> &inputColumnsToApply,
88  std::vector<std::string> &childrenLambdaNames,
89  int lambdaLabel,
90  std::string computationName,
91  int computationLabel,
92  std::string &outputTupleSetName,
93  std::vector<std::string> &outputColumns,
94  std::string &outputColumnName,
95  std::string &lambdaName,
96  MultiInputsBase *multiInputsComp = nullptr,
97  bool amIPartOfJoinPredicate = false,
98  bool amILeftChildOfEqualLambda = false,
99  bool amIRightChildOfEqualLambda = false,
100  std::string parentLambdaName = "",
101  bool isSelfJoin = false) override {
102 
103 
104 
105  // create the data for the lambda
106  mustache::data lambdaData;
107  lambdaData.set("computationName", computationName);
108  lambdaData.set("computationLabel", std::to_string(computationLabel));
109  lambdaData.set("typeOfLambda", getTypeOfLambda());
110  lambdaData.set("lambdaLabel", std::to_string(lambdaLabel));
111 
112  // create the computation name with label
113  mustache::mustache computationNameWithLabelTemplate{"{{computationName}}_{{computationLabel}}"};
114  std::string computationNameWithLabel = computationNameWithLabelTemplate.render(lambdaData);
115 
116  // create the lambda name
117  mustache::mustache lambdaNameTemplate{"{{typeOfLambda}}_{{lambdaLabel}}"};
118  lambdaName = lambdaNameTemplate.render(lambdaData);
119 
120  // things we need to figure out in the next step
121  int myIndex = -1;
122  std::string inputTupleSetName;
123  std::string tupleSetMidTag;
124  std::string originalInputColumnToApply;
125 
126  if (multiInputsComp == nullptr) {
127  tupleSetMidTag = "OutFor_";
128  inputTupleSetName = inputTupleSetNames[0];
129  } else {
130  tupleSetMidTag = "ExtractedFor_";
131  myIndex = this->getInputIndex(0);
132  PDB_COUT << lambdaName << ": myIndex=" << myIndex << std::endl;
133  inputTupleSetName = multiInputsComp->getTupleSetNameForIthInput(myIndex);
134  PDB_COUT << "inputTupleSetName=" << inputTupleSetName << std::endl;
135  inputColumnNames = multiInputsComp->getInputColumnsForIthInput(myIndex);
136  inputColumnsToApply.clear();
137  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(myIndex));
138  originalInputColumnToApply = multiInputsComp->getNameForIthInput(myIndex);
139  PDB_COUT << "originalInputColumnToApply=" << originalInputColumnToApply << std::endl;
140  }
141 
142  // set the lambda data
143  lambdaData.set("tupleSetMidTag", tupleSetMidTag);
144  lambdaData.set("methodName", methodName);
145 
146  // create the output tuple set name
147  mustache::mustache outputTupleSetNameTemplate
148  {"methodCall_{{lambdaLabel}}{{tupleSetMidTag}}{{computationName}}{{computationLabel}}"};
149  outputTupleSetName = outputTupleSetNameTemplate.render(lambdaData);
150 
151  // create the output column name
152  mustache::mustache outputColumnNameTemplate{"methodCall_{{lambdaLabel}}{{tupleSetMidTag}}_{{methodName}}"};
153  outputColumnName = outputColumnNameTemplate.render(lambdaData);
154 
155  // initialize the output columns
156  outputColumns.clear();
157  for (const auto &inputColumnName : inputColumnNames) {
158  outputColumns.push_back(inputColumnName);
159  }
160  outputColumns.push_back(outputColumnName);
161 
162  // the additional info about this attribute access lambda
163  std::map<std::string, std::string> info;
164 
165  // generate the TCAP string for the lambda
166  std::string tcapString;
167  tcapString += this->getTCAPString(inputTupleSetName,
168  inputColumnNames,
169  inputColumnsToApply,
170  outputTupleSetName,
171  outputColumns,
172  outputColumnName,
173  "APPLY",
174  computationNameWithLabel,
175  lambdaName,
176  getInfo());
177 
178  // is a multi input computation just return the tcapString
179  if (multiInputsComp == nullptr) {
180  return tcapString;
181  }
182 
183  if (amILeftChildOfEqualLambda || amIRightChildOfEqualLambda) {
184  inputTupleSetName = outputTupleSetName;
185  inputColumnNames.clear();
186  for (const auto &outputColumn : outputColumns) {
187  // we want to remove the extracted value column from here
188  if (outputColumn != outputColumnName) {
189  inputColumnNames.push_back(outputColumn);
190  }
191  }
192  inputColumnsToApply.clear();
193  inputColumnsToApply.push_back(outputColumnName);
194 
195  std::string hashOperator = amILeftChildOfEqualLambda ? "HASHLEFT" : "HASHRIGHT";
196  outputTupleSetName = outputTupleSetName + "_hashed";
197  outputColumnName = outputColumnName + "_hash";
198  outputColumns.clear();
199 
200  std::copy(inputColumnNames.begin(), inputColumnNames.end(), std::back_inserter(outputColumns));
201  outputColumns.push_back(outputColumnName);;
202 
203  tcapString += this->getTCAPString(inputTupleSetName,
204  inputColumnNames,
205  inputColumnsToApply,
206  outputTupleSetName,
207  outputColumns,
208  outputColumnName,
209  hashOperator,
210  computationNameWithLabel,
211  parentLambdaName,
212  std::map<std::string, std::string>());
213  }
214  if (!isSelfJoin) {
215  for (unsigned int index = 0; index < multiInputsComp->getNumInputs(); index++) {
216  std::string curInput = multiInputsComp->getNameForIthInput(index);
217  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
218  if (iter != outputColumns.end()) {
219  PDB_COUT << "MultiInputBase for index=" << index << " is updated" << std::endl;
220  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
221  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
222  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
223  }
224  if (originalInputColumnToApply == curInput) {
225  PDB_COUT << "MultiInputBase for index=" << index << " is updated" << std::endl;
226  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
227  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
228  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
229  }
230  }
231  } else {
232  // only update myIndex, I am a self-join
233  multiInputsComp->setTupleSetNameForIthInput(myIndex, outputTupleSetName);
234  multiInputsComp->setInputColumnsForIthInput(myIndex, outputColumns);
235  multiInputsComp->setInputColumnsToApplyForIthInput(myIndex, outputColumnName);
236  }
237 
238  return tcapString;
239  }
245  std::map<std::string, std::string> getInfo() override {
246 
247  // fill in the info
248  return std::map<std::string, std::string>{
249  std::make_pair ("lambdaType", getTypeOfLambda()),
250  std::make_pair ("inputTypeName", inputTypeName),
251  std::make_pair ("methodName", methodName),
252  std::make_pair ("returnTypeName", returnTypeName)
253  };
254  };
255 
256  int getNumChildren() override {
257  return 0;
258  }
259 
260  GenericLambdaObjectPtr getChild(int which) override {
261  return nullptr;
262  }
263 
265  TupleSpec &attsToOperateOn,
266  TupleSpec &attsToIncludeInOutput) override {
267  return getExecutorFunc(inputSchema, attsToOperateOn, attsToIncludeInOutput);
268  }
269 };
270 }
271 
272 #endif
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::function< bool(std::string &, TupleSetPtr, int)> columnBuilder
std::function< ComputeExecutorPtr(TupleSpec &, TupleSpec &, TupleSpec &)> getExecutorFunc
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
std::string getOutputType() override
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
GenericLambdaObjectPtr getChild(int which) override
#define PDB_COUT
Definition: PDBDebug.h:31
virtual unsigned int getInputIndex(int i)
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
std::map< std::string, std::string > getInfo() override
std::string whichMethodWeCall()
unsigned int getNumInputs() override
int getNumChildren() override
std::string getInputType()
void setInputIndex(int i, unsigned int index)
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
MethodCallLambda(std::string inputTypeName, std::string methodName, std::string returnTypeName, Handle< ClassType > &input, std::function< bool(std::string &, TupleSetPtr, int)> columnBuilder, std::function< ComputeExecutorPtr(TupleSpec &, TupleSpec &, TupleSpec &)> getExecutorFunc)
std::string getTypeOfLambda() override
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &lambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override