19 #ifndef PDB_PARTITIONCOMPBASE_H
20 #define PDB_PARTITIONCOMPBASE_H
30 template<
class KeyClass,
class ValueClass>
50 void extractLambdas(std::map<std::string, GenericLambdaObjectPtr> &returnVal)
override {
54 projectionLambda.
toMap(returnVal, suffix);
63 return std::string(
"PartitionComp");
77 return getTypeName<ValueClass>();
94 std::vector<std::string> &inputColumnNames,
95 std::vector<std::string> &inputColumnsToApply,
96 std::vector<std::string> &childrenLambdaNames,
99 std::vector<std::string> &outputColumnNames,
100 std::string &addedOutputColumnName,
101 std::string &myLambdaName)
override {
103 PDB_COUT <<
"ABOUT TO GET TCAP STRING FOR PARTITION" << std::endl;
105 std::string tupleSetName;
106 std::vector<std::string> columnNames;
107 std::string addedColumnName;
111 std::string tcapString;
113 PDB_COUT <<
"TO GET TCAP STRING FOR PROJECTION LAMBDA\n";
116 tcapString +=
"\n/* Apply projection */\n";
117 tcapString += projectionLambda.
toTCAPString(inputTupleSetName,
130 PDB_COUT <<
"TO REMOVE THE KEY COLUMN\n";
132 mustache::data partitionCompTCAP;
134 partitionCompTCAP.set(
"computationLabel", std::to_string(computationLabel));
135 partitionCompTCAP.set(
"tupleSetName", tupleSetName);
136 partitionCompTCAP.set(
"addedColumnName", addedColumnName);
139 mustache::mustache outputTupleSetNameTemplate{
"partitionOutFor{{computationType}}{{computationLabel}}"};
140 outputTupleSetName = outputTupleSetNameTemplate.render(partitionCompTCAP);
142 partitionCompTCAP.set(
"outputTupleSetName", outputTupleSetName);
145 addedOutputColumnName = inputColumnsToApply[0];
147 partitionCompTCAP.set(
"addedOutputColumnName", addedOutputColumnName);
149 tcapString +=
"\n/* Apply partition */\n";
151 mustache::mustache partitionTCAPTemplate{
"{{outputTupleSetName}} ({{addedOutputColumnName}})"
152 " <= PARTITION ({{tupleSetName}}({{addedOutputColumnName}}, {{addedColumnName}}),"
153 "'{{computationType}}_{{computationLabel}}')\n"};
155 tcapString += partitionTCAPTemplate.render(partitionCompTCAP);
158 outputColumnNames.clear();
159 outputColumnNames.push_back(addedOutputColumnName);
177 void setOutput(std::string dbName, std::string setName)
override {
226 std::cout <<
"database name is " << this->
outputSetScanner->getDatabaseName() << std::endl;
227 std::cout <<
"set name is " << this->
outputSetScanner->getSetName() << std::endl;
230 std::cout <<
"ERROR: get compute source for " << outputScheme <<
" returns nullptr" << std::endl;
248 return std::make_shared<HashPartitionSink<KeyClass, ValueClass>>(this->
numPartitions, this->
numNodes, consumeMe, projection);
268 #endif //PDB_PARTITIONCOMPBASE_H
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
std::shared_ptr< ComputeSource > ComputeSourcePtr
Handle< ScanUserSet< ValueClass > > & getOutputSetScanner()
virtual Lambda< KeyClass > getProjection(Handle< ValueClass > checkMe)=0
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan) override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName) override
bool materializeSelectionOut
std::shared_ptr< ComputeSink > ComputeSinkPtr
void setBatchSize(int batchSize) override
std::string getSetName() override
void setOutputColumnToApply(std::string outputColumnToApply)
std::string getComputationType() override
std::string getOutputType() override
void setTraversed(bool traversed)
Handle< ScanUserSet< ValueClass > > outputSetScanner
void setOutputTupleSetName(std::string outputTupleSetName)
std::string getDatabaseName() override
ComputationTypeID getComputationTypeID() override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
void setOutput(std::string dbName, std::string setName) override
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
String outputTupleSetName