5 #ifndef PDB_BASEAGGREGATECOMP_H
6 #define PDB_BASEAGGREGATECOMP_H
15 template<
class OutputClass,
class InputClass,
class KeyClass,
class ValueClass>
38 void setOutput(std::string dbName, std::string setName)
override {
59 void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal)
override {
64 keyLambda.
toMap(returnVal, suffix);
65 valueLambda.
toMap(returnVal, suffix);
80 return std::make_shared<ShuffleSink<KeyClass, ValueClass>>(
numPartitions, consumeMe, projection);
83 std::cout <<
"ERROR: cluster has 0 node" << std::endl;
87 std::cout <<
"ERROR: each node must have at least one partition" << std::endl;
90 return std::make_shared<CombinedShuffleSink<KeyClass, ValueClass>>(
114 std::cout <<
"ClusterAggregate: getComputeSource: BATCHSIZE=" <<
batchSize
116 return std::make_shared<MapTupleSetIterator<KeyClass, ValueClass, OutputClass>>(
131 std::vector<HashPartitionID> partitions)
override {
132 return make_shared<CombinerProcessor<KeyClass, ValueClass>>(partitions);
144 return make_shared<AggregationProcessor<KeyClass, ValueClass>>(id);
155 return make_shared<AggOutProcessor<OutputClass, KeyClass, ValueClass>>();
211 return std::string(
"ClusterAggregationComp");
227 return getTypeName<OutputClass>();
245 return getTypeName<InputClass>();
284 std::string
toTCAPString(std::vector<InputTupleSetSpecifier> &inputTupleSets,
285 int computationLabel,
287 std::vector<std::string> &outputColumnNames,
288 std::string &addedOutputColumnName)
override {
290 if (inputTupleSets.empty()) {
295 std::vector<std::string> childrenLambdaNames;
296 std::string myLambdaName;
305 addedOutputColumnName,
323 std::vector<std::string> &inputColumnNames,
324 std::vector<std::string> &inputColumnsToApply,
325 std::vector<std::string> &childrenLambdaNames,
326 int computationLabel,
328 std::vector<std::string> &outputColumnNames,
329 std::string &addedOutputColumnName,
330 std::string &myLambdaName) {
332 PDB_COUT <<
"To GET TCAP STRING FOR CLUSTER AGGREGATE COMP" << std::endl;
334 PDB_COUT <<
"To GET TCAP STRING FOR AGGREGATE KEY" << std::endl;
337 std::string tupleSetName;
338 std::vector<std::string> columnNames;
339 std::string addedColumnName;
342 std::vector<std::string> columnsToApply;
343 for (
const auto &i : inputColumnsToApply) {
344 columnsToApply.push_back(i);
347 std::string tcapString;
348 tcapString +=
"\n/* Extract key for aggregation */\n";
362 PDB_COUT <<
"To GET TCAP STRING FOR AGGREGATE VALUE" << std::endl;
365 std::vector<std::string> columnsToKeep;
366 columnsToKeep.push_back(addedColumnName);
368 tcapString +=
"\n/* Extract value for aggregation */\n";
378 addedOutputColumnName,
384 mustache::data clusterAggCompData;
386 clusterAggCompData.set(
"computationLabel", std::to_string(computationLabel));
387 clusterAggCompData.set(
"outputTupleSetName", outputTupleSetName);
388 clusterAggCompData.set(
"addedColumnName", addedColumnName);
389 clusterAggCompData.set(
"addedOutputColumnName", addedOutputColumnName);
392 mustache::mustache newTupleSetNameTemplate{
"aggOutFor{{computationType}}{{computationLabel}}"};
393 std::string newTupleSetName = newTupleSetNameTemplate.render(clusterAggCompData);
396 mustache::mustache newAddedOutputColumnName1Template{
"aggOutFor{{computationLabel}}"};
397 std::string addedOutputColumnName1 = newAddedOutputColumnName1Template.render(clusterAggCompData);
399 clusterAggCompData.set(
"addedOutputColumnName1", addedOutputColumnName1);
401 tcapString +=
"\n/* Apply aggregation */\n";
403 mustache::mustache aggregateTemplate{
"aggOutFor{{computationType}}{{computationLabel}} ({{addedOutputColumnName1}})"
404 "<= AGGREGATE ({{outputTupleSetName}}({{addedColumnName}}, {{addedOutputColumnName}}),"
405 "'{{computationType}}_{{computationLabel}}')\n"};
407 tcapString += aggregateTemplate.render(clusterAggCompData);
410 outputTupleSetName = newTupleSetName;
411 outputColumnNames.clear();
412 outputColumnNames.push_back(addedOutputColumnName1);
417 addedOutputColumnName = addedOutputColumnName1;
446 #endif //PDB_BASEAGGREGATECOMP_H
SimpleSingleTableQueryProcessorPtr getAggregationProcessor(HashPartitionID id) override
void setProxy(DataProxyPtr proxy) override
std::shared_ptr< ComputeSource > ComputeSourcePtr
std::string getComputationType() override
shared_ptr< DataProxy > DataProxyPtr
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
virtual Lambda< KeyClass > getKeyProjection(Handle< InputClass > aggMe)=0
void setCollectAsMap(bool collectAsMapOrNot) override
unsigned int HashPartitionID
SimpleSingleTableQueryProcessorPtr getAggOutProcessor() override
void * whereHashTableSitsForThePartition
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Handle< ScanUserSet< OutputClass > > & getOutputSetScanner()
bool isCollectAsMap() override
virtual Lambda< ValueClass > getValueProjection(Handle< InputClass > aggMe)=0
std::string getSetName() override
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
std::shared_ptr< ComputeSink > ComputeSinkPtr
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
void setNumNodesToCollect(int numNodesToCollect) override
Handle< ScanUserSet< OutputClass > > outputSetScanner
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName)
void setOutput(std::string dbName, std::string setName) override
void setOutputColumnToApply(std::string outputColumnToApply)
int getNumInputs() override
std::string getOutputType() override
void setHashTablePointer(void *hashTablePointer)
void setSetName(std::string setName) override
void setTraversed(bool traversed)
void setDatabaseName(std::string dbName) override
void setOutputTupleSetName(std::string outputTupleSetName)
bool isUsingCombiner() override
ComputationTypeID getComputationTypeID() override
int getNumNodesToCollect() override
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
SimpleSingleTableQueryProcessorPtr getCombinerProcessor(std::vector< HashPartitionID > partitions) override
std::string getDatabaseName() override
std::string getIthInputType(int i) override
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
void setIterator(PageCircularBufferIteratorPtr iterator) override
String outputTupleSetName