A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AggregateCompBase.h
Go to the documentation of this file.
1 //
2 // Created by dimitrije on 3/16/18.
3 //
4 
5 #ifndef PDB_BASEAGGREGATECOMP_H
6 #define PDB_BASEAGGREGATECOMP_H
7 
9 
14 namespace pdb {
15 template<class OutputClass, class InputClass, class KeyClass, class ValueClass>
17  public:
18 
25 
32 
/**
 * Configures this aggregation to materialize its result into a user set.
 * Sets the materialize flag, creates a fresh ScanUserSet<OutputClass> scanner and
 * gives it the current batch size plus the target database/set names.
 * @param dbName  name of the database the output set belongs to
 * @param setName name of the output set
 */
38  void setOutput(std::string dbName, std::string setName) override {
39  this->materializeAggOut = true;
40  this->outputSetScanner = makeObject<ScanUserSet<OutputClass>>();
41  this->outputSetScanner->setBatchSize(batchSize);
42  this->outputSetScanner->setDatabaseName(dbName);
43  this->outputSetScanner->setSetName(setName);
// configuring an output set also clears any previously stored hash-table pointer
44  this->whereHashTableSitsForThePartition = nullptr;
45  }
46 
/**
 * Stores the raw pointer to where the hash table for the partition sits.
 * getComputeSource() later reinterprets this pointer as a Record<Object>*.
 * @param hashTablePointer untyped pointer to the hash table location
 */
51  void setHashTablePointer(void *hashTablePointer) {
52  this->whereHashTableSitsForThePartition = hashTablePointer;
53  }
54 
/**
 * Registers the key and value projection lambdas of this aggregation in the given map.
 * Both lambdas are obtained by calling the projections with a null input handle;
 * the same suffix counter (starting at 0, passed by reference) is used for both toMap calls.
 * @param returnVal map the lambdas are added to
 */
59  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal) override {
60  int suffix = 0;
61  Handle<InputClass> checkMe = nullptr;
62  Lambda<KeyClass> keyLambda = getKeyProjection(checkMe);
63  Lambda<ValueClass> valueLambda = getValueProjection(checkMe);
64  keyLambda.toMap(returnVal, suffix);
65  valueLambda.toMap(returnVal, suffix);
66  }
67 
/**
 * Creates the compute sink that consumes the tuples of this aggregation.
 * When isUsingCombiner() is true a ShuffleSink over numPartitions is returned;
 * otherwise a CombinedShuffleSink is built with numPartitions / numNodes partitions
 * per node, after validating that the cluster is non-empty and that every node gets
 * at least one partition.
 * @param consumeMe  tuple spec of the input the sink consumes
 * @param projection tuple spec of the columns the sink keeps
 * @param plan       the compute plan (not used by this implementation)
 * @return the sink, or nullptr if the node/partition configuration is invalid
 */
75  ComputeSinkPtr getComputeSink(TupleSpec &consumeMe,
76  TupleSpec &projection,
77  ComputePlan &plan) override {
78 
79  if (this->isUsingCombiner()) {
80  return std::make_shared<ShuffleSink<KeyClass, ValueClass>>(numPartitions, consumeMe, projection);
81  } else {
// guard the division below against an empty cluster
82  if (numNodes == 0) {
83  std::cout << "ERROR: cluster has 0 node" << std::endl;
84  return nullptr;
85  }
// numPartitions / numNodes would be 0 otherwise
86  if (numPartitions < numNodes) {
87  std::cout << "ERROR: each node must have at least one partition" << std::endl;
88  return nullptr;
89  }
90  return std::make_shared<CombinedShuffleSink<KeyClass, ValueClass>>(
91  numPartitions / numNodes, numNodes, consumeMe, projection);
92  }
93  }
94 
/**
 * Returns the compute source that reads the result of this aggregation.
 * If the result is materialized, the call is delegated to the output set scanner;
 * otherwise the stored hash-table pointer is treated as a Record<Object>* and a
 * MapTupleSetIterator over its root object is returned (the batch size is also
 * printed to stdout on that path).
 * @param outputScheme tuple spec forwarded to the scanner on the materialized path
 * @param plan         compute plan forwarded to the scanner on the materialized path
 * @return the source, or nullptr when the scanner / hash-table pointer is not set
 */
101  ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override {
102  // materialize aggregation result to user set
103  if (this->materializeAggOut) {
104  if (outputSetScanner != nullptr) {
105  return outputSetScanner->getComputeSource(outputScheme, plan);
106  }
107  return nullptr;
108 
109  // not materialize aggregation result, keep them in hash table
110  } else {
111  if (whereHashTableSitsForThePartition != nullptr) {
112  Handle<Object> myHashTable =
113  ((Record<Object> *) whereHashTableSitsForThePartition)->getRootObject();
114  std::cout << "ClusterAggregate: getComputeSource: BATCHSIZE=" << batchSize
115  << std::endl;
116  return std::make_shared<MapTupleSetIterator<KeyClass, ValueClass, OutputClass>>(
117  myHashTable, batchSize);
118  }
119  return nullptr;
120  }
121  }
122 
/**
 * Creates the combiner processor for the given hash partitions.
 * @param partitions ids of the partitions handed to the CombinerProcessor
 * @return a new CombinerProcessor<KeyClass, ValueClass>
 */
130  SimpleSingleTableQueryProcessorPtr getCombinerProcessor(
131  std::vector<HashPartitionID> partitions) override {
132  return make_shared<CombinerProcessor<KeyClass, ValueClass>>(partitions);
133  }
134 
/**
 * Creates the aggregation processor for one hash partition.
 * @param id id of the partition handed to the AggregationProcessor
 * @return a new AggregationProcessor<KeyClass, ValueClass>
 */
143  SimpleSingleTableQueryProcessorPtr getAggregationProcessor(HashPartitionID id) override {
144  return make_shared<AggregationProcessor<KeyClass, ValueClass>>(id);
145  }
146 
/**
 * Creates the processor that produces the aggregation output.
 * @return a new AggOutProcessor<OutputClass, KeyClass, ValueClass>
 */
154  SimpleSingleTableQueryProcessorPtr getAggOutProcessor() override {
155  return make_shared<AggOutProcessor<OutputClass, KeyClass, ValueClass>>();
156  }
157 
/**
 * Forwards the page iterator to the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check; in the visible
 * code it is only assigned in setOutput() — confirm callers always call that first.
 * @param iterator the iterator handed to the scanner
 */
162  void setIterator(PageCircularBufferIteratorPtr iterator) override {
163  this->outputSetScanner->setIterator(iterator);
164  }
165 
/**
 * Forwards the data proxy to the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check — confirm it is
 * always set (see setOutput()) before this is called.
 * @param proxy the proxy handed to the scanner
 */
170  void setProxy(DataProxyPtr proxy) override {
171  this->outputSetScanner->setProxy(proxy);
172  }
173 
/**
 * Sets the database name on the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check — confirm it is
 * always set before this is called.
 * @param dbName the database name forwarded to the scanner
 */
178  void setDatabaseName(std::string dbName) override {
179  this->outputSetScanner->setDatabaseName(dbName);
180  }
181 
/**
 * Sets the set name on the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check — confirm it is
 * always set before this is called.
 * @param setName the set name forwarded to the scanner
 */
186  void setSetName(std::string setName) override {
187  this->outputSetScanner->setSetName(setName);
188  }
189 
/**
 * Returns the database name stored in the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check — confirm it is
 * always set before this is called.
 * @return the scanner's database name
 */
194  std::string getDatabaseName() override {
195  return this->outputSetScanner->getDatabaseName();
196  }
197 
/**
 * Returns the set name stored in the output set scanner.
 * NOTE(review): outputSetScanner is dereferenced without a null check — confirm it is
 * always set before this is called.
 * @return the scanner's set name
 */
202  std::string getSetName() override {
203  return this->outputSetScanner->getSetName();
204  }
205 
/**
 * Returns the name of this computation type.
 * The same string is used as {{computationType}} when toTCAPString() renders its templates.
 * @return the literal "ClusterAggregationComp"
 */
210  std::string getComputationType() override {
211  return std::string("ClusterAggregationComp");
212  }
213 
220  }
221 
/**
 * Returns the type name of the objects this aggregation outputs.
 * @return getTypeName<OutputClass>()
 */
226  std::string getOutputType() override {
227  return getTypeName<OutputClass>();
228  }
229 
/**
 * Returns the number of inputs of this computation.
 * @return always 1
 */
234  int getNumInputs() override {
235  return 1;
236  }
237 
/**
 * Returns the type name of the i-th input.
 * @param i index of the input; only 0 is a valid input (see getNumInputs())
 * @return getTypeName<InputClass>() for i == 0, an empty string for any other index
 */
243  std::string getIthInputType(int i) override {
244  if (i == 0) {
245  return getTypeName<InputClass>();
246  } else {
247  return "";
248  }
249  }
250 
/**
 * Stores the collect-as-map flag of this aggregation.
 * @param collectAsMapOrNot the new value of the flag
 */
255  void setCollectAsMap(bool collectAsMapOrNot) override {
256  this->collectAsMapOrNot = collectAsMapOrNot;
257  }
258 
/**
 * Returns the collect-as-map flag (the member is initialized to false).
 * @return the current value of collectAsMapOrNot
 */
263  bool isCollectAsMap() override {
264  return this->collectAsMapOrNot;
265  }
266 
/**
 * Returns the stored number of nodes to collect.
 * @return the current value of numNodesToCollect
 */
267  int getNumNodesToCollect() override {
268  return this->numNodesToCollect;
269  }
270 
/**
 * Stores the number of nodes to collect.
 * @param numNodesToCollect the new value
 */
271  void setNumNodesToCollect(int numNodesToCollect) override {
272  this->numNodesToCollect = numNodesToCollect;
273  }
274 
/**
 * Generates the TCAP string for this aggregation from a list of input tuple sets.
 * Only the first input tuple set is used; its name and column lists are forwarded
 * to the nine-argument overload together with empty children-lambda names and an
 * empty lambda name.
 * @param inputTupleSets        the input tuple sets; only element 0 is read
 * @param computationLabel      the label of this computation
 * @param outputTupleSetName    in/out: name of the produced tuple set
 * @param outputColumnNames     in/out: column names of the produced tuple set
 * @param addedOutputColumnName in/out: name of the column added by this computation
 * @return the TCAP string, or an empty string if inputTupleSets is empty
 */
284  std::string toTCAPString(std::vector<InputTupleSetSpecifier> &inputTupleSets,
285  int computationLabel,
286  std::string &outputTupleSetName,
287  std::vector<std::string> &outputColumnNames,
288  std::string &addedOutputColumnName) override {
289 
290  if (inputTupleSets.empty()) {
291  return "";
292  }
293 
294  InputTupleSetSpecifier inputTupleSet = inputTupleSets[0];
295  std::vector<std::string> childrenLambdaNames;
296  std::string myLambdaName;
297 
// the argument order must match the nine-parameter overload exactly
298  return toTCAPString(inputTupleSet.getTupleSetName(),
299  inputTupleSet.getColumnNamesToKeep(),
300  inputTupleSet.getColumnNamesToApply(),
301  childrenLambdaNames,
302  computationLabel,
303  outputTupleSetName,
304  outputColumnNames,
305  addedOutputColumnName,
306  myLambdaName);
307  }
308 
322  std::string toTCAPString(std::string inputTupleSetName,
323  std::vector<std::string> &inputColumnNames,
324  std::vector<std::string> &inputColumnsToApply,
325  std::vector<std::string> &childrenLambdaNames,
326  int computationLabel,
327  std::string &outputTupleSetName,
328  std::vector<std::string> &outputColumnNames,
329  std::string &addedOutputColumnName,
330  std::string &myLambdaName) {
331 
332  PDB_COUT << "To GET TCAP STRING FOR CLUSTER AGGREGATE COMP" << std::endl;
333 
334  PDB_COUT << "To GET TCAP STRING FOR AGGREGATE KEY" << std::endl;
335  Handle<InputClass> checkMe = nullptr;
336  Lambda<KeyClass> keyLambda = getKeyProjection(checkMe);
337  std::string tupleSetName;
338  std::vector<std::string> columnNames;
339  std::string addedColumnName;
340  int lambdaLabel = 0;
341 
342  std::vector<std::string> columnsToApply;
343  for (const auto &i : inputColumnsToApply) {
344  columnsToApply.push_back(i);
345  }
346 
347  std::string tcapString;
348  tcapString += "\n/* Extract key for aggregation */\n";
349  tcapString += keyLambda.toTCAPString(inputTupleSetName,
350  inputColumnNames,
351  inputColumnsToApply,
352  childrenLambdaNames,
353  lambdaLabel,
355  computationLabel,
356  tupleSetName,
357  columnNames,
358  addedColumnName,
359  myLambdaName,
360  false);
361 
362  PDB_COUT << "To GET TCAP STRING FOR AGGREGATE VALUE" << std::endl;
363 
364  Lambda<ValueClass> valueLambda = getValueProjection(checkMe);
365  std::vector<std::string> columnsToKeep;
366  columnsToKeep.push_back(addedColumnName);
367 
368  tcapString += "\n/* Extract value for aggregation */\n";
369  tcapString += valueLambda.toTCAPString(tupleSetName,
370  columnsToKeep,
371  columnsToApply,
372  childrenLambdaNames,
373  lambdaLabel,
375  computationLabel,
376  outputTupleSetName,
377  outputColumnNames,
378  addedOutputColumnName,
379  myLambdaName,
380  false);
381 
382 
383  // create the data for the filter
384  mustache::data clusterAggCompData;
385  clusterAggCompData.set("computationType", getComputationType());
386  clusterAggCompData.set("computationLabel", std::to_string(computationLabel));
387  clusterAggCompData.set("outputTupleSetName", outputTupleSetName);
388  clusterAggCompData.set("addedColumnName", addedColumnName);
389  clusterAggCompData.set("addedOutputColumnName", addedOutputColumnName);
390 
391  // set the new tuple set name
392  mustache::mustache newTupleSetNameTemplate{"aggOutFor{{computationType}}{{computationLabel}}"};
393  std::string newTupleSetName = newTupleSetNameTemplate.render(clusterAggCompData);
394 
395  // set new added output columnName 1
396  mustache::mustache newAddedOutputColumnName1Template{"aggOutFor{{computationLabel}}"};
397  std::string addedOutputColumnName1 = newAddedOutputColumnName1Template.render(clusterAggCompData);
398 
399  clusterAggCompData.set("addedOutputColumnName1", addedOutputColumnName1);
400 
401  tcapString += "\n/* Apply aggregation */\n";
402 
403  mustache::mustache aggregateTemplate{"aggOutFor{{computationType}}{{computationLabel}} ({{addedOutputColumnName1}})"
404  "<= AGGREGATE ({{outputTupleSetName}}({{addedColumnName}}, {{addedOutputColumnName}}),"
405  "'{{computationType}}_{{computationLabel}}')\n"};
406 
407  tcapString += aggregateTemplate.render(clusterAggCompData);
408 
409  // update the state of the computation
410  outputTupleSetName = newTupleSetName;
411  outputColumnNames.clear();
412  outputColumnNames.push_back(addedOutputColumnName1);
413 
414  this->setTraversed(true);
415  this->setOutputTupleSetName(outputTupleSetName);
416  this->setOutputColumnToApply(addedOutputColumnName1);
417  addedOutputColumnName = addedOutputColumnName1;
418 
419  return tcapString;
420  }
421 
/**
 * Gives access to the scanner of the output set.
 * @return a reference to the outputSetScanner handle
 */
422  Handle<ScanUserSet<OutputClass>> &getOutputSetScanner() {
423  return outputSetScanner;
424  }
425 
426  protected:
427 
432 
436  bool collectAsMapOrNot = false;
437 
442 };
443 
444 }
445 
446 #endif //PDB_BASEAGGREGATECOMP_H
SimpleSingleTableQueryProcessorPtr getAggregationProcessor(HashPartitionID id) override
ComputationTypeID
Definition: Computation.h:39
void setProxy(DataProxyPtr proxy) override
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
std::vector< std::string > & getColumnNamesToKeep()
std::string getComputationType() override
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
virtual Lambda< KeyClass > getKeyProjection(Handle< InputClass > aggMe)=0
void setCollectAsMap(bool collectAsMapOrNot) override
unsigned int HashPartitionID
Definition: DataTypes.h:28
SimpleSingleTableQueryProcessorPtr getAggOutProcessor() override
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
Handle< ScanUserSet< OutputClass > > & getOutputSetScanner()
bool isCollectAsMap() override
virtual Lambda< ValueClass > getValueProjection(Handle< InputClass > aggMe)=0
std::string getSetName() override
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
void setNumNodesToCollect(int numNodesToCollect) override
Handle< ScanUserSet< OutputClass > > outputSetScanner
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName)
void setOutput(std::string dbName, std::string setName) override
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
#define PDB_COUT
Definition: PDBDebug.h:31
std::string getOutputType() override
void setHashTablePointer(void *hashTablePointer)
void setSetName(std::string setName) override
void setTraversed(bool traversed)
Definition: Computation.h:254
std::vector< std::string > & getColumnNamesToApply()
void setDatabaseName(std::string dbName) override
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
ComputationTypeID getComputationTypeID() override
int getNumNodesToCollect() override
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
SimpleSingleTableQueryProcessorPtr getCombinerProcessor(std::vector< HashPartitionID > partitions) override
std::string getDatabaseName() override
std::string getIthInputType(int i) override
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
void setIterator(PageCircularBufferIteratorPtr iterator) override
String outputTupleSetName
Definition: Computation.h:379