A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PartitionTransformationCompBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PARTITION_TRANSFORMATION_COMP_BASE_H
20 #define PARTITION_TRANSFORMATION_COMP_BASE_H
21 
22 #include "Computation.h"
23 #include "VectorSink.h"
24 #include "ScanUserSet.h"
25 #include "TypeName.h"
26 #include "AbstractPartitionComp.h"
28 
29 namespace pdb {
30 template<class KeyClass, class ValueClass>
31 class PartitionTransformationCompBase : public AbstractPartitionComp<KeyClass, ValueClass> {
32 
33  public:
34 
35 
43 
44 
49  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal) override {
50  int suffix = 0;
51  Handle<ValueClass> checkMe = nullptr;
52  Lambda<KeyClass> projectionLambda = getProjection(checkMe);
53  projectionLambda.toMap(returnVal, suffix);
54  }
55 
56 
57 
61  std::string getComputationType() override {
62  return std::string("PartitionTransformationComp");
63  }
64 
70  }
71 
72 
76  std::string getOutputType() override {
77  return getTypeName<KeyClass>();
78  }
79 
80 
93  std::string toTCAPString(std::string inputTupleSetName,
94  std::vector<std::string> &inputColumnNames,
95  std::vector<std::string> &inputColumnsToApply,
96  std::vector<std::string> &childrenLambdaNames,
97  int computationLabel,
98  std::string &outputTupleSetName,
99  std::vector<std::string> &outputColumnNames,
100  std::string &addedOutputColumnName,
101  std::string &myLambdaName) override {
102 
103  PDB_COUT << "ABOUT TO GET TCAP STRING FOR SELECTION" << std::endl;
104  Handle<ValueClass> checkMe = nullptr;
105  std::string tupleSetName;
106  std::vector<std::string> columnNames;
107  std::string addedColumnName;
108  int lambdaLabel = 0;
109 
110 
111  std::string tcapString;
112 
113  PDB_COUT << "TO GET TCAP STRING FOR PROJECTION LAMBDA\n";
114  Lambda<KeyClass> projectionLambda = getProjection(checkMe);
115 
116  tcapString += "\n/* Apply projection */\n";
117  tcapString += projectionLambda.toTCAPString(inputTupleSetName,
118  inputColumnNames,
119  inputColumnsToApply,
120  childrenLambdaNames,
121  lambdaLabel,
123  computationLabel,
124  outputTupleSetName,
125  outputColumnNames,
126  addedOutputColumnName,
127  myLambdaName,
128  true);
129 
130  // update the state of the computation
131  this->setTraversed(true);
132  this->setOutputTupleSetName(outputTupleSetName);
133  this->setOutputColumnToApply(addedOutputColumnName);
134 
135  // return the TCAP string
136  return tcapString;
137  }
138 
139 
145  void setOutput(std::string dbName, std::string setName) override {
146  this->materializeSelectionOut = true;
147  this->outputSetScanner = makeObject<ScanUserSet<KeyClass>>();
148  this->outputSetScanner->setDatabaseName(dbName);
149  this->outputSetScanner->setSetName(setName);
150  }
151 
157  void setBatchSize(int batchSize) override {
158  if (this->outputSetScanner != nullptr) {
159  this->outputSetScanner->setBatchSize(batchSize);
160  }
161  }
162 
166  std::string getDatabaseName() override {
167  if (this->outputSetScanner != nullptr) {
168  return this->outputSetScanner->getDatabaseName();
169  } else {
170  return "";
171  }
172  }
173 
177  std::string getSetName() override {
178  if (this->outputSetScanner != nullptr) {
179  return this->outputSetScanner->getSetName();
180  } else {
181  return "";
182  }
183  }
184 
191  ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override {
192 
193  if (this->materializeSelectionOut) {
194  if (this->outputSetScanner != nullptr) {
195  return outputSetScanner->getComputeSource(outputScheme, plan);
196  }
197  }
198  std::cout << "ERROR: get compute source for " << outputScheme << " returns nullptr" << std::endl;
199  return nullptr;
200  }
201 
202 
211  TupleSpec &projection,
212  ComputePlan &plan) override {
213 
214  if (this->materializeSelectionOut) {
215  return std::make_shared<HashPartitionTransformationSink<KeyClass>>(this->numPartitions, this->numNodes, consumeMe, projection);
216  }
217  return nullptr;
218  }
219 
220 
225  return outputSetScanner;
226  }
227 
228 private:
229 
231 
232 };
233 
234 }
235 
236 #endif //PDB_PARTITIONCOMPBASE_H
ComputationTypeID
Definition: Computation.h:39
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
Handle< ScanUserSet< KeyClass > > & getOutputSetScanner()
virtual Lambda< KeyClass > getProjection(Handle< ValueClass > checkMe)=0
void setOutput(std::string dbName, std::string setName) override
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
#define PDB_COUT
Definition: PDBDebug.h:31
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
void setTraversed(bool traversed)
Definition: Computation.h:254
Handle< ScanUserSet< KeyClass > > outputSetScanner
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
String outputTupleSetName
Definition: Computation.h:379