A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DereferenceLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef DEREF_LAM_H
20 #define DEREF_LAM_H
21 
22 #include <vector>
23 #include "mustache.h"
24 #include "GenericLambdaObject.h"
25 #include "ComputeExecutor.h"
26 #include "SimpleComputeExecutor.h"
27 #include "TupleSetMachine.h"
28 #include "TupleSet.h"
29 #include "Ptr.h"
30 
31 namespace pdb {
32 
33 template<class OutType>
34 class DereferenceLambda : public TypedLambdaObject<OutType> {
35 
36  public:
38 
39  public:
40  unsigned int getInputIndex(int i) override {
41 
42  return input.getInputIndex(i);
43  }
44 
46 
47  unsigned int getNumInputs() override {
48  return 1;
49  }
50 
51  std::string getTypeOfLambda() override {
52  return std::string("deref");
53  }
54 
55  std::string toTCAPString(std::vector<std::string> &inputTupleSetNames,
56  std::vector<std::string> &inputColumnNames,
57  std::vector<std::string> &inputColumnsToApply,
58  std::vector<std::string> &childrenLambdaNames,
59  int lambdaLabel,
60  std::string computationName,
61  int computationLabel,
62  std::string &outputTupleSetName,
63  std::vector<std::string> &outputColumns,
64  std::string &outputColumnName,
65  std::string &lambdaName,
66  MultiInputsBase *multiInputsComp = nullptr,
67  bool amIPartOfJoinPredicate = false,
68  bool amILeftChildOfEqualLambda = false,
69  bool amIRightChildOfEqualLambda = false,
70  std::string parentLambdaName = "",
71  bool isSelfJoin = false) override {
72 
73  // create the data for the lambda
74  mustache::data lambdaData;
75  lambdaData.set("typeOfLambda", getTypeOfLambda());
76  lambdaData.set("lambdaLabel", std::to_string(lambdaLabel));
77  lambdaData.set("computationName", computationName);
78  lambdaData.set("computationLabel", std::to_string(computationLabel));
79 
80  // create the lambda name
81  mustache::mustache lambdaNameTemplate{"{{typeOfLambda}}_{{lambdaLabel}}"};
82  lambdaName = lambdaNameTemplate.render(lambdaData);
83 
84  std::string inputTupleSetName;
85  std::string tupleSetMidTag;
86  int index;
87 
88  if (multiInputsComp == nullptr) {
89  tupleSetMidTag = "OutFor";
90  inputTupleSetName = inputTupleSetNames[0];
91  } else {
92  tupleSetMidTag = "ExtractedFor";
93  index = this->getInputIndex(0);
94  inputTupleSetName = multiInputsComp->getTupleSetNameForIthInput(index);
95  inputColumnNames = multiInputsComp->getInputColumnsForIthInput(index);
96  inputColumnsToApply = multiInputsComp->getInputColumnsToApplyForIthInput(index);
97  }
98 
99  // create the data for the lambda
100  lambdaData.set("tupleSetMidTag", tupleSetMidTag);
101 
102  // create the output tuple set name
103  mustache::mustache outputTupleSetNameTemplate{"deref_{{lambdaLabel}}{{tupleSetMidTag}}{{computationName}}{{computationLabel}}"};
104  outputTupleSetName = outputTupleSetNameTemplate.render(lambdaData);
105 
106  // set the output column name
107  outputColumnName = inputColumnsToApply[0];
108  PDB_COUT << "OuputColumnName: " << outputColumnName << "\n";
109 
110  // fill up the output columns and setup the data
111  mustache::data outputColumnsData = mustache::data::type::list;
112  outputColumns.clear();
113  for (const auto &inputColumnName : inputColumnNames) {
114  if (inputColumnName != outputColumnName) {
115  outputColumns.push_back(inputColumnName);
116 
117  // set the data
118  mustache::data columnData;
119  columnData.set("columnName", inputColumnName);
120  columnData.set("isLast", false);
121 
122  // add the data entry
123  outputColumnsData.push_back(columnData);
124  }
125  }
126 
127  // add the output column
128  outputColumns.push_back(outputColumnName);
129 
130  // add the last output column data entry
131  mustache::data lastColumnData;
132  lastColumnData.set("columnName", outputColumnName);
133  lastColumnData.set("isLast", true);
134  outputColumnsData.push_back(lastColumnData);
135 
136  // create the data for the column names
137  mustache::data inputColumnsToApplyData = mustache::data::type::list;
138  for(int i = 0; i < inputColumnsToApply.size(); i++) {
139 
140  // fill in the column data
141  mustache::data columnData;
142  columnData.set("columnName", inputColumnsToApply[i]);
143  columnData.set("isLast", i == inputColumnsToApply.size()-1);
144 
145  // add the data entry
146  inputColumnsToApplyData.push_back(columnData);
147  }
148 
149  // form the input columns to keep
150  std::vector<std::string> inputColumnsToKeep;
151  for (const auto &inputColumnName : inputColumnNames) {
152  if(std::find(inputColumnsToApply.begin(), inputColumnsToApply.end(), inputColumnName) == inputColumnsToApply.end()) {
153  // add the data
154  inputColumnsToKeep.push_back(inputColumnName);
155  }
156  }
157 
158  // fill in the data
159  mustache::data inputColumnsToKeepData = mustache::data::type::list;
160  for(int i = 0; i < inputColumnsToKeep.size(); i++) {
161 
162  // fill in the column data
163  mustache::data columnData;
164  columnData.set("columnName", inputColumnsToKeep[i]);
165  columnData.set("isLast", i == inputColumnsToKeep.size()-1);
166 
167  inputColumnsToKeepData.push_back(columnData);
168  }
169 
170  // fill in the data
171  lambdaData.set("outputTupleSetName", outputTupleSetName);
172  lambdaData.set("outputColumns", outputColumnsData);
173  lambdaData.set("inputTupleSetName", inputTupleSetName);
174  lambdaData.set("inputColumnsToApply", inputColumnsToApplyData);
175  lambdaData.set("inputColumnsToKeep", inputColumnsToKeepData);
176  lambdaData.set("lambdaName", lambdaName);
177 
178  // apply template
179  mustache::mustache ApplyTemplate{"{{outputTupleSetName}}"
180  "({{#outputColumns}}{{columnName}}{{^isLast}}, {{/isLast}}{{/outputColumns}})"
181  " <= APPLY ({{inputTupleSetName}}({{#inputColumnsToApply}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumnsToApply}}), "
182  "{{inputTupleSetName}}({{#inputColumnsToKeep}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumnsToKeep}}), "
183  "'{{computationName}}_{{computationLabel}}', '{{lambdaName}}')\n"};
184 
185  // the tcap string
186  std::string tcapString = ApplyTemplate.render(lambdaData);
187 
188  if (multiInputsComp != nullptr) {
189  if (amILeftChildOfEqualLambda || amIRightChildOfEqualLambda) {
190  inputTupleSetName = outputTupleSetName;
191  inputColumnNames.clear();
192  for (const auto &outputColumn : outputColumns) {
193  // we want to remove the extracted value column from here
194  if (outputColumn != outputColumnName) {
195  inputColumnNames.push_back(outputColumn);
196  }
197  }
198  inputColumnsToApply.clear();
199  inputColumnsToApply.push_back(outputColumnName);
200 
201  std::string hashOperator = amILeftChildOfEqualLambda ? "HASHLEFT" : "HASHRIGHT";
202  outputTupleSetName = outputTupleSetName.append("_hashed");
203  outputColumnName = outputColumnName.append("_hash");
204  outputColumns.clear();
205 
206  for (const auto &inputColumnName : inputColumnNames) {
207  outputColumns.push_back(inputColumnName);
208  }
209  outputColumns.push_back(outputColumnName);
210  std::string computationNameWithLabel = computationName + std::to_string(computationLabel);
211 
212  tcapString += this->getTCAPString(inputTupleSetName,
213  inputColumnNames,
214  inputColumnsToApply,
215  outputTupleSetName,
216  outputColumns,
217  outputColumnName,
218  hashOperator,
219  computationNameWithLabel,
220  parentLambdaName,
221  std::map<std::string, std::string>());
222  }
223  if (!isSelfJoin) {
224  for (unsigned int i = 0; i < multiInputsComp->getNumInputs(); i++) {
225  std::string curInput = multiInputsComp->getNameForIthInput(i);
226  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
227  if (iter != outputColumns.end()) {
228  multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
229  multiInputsComp->setInputColumnsForIthInput(i, outputColumns);
230  multiInputsComp->setInputColumnsToApplyForIthInput(i, outputColumnName);
231  }
232  }
233  } else {
234  // only update myIndex
235  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
236  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
237  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
238  }
239  }
240 
241  return tcapString;
242  }
243 
248  std::map<std::string, std::string> getInfo() override {
249 
250  // fill in the info
251  return std::map<std::string, std::string>{
252  std::make_pair ("lambdaType", getTypeOfLambda()),
253  };
254  };
255 
256  int getNumChildren() override {
257  return 1;
258  }
259 
260  GenericLambdaObjectPtr getChild(int which) override {
261  if (which == 0)
262  return input.getPtr();
263  return nullptr;
264  }
265 
267  TupleSpec &attsToOperateOn,
268  TupleSpec &attsToIncludeInOutput) override {
269 
270  // create the output tuple set
271  TupleSetPtr output = std::make_shared<TupleSet>();
272 
273  // create the machine that is going to setup the output tuple set, using the input tuple set
274  TupleSetSetupMachinePtr myMachine =
275  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
276 
277  // these are the input attributes that we will process
278  std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
279  int firstAtt = inputAtts[0];
280 
281  // this is the output attribute
282  int outAtt = attsToIncludeInOutput.getAtts().size();
283 
284  return std::make_shared<SimpleComputeExecutor>(
285  output,
286  [=](TupleSetPtr input) {
287 
288  // set up the output tuple set
289  myMachine->setup(input, output);
290 
291  // get the columns to operate on
292  std::vector<Ptr<OutType>> &inColumn = input->getColumn<Ptr<OutType>>(firstAtt);
293 
294  // create the output attribute, if needed
295  if (!output->hasColumn(outAtt)) {
296  std::vector<OutType> *outColumn = new std::vector<OutType>;
297  output->addColumn(outAtt, outColumn, true);
298  }
299 
300  // get the output column
301  std::vector<OutType> &outColumn = output->getColumn<OutType>(outAtt);
302 
303  // loop down the columns, setting the output
304  int numTuples = inColumn.size();
305  outColumn.resize(numTuples);
306  for (int i = 0; i < numTuples; i++) {
307  outColumn[i] = *inColumn[i];
308  }
309  return output;
310  },
311 
312  "dereferenceLambda");
313  }
314 };
315 }
316 
317 #endif
std::map< std::string, std::string > getInfo() override
DereferenceLambda(LambdaTree< Ptr< OutType >> &input)
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
GenericLambdaObjectPtr getChild(int which) override
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
LambdaTree< Ptr< OutType > > input
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &lambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
#define PDB_COUT
Definition: PDBDebug.h:31
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
std::string getTypeOfLambda() override
int getNumChildren() override
unsigned int getNumInputs() override
Definition: Ptr.h:32
unsigned int getInputIndex(int i) override