19 #ifndef C_PLUS_PLUS_LAM_CC
20 #define C_PLUS_PLUS_LAM_CC
26 #define CAST(TYPENAME, WHICH) ((*(((std::vector<Handle<TYPENAME>>**)args)[WHICH]))[which])
37 typename std::enable_if<
38 !std::is_base_of<Nothing, ParamOne>::value && std::is_base_of<Nothing, ParamTwo>::value &&
39 std::is_base_of<Nothing, ParamThree>::value && std::is_base_of<Nothing, ParamFour>::value &&
40 std::is_base_of<Nothing, ParamFive>::value,
42 callLambda(F& func, std::vector<ReturnType>& assignToMe,
int which,
void** args) {
43 assignToMe[which] = func(
CAST(ParamOne, 0));
53 typename std::enable_if<
54 !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
55 std::is_base_of<Nothing, ParamThree>::value && std::is_base_of<Nothing, ParamFour>::value &&
56 std::is_base_of<Nothing, ParamFive>::value,
58 callLambda(F& func, std::vector<ReturnType>& assignToMe,
int which,
void** args) {
59 assignToMe[which] = func(
CAST(ParamOne, 0),
CAST(ParamTwo, 1));
69 typename std::enable_if<
70 !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
71 !std::is_base_of<Nothing, ParamThree>::value &&
72 std::is_base_of<Nothing, ParamFour>::value && std::is_base_of<Nothing, ParamFive>::value,
74 callLambda(F& func, std::vector<ReturnType>& assignToMe,
int which,
void** args) {
75 assignToMe[which] = func(
CAST(ParamOne, 0),
CAST(ParamTwo, 1),
CAST(ParamThree, 2));
85 typename std::enable_if<
86 !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
87 !std::is_base_of<Nothing, ParamThree>::value &&
88 !std::is_base_of<Nothing, ParamFour>::value && std::is_base_of<Nothing, ParamFive>::value,
90 callLambda(F& func, std::vector<ReturnType>& assignToMe,
int which,
void** args) {
92 func(
CAST(ParamOne, 0),
CAST(ParamTwo, 1),
CAST(ParamThree, 2),
CAST(ParamFour, 3));
102 typename std::enable_if<
103 !std::is_base_of<Nothing, ParamOne>::value && !std::is_base_of<Nothing, ParamTwo>::value &&
104 !std::is_base_of<Nothing, ParamThree>::value &&
105 !std::is_base_of<Nothing, ParamFour>::value && !std::is_base_of<Nothing, ParamFive>::value,
107 callLambda(F& func, std::vector<ReturnType>& assignToMe,
int which,
void** args) {
108 assignToMe[which] = func(
CAST(ParamOne, 0),
115 template <
typename F,
117 typename ParamOne = Nothing,
118 typename ParamTwo = Nothing,
119 typename ParamThree = Nothing,
120 typename ParamFour = Nothing,
121 typename ParamFive = Nothing>
139 if (getTypeName<ParamOne>() !=
"pdb::Nothing") {
141 this->
setInputIndex(0, -((input1.getExactTypeInfoValue() + 1)));
143 if (getTypeName<ParamTwo>() !=
"pdb::Nothing") {
145 this->
setInputIndex(1, -((input2.getExactTypeInfoValue() + 1)));
147 if (getTypeName<ParamThree>() !=
"pdb::Nothing") {
149 this->
setInputIndex(2, -((input3.getExactTypeInfoValue() + 1)));
151 if (getTypeName<ParamFour>() !=
"pdb::Nothing") {
153 this->
setInputIndex(3, -((input4.getExactTypeInfoValue() + 1)));
155 if (getTypeName<ParamFive>() !=
"pdb::Nothing") {
157 this->
setInputIndex(4, -((input5.getExactTypeInfoValue() + 1)));
167 return std::string(
"native_lambda");
183 TupleSpec& attsToIncludeInOutput)
override {
190 std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
193 std::vector<int> matches = myMachine->match(attsToOperateOn);
196 std::shared_ptr<std::vector<void*>> inputAtts = std::make_shared<std::vector<void*>>();
197 for (
int i = 0; i < matches.size(); i++) {
198 inputAtts->push_back(
nullptr);
202 int outAtt = attsToIncludeInOutput.
getAtts().size();
204 return std::make_shared<SimpleComputeExecutor>(
209 myMachine->setup(input, output);
212 int numAtts = matches.size();
213 void** inAtts = inputAtts->data();
214 for (
int i = 0; i < numAtts; i++) {
215 inAtts[i] = &(input->getColumn<
int>(matches[i]));
219 if (!output->hasColumn(outAtt)) {
220 std::vector<ReturnType>* outputCol =
new std::vector<ReturnType>;
221 output->addColumn(outAtt, outputCol,
true);
225 std::vector<ReturnType>& outColumn = output->getColumn<ReturnType>(outAtt);
228 int numTuples = ((std::vector<Handle<ParamOne>>*)inAtts[0])->size();
229 outColumn.resize(numTuples);
230 for (
int i = 0; i < numTuples; i++) {
231 callLambda<F, ReturnType, ParamOne, ParamTwo, ParamThree, ParamFour, ParamFive>(
232 myFunc, outColumn, i, inAtts);
242 std::string computationName,
243 int computationLabel,
244 std::string& outputTupleSetName,
245 std::vector<std::string>& outputColumns,
246 std::string& outputColumnName,
247 std::string& myLambdaName,
250 std::string tcapString =
"";
251 myLambdaName =
"native_lambda_" + std::to_string(lambdaLabel);
252 std::string myComputationName = computationName +
"_" + std::to_string(computationLabel);
253 if (multiInputsComp ==
nullptr) {
260 std::vector<std::string> inputTupleSetNames;
261 std::map<std::string, std::vector<unsigned int>> inputPartitions;
262 std::vector<std::vector<std::string>> inputColumnNames;
263 std::vector<std::vector<std::string>> inputColumnsToApply;
264 for (
unsigned int i = 0; i <
numInputs; i++) {
268 std::find(inputTupleSetNames.begin(), inputTupleSetNames.end(), curTupleSetName);
269 if (iter == inputTupleSetNames.end()) {
270 inputTupleSetNames.push_back(curTupleSetName);
272 inputColumnsToApply.push_back(
275 inputPartitions[curTupleSetName].push_back(index);
278 for (
auto curTupleSetName : inputTupleSetNames) {
279 std::vector<unsigned int> curVec = inputPartitions[curTupleSetName];
283 std::string curLeftTupleSetName;
284 std::vector<std::string> curLeftColumnsToKeep;
285 std::string curLeftHashColumnName;
286 std::vector<unsigned int> curLeftIndexes;
289 if (inputTupleSetNames.size() > 1) {
290 for (
unsigned int i = 0; i < inputTupleSetNames.size() - 1; i++) {
293 std::string curLeftInputTupleSetName = inputTupleSetNames[0];
294 curLeftIndexes = inputPartitions[curLeftInputTupleSetName];
295 curLeftColumnsToKeep = inputColumnNames[0];
296 std::vector<std::string> curInputColumnsToApply = inputColumnsToApply[0];
297 std::string curPrefix =
"hashOneFor_" + std::to_string(computationLabel) +
"_" +
298 std::to_string(lambdaLabel);
299 if (curLeftIndexes.size() > 1) {
300 curPrefix +=
"Joined";
302 curLeftTupleSetName =
304 for (
unsigned int j = 1; j < curLeftIndexes.size(); j++) {
305 curLeftTupleSetName +=
308 curLeftHashColumnName =
"OneFor_0_" + std::to_string(computationLabel) +
"_" +
309 std::to_string(lambdaLabel);
310 std::vector<std::string> curOutputColumnNames;
311 for (
const auto &j : curLeftColumnsToKeep) {
312 curOutputColumnNames.push_back(j);
314 curOutputColumnNames.push_back(curLeftHashColumnName);
316 curLeftColumnsToKeep,
317 curInputColumnsToApply,
319 curOutputColumnNames,
320 curLeftHashColumnName,
324 std::map<std::string, std::string>());
328 std::string curInputTupleSetName = inputTupleSetNames[i + 1];
329 std::vector<std::string> curInputColumnNames = inputColumnNames[i + 1];
330 std::vector<std::string> curInputColumnsToApply = inputColumnsToApply[i + 1];
331 std::vector<unsigned int> curIndexes = inputPartitions[curInputTupleSetName];
332 std::string curPrefix =
"hashOneFor_" + std::to_string(computationLabel) +
"_" +
333 std::to_string(lambdaLabel);
334 if (curIndexes.size() > 1) {
335 curPrefix +=
"Joined";
337 std::string curOutputTupleSetName =
339 for (
unsigned int j = 1; j < curIndexes.size(); j++) {
340 curOutputTupleSetName +=
343 std::string curOutputColumnName =
"OneFor_" + std::to_string(i + 1) +
"_" +
344 std::to_string(computationLabel) +
"_" + std::to_string(lambdaLabel);
345 std::vector<std::string> curOutputColumnNames;
346 for (
const auto &curInputColumnName : curInputColumnNames) {
347 curOutputColumnNames.push_back(curInputColumnName);
349 curOutputColumnNames.push_back(curOutputColumnName);
352 curInputColumnsToApply,
353 curOutputTupleSetName,
354 curOutputColumnNames,
359 std::map<std::string, std::string>());
363 tcapString +=
"\n/* CartesianJoin ( " +
365 std::string outputTupleSetName =
367 for (
unsigned int j = 1; j < curLeftIndexes.size(); j++) {
368 outputTupleSetName +=
374 for (
unsigned int j = 1; j < curIndexes.size(); j++) {
378 outputTupleSetName +=
"_";
379 tcapString +=
" ) */\n";
381 tcapString += outputTupleSetName +
"(" + curLeftColumnsToKeep[0];
382 for (
unsigned int j = 1; j < curLeftColumnsToKeep.size(); j++) {
383 tcapString +=
", " + curLeftColumnsToKeep[j];
385 for (
unsigned int j = 0; j < curInputColumnNames.size(); j++) {
386 tcapString +=
", " + curInputColumnNames[j];
388 if (i + 1 < inputTupleSetNames.size() - 1) {
389 tcapString +=
", " + curOutputColumnName;
392 ") <= JOIN (" + curLeftTupleSetName +
"(" + curLeftHashColumnName +
"), ";
393 tcapString += curLeftTupleSetName +
"(" + curLeftColumnsToKeep[0];
394 for (
unsigned int j = 1; j < curLeftColumnsToKeep.size(); j++) {
395 tcapString +=
", " + curLeftColumnsToKeep[j];
398 tcapString += curOutputTupleSetName +
"(" + curOutputColumnName +
"), ";
399 tcapString += curOutputTupleSetName +
"(" + curInputColumnNames[0];
400 for (
unsigned int j = 1; j < curInputColumnNames.size(); j++) {
401 tcapString +=
", " + curInputColumnNames[j];
403 if (i + 1 < inputTupleSetNames.size() - 1) {
404 tcapString +=
", " + curOutputColumnName;
406 tcapString +=
"), '" + myComputationName +
"')\n";
410 curLeftTupleSetName = outputTupleSetName;
411 for (
unsigned int j = 0; j < curInputColumnNames.size(); j++) {
412 curLeftColumnsToKeep.push_back(curInputColumnNames[j]);
414 for (
unsigned int j = 0; j < curIndexes.size(); j++) {
415 curLeftIndexes.push_back(curIndexes[j]);
417 curLeftHashColumnName = curOutputColumnName;
421 curLeftTupleSetName = inputTupleSetNames[0];
422 curLeftColumnsToKeep = inputColumnNames[0];
426 std::vector<std::string> curInputColumnsToApply;
431 std::string curOutputTupleSetName =
"nativOutFor_" + myLambdaName +
"_" + myComputationName;
432 std::string curOutputColumnName =
433 "nativOut_" + std::to_string(lambdaLabel) +
"_" + std::to_string(computationLabel);
434 std::vector<std::string> curOutputColumnNames;
435 for (
const auto &i : curLeftColumnsToKeep) {
436 curOutputColumnNames.push_back(i);
438 curOutputColumnNames.push_back(curOutputColumnName);
441 curLeftColumnsToKeep,
442 curInputColumnsToApply,
443 curOutputTupleSetName,
444 curOutputColumnNames,
452 outputColumns.clear();
453 outputTupleSetName =
"filtedOutFor_" + myLambdaName +
"_" + myComputationName;
454 tcapString += outputTupleSetName +
"(" + curLeftColumnsToKeep[0];
455 outputColumns.push_back(curLeftColumnsToKeep[0]);
456 for (
unsigned int i = 1; i < curLeftColumnsToKeep.size(); i++) {
457 tcapString +=
", " + curLeftColumnsToKeep[i];
458 outputColumns.push_back(curLeftColumnsToKeep[0]);
460 tcapString +=
") <= FILTER (" + curOutputTupleSetName +
"(" + curOutputColumnName +
"), ";
461 tcapString += curOutputTupleSetName +
"(" + curLeftColumnsToKeep[0];
462 for (
unsigned int i = 1; i < curLeftColumnsToKeep.size(); i++) {
463 tcapString +=
", " + curLeftColumnsToKeep[i];
465 tcapString +=
"), '" + myComputationName +
"')\n";
468 for (
unsigned int i = 0; i < multiInputsComp->
getNumInputs(); i++) {
471 std::find(curLeftColumnsToKeep.begin(), curLeftColumnsToKeep.end(), curInput);
472 if (iter != curLeftColumnsToKeep.end()) {
485 std::map<std::string, std::string>
getInfo()
override {
488 return std::map<std::string, std::string>{
int getNumChildren() override
#define CAST(TYPENAME, WHICH)
std::enable_if< !std::is_base_of< Nothing, ParamOne >::value &&std::is_base_of< Nothing, ParamTwo >::value &&std::is_base_of< Nothing, ParamThree >::value &&std::is_base_of< Nothing, ParamFour >::value &&std::is_base_of< Nothing, ParamFive >::value, void >::type callLambda(F &func, std::vector< ReturnType > &assignToMe, int which, void **args)
std::vector< std::string > & getAtts()
GenericLambdaObjectPtr getChild(int which) override
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
CPlusPlusLambda(F arg, Handle< ParamOne > &input1, Handle< ParamTwo > &input2, Handle< ParamThree > &input3, Handle< ParamFour > &input4, Handle< ParamFive > &input5)
std::shared_ptr< TupleSet > TupleSetPtr
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
std::string toTCAPStringForCartesianJoin(int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp) override
virtual unsigned int getInputIndex(int i)
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
unsigned int getNumInputs() override
void setInputIndex(int i, unsigned int index)
std::map< std::string, std::string > getInfo() override
std::string getTypeOfLambda() override