A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PartitionCompBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_PARTITIONCOMPBASE_H
20 #define PDB_PARTITIONCOMPBASE_H
21 
22 #include "Computation.h"
23 #include "VectorSink.h"
24 #include "ScanUserSet.h"
25 #include "TypeName.h"
26 #include "AbstractPartitionComp.h"
27 #include "HashPartitionSink.h"
28 
29 namespace pdb {
30 template<class KeyClass, class ValueClass>
31 class PartitionCompBase : public AbstractPartitionComp<KeyClass, ValueClass> {
32 
33  public:
34 
35 
44 
45 
50  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal) override {
51  int suffix = 0;
52  Handle<ValueClass> checkMe = nullptr;
53  Lambda<KeyClass> projectionLambda = getProjection(checkMe);
54  projectionLambda.toMap(returnVal, suffix);
55  }
56 
57 
58 
62  std::string getComputationType() override {
63  return std::string("PartitionComp");
64  }
65 
70  return PartitionCompTypeID;
71  }
72 
76  std::string getOutputType() override {
77  return getTypeName<ValueClass>();
78  }
79 
80 
93  std::string toTCAPString(std::string inputTupleSetName,
94  std::vector<std::string> &inputColumnNames,
95  std::vector<std::string> &inputColumnsToApply,
96  std::vector<std::string> &childrenLambdaNames,
97  int computationLabel,
98  std::string &outputTupleSetName,
99  std::vector<std::string> &outputColumnNames,
100  std::string &addedOutputColumnName,
101  std::string &myLambdaName) override {
102 
103  PDB_COUT << "ABOUT TO GET TCAP STRING FOR PARTITION" << std::endl;
104  Handle<ValueClass> checkMe = nullptr;
105  std::string tupleSetName;
106  std::vector<std::string> columnNames;
107  std::string addedColumnName;
108  int lambdaLabel = 0;
109 
110 
111  std::string tcapString;
112 
113  PDB_COUT << "TO GET TCAP STRING FOR PROJECTION LAMBDA\n";
114  Lambda<KeyClass> projectionLambda = getProjection(checkMe);
115 
116  tcapString += "\n/* Apply projection */\n";
117  tcapString += projectionLambda.toTCAPString(inputTupleSetName,
118  inputColumnNames,
119  inputColumnsToApply,
120  childrenLambdaNames,
121  lambdaLabel,
123  computationLabel,
124  tupleSetName,
125  columnNames,
126  addedColumnName,
127  myLambdaName,
128  false);
129 
130  PDB_COUT << "TO REMOVE THE KEY COLUMN\n";
131 
132  mustache::data partitionCompTCAP;
133  partitionCompTCAP.set("computationType", getComputationType());
134  partitionCompTCAP.set("computationLabel", std::to_string(computationLabel));
135  partitionCompTCAP.set("tupleSetName", tupleSetName);
136  partitionCompTCAP.set("addedColumnName", addedColumnName);
137 
138  //set the output tuple set name
139  mustache::mustache outputTupleSetNameTemplate{"partitionOutFor{{computationType}}{{computationLabel}}"};
140  outputTupleSetName = outputTupleSetNameTemplate.render(partitionCompTCAP);
141 
142  partitionCompTCAP.set("outputTupleSetName", outputTupleSetName);
143 
144  // set the added output column
145  addedOutputColumnName = inputColumnsToApply[0];
146 
147  partitionCompTCAP.set("addedOutputColumnName", addedOutputColumnName);
148 
149  tcapString += "\n/* Apply partition */\n";
150 
151  mustache::mustache partitionTCAPTemplate{"{{outputTupleSetName}} ({{addedOutputColumnName}})"
152  " <= PARTITION ({{tupleSetName}}({{addedOutputColumnName}}, {{addedColumnName}}),"
153  "'{{computationType}}_{{computationLabel}}')\n"};
154 
155  tcapString += partitionTCAPTemplate.render(partitionCompTCAP);
156 
157  // update the state of the computation
158  outputColumnNames.clear();
159  outputColumnNames.push_back(addedOutputColumnName);
160 
161 
162  // update the state of the computation
163  this->setTraversed(true);
164  this->setOutputTupleSetName(outputTupleSetName);
165  this->setOutputColumnToApply(addedOutputColumnName);
166 
167  // return the TCAP string
168  return tcapString;
169  }
170 
171 
177  void setOutput(std::string dbName, std::string setName) override {
178  this->materializeSelectionOut = true;
179  this->outputSetScanner = makeObject<ScanUserSet<ValueClass>>();
180  this->outputSetScanner->setDatabaseName(dbName);
181  this->outputSetScanner->setSetName(setName);
182  }
183 
189  void setBatchSize(int batchSize) override {
190  if (this->outputSetScanner != nullptr) {
191  this->outputSetScanner->setBatchSize(batchSize);
192  }
193  }
194 
198  std::string getDatabaseName() override {
199  if (this->outputSetScanner != nullptr) {
200  return this->outputSetScanner->getDatabaseName();
201  } else {
202  return "";
203  }
204  }
205 
209  std::string getSetName() override {
210  if (this->outputSetScanner != nullptr) {
211  return this->outputSetScanner->getSetName();
212  } else {
213  return "";
214  }
215  }
216 
223  ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override {
224 
225  if (this->outputSetScanner != nullptr) {
226  std::cout << "database name is " << this->outputSetScanner->getDatabaseName() << std::endl;
227  std::cout << "set name is " << this->outputSetScanner->getSetName() << std::endl;
228  return outputSetScanner->getComputeSource(outputScheme, plan);
229  }
230  std::cout << "ERROR: get compute source for " << outputScheme << " returns nullptr" << std::endl;
231  return nullptr;
232  }
233 
234 
235 
236 
245  TupleSpec &projection,
246  ComputePlan &plan) override {
247 
248  return std::make_shared<HashPartitionSink<KeyClass, ValueClass>>(this->numPartitions, this->numNodes, consumeMe, projection);
249  }
250 
251 
256  return outputSetScanner;
257  }
258 
259 
260 private:
261 
263 
264 };
265 
266 }
267 
268 #endif //PDB_PARTITIONCOMPBASE_H
ComputationTypeID
Definition: Computation.h:39
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
Handle< ScanUserSet< ValueClass > > & getOutputSetScanner()
virtual Lambda< KeyClass > getProjection(Handle< ValueClass > checkMe)=0
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName) override
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
void setBatchSize(int batchSize) override
std::string getSetName() override
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
#define PDB_COUT
Definition: PDBDebug.h:31
std::string getComputationType() override
std::string getOutputType() override
void setTraversed(bool traversed)
Definition: Computation.h:254
Handle< ScanUserSet< ValueClass > > outputSetScanner
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
std::string getDatabaseName() override
ComputationTypeID getComputationTypeID() override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
void setOutput(std::string dbName, std::string setName) override
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
String outputTupleSetName
Definition: Computation.h:379