A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
MultiSelectionCompBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_MULTISELECTIONCOMPBASE_H
20 #define PDB_MULTISELECTIONCOMPBASE_H
21 
22 #include "Computation.h"
23 #include "ComputePlan.h"
24 #include "VectorSink.h"
25 #include "ScanUserSet.h"
26 #include "TypeName.h"
27 
28 namespace pdb {
29 
35 template<class OutputClass, class InputClass>
37 
38  public:
39 
46 
53 
58  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal) override {
59  int suffix = 0;
60  Handle<InputClass> checkMe = nullptr;
61  Lambda<bool> selectionLambda = getSelection(checkMe);
62  Lambda<Vector<Handle<OutputClass>>> projectionLambda = getProjection(checkMe);
63  selectionLambda.toMap(returnVal, suffix);
64  projectionLambda.toMap(returnVal, suffix);
65  }
66 
71  std::string getComputationType() override {
72  return std::string("MultiSelectionComp");
73  }
74 
81  }
82 
88  std::string getIthInputType(int i) override {
89  if (i == 0) {
90  return getTypeName<InputClass>();
91  } else {
92  return "";
93  }
94  }
95 
100  int getNumInputs() override {
101  return 1;
102  }
103 
108  std::string getOutputType() override {
109  return getTypeName<OutputClass>();
110  }
111 
121  std::string toTCAPString(std::vector<InputTupleSetSpecifier> &inputTupleSets,
122  int computationLabel,
123  std::string &outputTupleSetName,
124  std::vector<std::string> &outputColumnNames,
125  std::string &addedOutputColumnName) override {
126 
127  if (inputTupleSets.empty()) {
128  return "";
129  }
130 
131  InputTupleSetSpecifier inputTupleSet = inputTupleSets[0];
132  std::vector<std::string> childrenLambdaNames;
133  std::string myLambdaName;
134  return toTCAPString(inputTupleSet.getTupleSetName(),
135  inputTupleSet.getColumnNamesToKeep(),
136  inputTupleSet.getColumnNamesToApply(),
137  childrenLambdaNames,
138  computationLabel,
140  outputColumnNames,
141  addedOutputColumnName,
142  myLambdaName);
143  }
144 
145  // to return Selection tcap string
146  std::string toTCAPString(std::string inputTupleSetName,
147  std::vector<std::string> &inputColumnNames,
148  std::vector<std::string> &inputColumnsToApply,
149  std::vector<std::string> &childrenLambdaNames,
150  int computationLabel,
151  std::string &outputTupleSetName,
152  std::vector<std::string> &outputColumnNames,
153  std::string &addedOutputColumnName,
154  std::string &myLambdaName) {
155  PDB_COUT << "To GET TCAP STRING FOR SELECTION" << std::endl;
156 
157  Handle<InputClass> checkMe = nullptr;
158  PDB_COUT << "TO GET TCAP STRING FOR SELECTION LAMBDA" << std::endl;
159  Lambda<bool> selectionLambda = getSelection(checkMe);
160  std::string tupleSetName;
161  std::vector<std::string> columnNames;
162  std::string addedColumnName;
163  int lambdaLabel = 0;
164 
165  std::string tcapString;
166  tcapString += "\n/* Apply MultiSelection filtering */\n";
167  tcapString += selectionLambda.toTCAPString(inputTupleSetName,
168  inputColumnNames,
169  inputColumnsToApply,
170  childrenLambdaNames,
171  lambdaLabel,
173  computationLabel,
174  tupleSetName,
175  columnNames,
176  addedColumnName,
177  myLambdaName,
178  false);
179 
180  PDB_COUT << "tcapString after parsing selection lambda: " << tcapString << std::endl;
181  PDB_COUT << "lambdaLabel=" << lambdaLabel << std::endl;
182 
183  // create the data for the column names
184  mustache::data inputColumnData = mustache::data::type::list;
185  for(int i = 0; i < inputColumnNames.size(); i++) {
186 
187  mustache::data columnData;
188 
189  // fill in the column data
190  columnData.set("columnName", inputColumnNames[i]);
191  columnData.set("isLast", i == inputColumnNames.size()-1);
192 
193  inputColumnData.push_back(columnData);
194  }
195 
196  // create the data for the filter
197  mustache::data selectionCompData;
198  selectionCompData.set("computationType", getComputationType());
199  selectionCompData.set("computationLabel", std::to_string(computationLabel));
200  selectionCompData.set("inputColumns", inputColumnData);
201  selectionCompData.set("tupleSetName", tupleSetName);
202  selectionCompData.set("addedColumnName", addedColumnName);
203 
204  // set the new tuple set name
205  mustache::mustache newTupleSetNameTemplate{"filteredInputFor{{computationType}}{{computationLabel}}"};
206  std::string newTupleSetName = newTupleSetNameTemplate.render(selectionCompData);
207 
208  mustache::mustache filterTemplate{"filteredInputFor{{computationType}}{{computationLabel}}"
209  "({{#inputColumns}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumns}}) "
210  "<= FILTER ({{tupleSetName}}({{addedColumnName}}), {{tupleSetName}}"
211  "({{#inputColumns}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumns}}), "
212  "'{{computationType}}_{{computationLabel}}')\n"};
213 
214  tcapString += filterTemplate.render(selectionCompData);
215 
216  PDB_COUT << "tcapString after adding filter operation: " << tcapString << std::endl;
217  PDB_COUT << "TO GET TCAP STRING FOR PROJECTION LAMBDA" << std::endl;
218  PDB_COUT << "lambdaLabel=" << lambdaLabel << std::endl;
219 
220  Lambda<Vector<Handle<OutputClass>>> projectionLambda = getProjection(checkMe);
221  tcapString += "\n/* Apply MultiSelection projection */\n";
222  tcapString += projectionLambda.toTCAPString(newTupleSetName,
223  inputColumnNames,
224  inputColumnsToApply,
225  childrenLambdaNames,
226  lambdaLabel,
228  computationLabel,
229  outputTupleSetName,
230  outputColumnNames,
231  addedOutputColumnName,
232  myLambdaName,
233  true);
234 
235  // add the new data
236  selectionCompData.set("addedOutputColumnName", addedOutputColumnName);
237  selectionCompData.set("computationType", getComputationType());
238  selectionCompData.set("computationLabel", std::to_string(computationLabel));
239  selectionCompData.set("outputTupleSetName", outputTupleSetName);
240 
241 
242  // create the new tuple set name
243  newTupleSetNameTemplate = {"flattenedOutFor{{computationType}}{{computationLabel}}"};
244  newTupleSetName = newTupleSetNameTemplate.render(selectionCompData);
245 
246  // create the new output column name
247  mustache::mustache newOutputColumnNameTemplate = {"flattened_{{addedOutputColumnName}}"};
248  std::string newOutputColumnName = newOutputColumnNameTemplate.render(selectionCompData);
249 
250  // add flatten
251  mustache::mustache flattenTemplate{"flattenedOutFor{{computationType}}{{computationLabel}}(flattened_{{addedOutputColumnName}})"
252  " <= FLATTEN ({{outputTupleSetName}}({{addedOutputColumnName}}), "
253  "{{outputTupleSetName}}(), '{{computationType}}_{{computationLabel}}')\n"};
254  tcapString += flattenTemplate.render(selectionCompData);
255 
256  this->setTraversed(true);
257  this->setOutputTupleSetName(newTupleSetName);
258  outputTupleSetName = newTupleSetName;
259  this->setOutputColumnToApply(newOutputColumnName);
260  addedOutputColumnName = newOutputColumnName;
261  outputColumnNames.clear();
262  outputColumnNames.push_back(addedOutputColumnName);
263 
264  return tcapString;
265  }
266 
267  void setOutput(std::string dbName, std::string setName) override {
268  this->materializeSelectionOut = true;
269  this->outputSetScanner = makeObject<ScanUserSet<OutputClass>>();
270  this->outputSetScanner->setDatabaseName(dbName);
271  this->outputSetScanner->setSetName(setName);
272  }
273 
274  void setBatchSize(int batchSize) override {
275  if (this->outputSetScanner != nullptr) {
276  this->outputSetScanner->setBatchSize(batchSize);
277  }
278  }
279 
280  // to return the database name
281  std::string getDatabaseName() override {
282  return this->outputSetScanner->getDatabaseName();
283  }
284 
285  // to return the set name
286  std::string getSetName() override {
287  return this->outputSetScanner->getSetName();
288  }
289 
290  // source for consumer to read selection output, which has been written to a user set
291  ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override {
292 
293  if (this->materializeSelectionOut) {
294  if (this->outputSetScanner != nullptr) {
295  return outputSetScanner->getComputeSource(outputScheme, plan);
296  }
297  }
298  return nullptr;
299  }
300 
301  // sink to write selection output
303  TupleSpec &projection,
304  ComputePlan &plan) override {
305 
306  if (this->materializeSelectionOut) {
307  return std::make_shared<VectorSink<OutputClass>>(consumeMe, projection);
308  }
309  return nullptr;
310  }
311 
312  bool needsMaterializeOutput() override {
314  }
315 
317  return outputSetScanner;
318  }
319 
320  private:
323 };
324 
325 }
326 
327 #endif //PDB_MULTISELECTIONCOMPBASE_H
ComputationTypeID
Definition: Computation.h:39
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
std::vector< std::string > & getColumnNamesToKeep()
virtual pdb::Lambda< bool > getSelection(pdb::Handle< InputClass > checkMe)=0
Handle< ScanUserSet< OutputClass > > & getOutputSetScanner()
void setBatchSize(int batchSize) override
ComputationTypeID getComputationTypeID() override
void setOutput(std::string dbName, std::string setName) override
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
std::string getSetName() override
Handle< ScanUserSet< OutputClass > > outputSetScanner
std::string getOutputType() override
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
#define PDB_COUT
Definition: PDBDebug.h:31
std::string getComputationType() override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName)
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
std::string getDatabaseName() override
void setTraversed(bool traversed)
Definition: Computation.h:254
std::string getIthInputType(int i) override
std::vector< std::string > & getColumnNamesToApply()
virtual pdb::Lambda< pdb::Vector< pdb::Handle< OutputClass > > > getProjection(pdb::Handle< InputClass > checkMe)=0
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
String outputTupleSetName
Definition: Computation.h:379