A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SelectionCompBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_SELECTIONCOMPBASE_H
20 #define PDB_SELECTIONCOMPBASE_H
21 
22 #include "Computation.h"
23 #include "VectorSink.h"
24 #include "ScanUserSet.h"
25 #include "TypeName.h"
26 
27 namespace pdb {
28 template<class OutputClass, class InputClass>
30 
31  public:
32 
38  virtual Lambda<bool> getSelection(Handle<InputClass> checkMe) = 0;
39 
47 
52  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal) override {
53  int suffix = 0;
54  Handle<InputClass> checkMe = nullptr;
55  Lambda<bool> selectionLambda = getSelection(checkMe);
56  Lambda<Handle<OutputClass>> projectionLambda = getProjection(checkMe);
57  selectionLambda.toMap(returnVal, suffix);
58  projectionLambda.toMap(returnVal, suffix);
59  }
60 
65  std::string getComputationType() override {
66  return std::string("SelectionComp");
67  }
68 
74  return SelectionCompTypeID;
75  }
76 
82  std::string getIthInputType(int i) override {
83  if (i == 0) {
84  return getTypeName<InputClass>();
85  } else {
86  return "";
87  }
88  }
89 
94  int getNumInputs() override {
95  return 1;
96  }
97 
102  std::string getOutputType() override {
103  return getTypeName<OutputClass>();
104  }
105 
115  std::string toTCAPString(std::vector<InputTupleSetSpecifier> &inputTupleSets,
116  int computationLabel,
117  std::string &outputTupleSetName,
118  std::vector<std::string> &outputColumnNames,
119  std::string &addedOutputColumnName) override {
120 
121  if (inputTupleSets.empty()) {
122  return "";
123  }
124  InputTupleSetSpecifier inputTupleSet = inputTupleSets[0];
125  std::vector<std::string> childrenLambdaNames;
126  std::string myLambdaName;
127  return toTCAPString(inputTupleSet.getTupleSetName(),
128  inputTupleSet.getColumnNamesToKeep(),
129  inputTupleSet.getColumnNamesToApply(),
130  childrenLambdaNames,
131  computationLabel,
133  outputColumnNames,
134  addedOutputColumnName,
135  myLambdaName);
136  }
137 
151  std::string toTCAPString(std::string inputTupleSetName,
152  std::vector<std::string> &inputColumnNames,
153  std::vector<std::string> &inputColumnsToApply,
154  std::vector<std::string> &childrenLambdaNames,
155  int computationLabel,
156  std::string &outputTupleSetName,
157  std::vector<std::string> &outputColumnNames,
158  std::string &addedOutputColumnName,
159  std::string &myLambdaName) {
160 
161  PDB_COUT << "ABOUT TO GET TCAP STRING FOR SELECTION" << std::endl;
162  Handle<InputClass> checkMe = nullptr;
163  std::string tupleSetName;
164  std::vector<std::string> columnNames;
165  std::string addedColumnName;
166  int lambdaLabel = 0;
167 
168  PDB_COUT << "ABOUT TO GET TCAP STRING FOR SELECTION LAMBDA" << std::endl;
169  Lambda<bool> selectionLambda = getSelection(checkMe);
170 
171  std::string tcapString;
172  tcapString += "\n/* Apply selection filtering */\n";
173  tcapString += selectionLambda.toTCAPString(inputTupleSetName,
174  inputColumnNames,
175  inputColumnsToApply,
176  childrenLambdaNames,
177  lambdaLabel,
179  computationLabel,
180  tupleSetName,
181  columnNames,
182  addedColumnName,
183  myLambdaName,
184  false);
185 
186  PDB_COUT << "The tcapString after parsing selection lambda: " << tcapString << "\n";
187  PDB_COUT << "lambdaLabel=" << lambdaLabel << "\n";
188 
189  // create the data for the column names
190  mustache::data inputColumnData = mustache::data::type::list;
191  for(int i = 0; i < inputColumnNames.size(); i++) {
192 
193  mustache::data columnData;
194 
195  // fill in the column data
196  columnData.set("columnName", inputColumnNames[i]);
197  columnData.set("isLast", i == inputColumnNames.size()-1);
198 
199  inputColumnData.push_back(columnData);
200  }
201 
202  // create the data for the filter
203  mustache::data selectionCompData;
204  selectionCompData.set("computationType", getComputationType());
205  selectionCompData.set("computationLabel", std::to_string(computationLabel));
206  selectionCompData.set("inputColumns", inputColumnData);
207  selectionCompData.set("tupleSetName", tupleSetName);
208  selectionCompData.set("addedColumnName", addedColumnName);
209 
210  // tupleSetName1(att1, att2, ...) <= FILTER (tupleSetName(methodCall_0OutFor_isFrank), methodCall_0OutFor_SelectionComp1(in0), 'SelectionComp_1')
211  mustache::mustache scanSetTemplate{"filteredInputFor{{computationType}}{{computationLabel}}({{#inputColumns}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumns}}) "
212  "<= FILTER ({{tupleSetName}}({{addedColumnName}}), {{tupleSetName}}({{#inputColumns}}{{columnName}}{{^isLast}}, {{/isLast}}{{/inputColumns}}), '{{computationType}}_{{computationLabel}}')\n"};
213 
214  // generate the TCAP string for the FILTER
215  tcapString += scanSetTemplate.render(selectionCompData);
216 
217  // template for the new tuple set name
218  mustache::mustache newTupleSetNameTemplate{"filteredInputFor{{computationType}}{{computationLabel}}"};
219 
220  // generate the new tuple set name
221  std::string newTupleSetName = newTupleSetNameTemplate.render(selectionCompData);
222 
223  PDB_COUT << "TO GET TCAP STRING FOR PROJECTION LAMBDA\n";
224  Lambda<Handle<OutputClass>> projectionLambda = getProjection(checkMe);
225 
226  // generate the TCAP string for the FILTER
227  tcapString += "\n/* Apply selection projection */\n";
228  tcapString += projectionLambda.toTCAPString(newTupleSetName,
229  inputColumnNames,
230  inputColumnsToApply,
231  childrenLambdaNames,
232  lambdaLabel,
234  computationLabel,
235  outputTupleSetName,
236  outputColumnNames,
237  addedOutputColumnName,
238  myLambdaName,
239  true);
240 
241  // update the state of the computation
242  this->setTraversed(true);
243  this->setOutputTupleSetName(outputTupleSetName);
244  this->setOutputColumnToApply(addedOutputColumnName);
245 
246  // return the TCAP string
247  return tcapString;
248  }
249 
250  void setOutput(std::string dbName, std::string setName) override {
251  this->materializeSelectionOut = true;
252  this->outputSetScanner = makeObject<ScanUserSet<OutputClass>>();
253  this->outputSetScanner->setDatabaseName(dbName);
254  this->outputSetScanner->setSetName(setName);
255  }
256 
257  void setBatchSize(int batchSize) override {
258  if (this->outputSetScanner != nullptr) {
259  this->outputSetScanner->setBatchSize(batchSize);
260  }
261  }
262 
267  std::string getDatabaseName() override {
268  return this->outputSetScanner->getDatabaseName();
269  }
270 
275  std::string getSetName() override {
276  return this->outputSetScanner->getSetName();
277  }
278 
285  ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override {
286 
287  if (this->materializeSelectionOut) {
288  if (this->outputSetScanner != nullptr) {
289  return outputSetScanner->getComputeSource(outputScheme, plan);
290  }
291  }
292  std::cout << "ERROR: get compute source for " << outputScheme << " returns nullptr" << std::endl;
293  return nullptr;
294  }
295 
304  TupleSpec &projection,
305  ComputePlan &plan) override {
306 
307  if (this->materializeSelectionOut) {
308  return std::make_shared<VectorSink<OutputClass>>(consumeMe, projection);
309  }
310  return nullptr;
311  }
312 
313  bool needsMaterializeOutput() override {
315  }
316 
318  return outputSetScanner;
319  }
320 
321  private:
324 };
325 
326 }
327 
328 #endif //PDB_SELECTIONCOMPBASE_H
ComputationTypeID
Definition: Computation.h:39
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
std::vector< std::string > & getColumnNamesToKeep()
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName)
void setOutput(std::string dbName, std::string setName) override
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
void setBatchSize(int batchSize) override
virtual Lambda< bool > getSelection(Handle< InputClass > checkMe)=0
std::string getDatabaseName() override
std::string getOutputType() override
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
virtual Lambda< Handle< OutputClass > > getProjection(Handle< InputClass > checkMe)=0
std::string getComputationType() override
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
#define PDB_COUT
Definition: PDBDebug.h:31
int getNumInputs() override
ComputationTypeID getComputationTypeID() override
std::string getIthInputType(int i) override
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
std::string getSetName() override
void setTraversed(bool traversed)
Definition: Computation.h:254
std::vector< std::string > & getColumnNamesToApply()
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
bool needsMaterializeOutput() override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
Handle< ScanUserSet< OutputClass > > outputSetScanner
Handle< ScanUserSet< OutputClass > > & getOutputSetScanner()
String outputTupleSetName
Definition: Computation.h:379