A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AndLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef AND_LAM_H
20 #define AND_LAM_H
21 
22 #include <vector>
23 #include "Lambda.h"
24 #include "ComputeExecutor.h"
25 #include "TupleSetMachine.h"
26 #include "TupleSet.h"
27 #include "Ptr.h"
28 
29 namespace pdb {
30 
31 // exactly one of these four overloads is enabled for any given pair of argument types; together
32 // they automatically dereference a Ptr<blah> operand on either the LHS or RHS of an "and" check
33 template <class LHS, class RHS>
34 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && std::is_base_of<PtrBase, RHS>::value, bool>
35 checkAnd(LHS lhs, RHS rhs) {
36  return *lhs && *rhs;
37 }
38 
39 template <class LHS, class RHS>
40 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && !(std::is_base_of<PtrBase, RHS>::value),
41  bool>
42 checkAnd(LHS lhs, RHS rhs) {
43  return *lhs && rhs;
44 }
45 
46 template <class LHS, class RHS>
47 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && std::is_base_of<PtrBase, RHS>::value,
48  bool>
49 checkAnd(LHS lhs, RHS rhs) {
50  return lhs && *rhs;
51 }
52 
53 template <class LHS, class RHS>
54 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && !(std::is_base_of<PtrBase, RHS>::value),
55  bool>
56 checkAnd(LHS lhs, RHS rhs) {
57  return lhs && rhs;
58 }
59 
60 template <class LeftType, class RightType>
61 class AndLambda : public TypedLambdaObject<bool> {
62 
63 public:
66 
67 public:
69  lhs = lhsIn;
70  rhs = rhsIn;
71  PDB_COUT << "ANDLambda: LHS index is " << lhs.getInputIndex(0) << std::endl;
72  PDB_COUT << "ANDLambda: RHS index is " << rhs.getInputIndex(0) << std::endl;
73  PDB_COUT << "ANDLambda: LHS type is " << getTypeName<LeftType>() << std::endl;
74  PDB_COUT << "ANDLambda: RHS type is " << getTypeName<RightType>() << std::endl;
75  this->setInputIndex(0, lhs.getInputIndex(0));
76  this->setInputIndex(1, rhs.getInputIndex(0));
77  }
78 
79  std::string getTypeOfLambda() override {
80  return std::string("&&");
81  }
82 
83  unsigned int getNumInputs() override {
84  return 2;
85  }
86 
87  int getNumChildren() override {
88  return 2;
89  }
90 
91  GenericLambdaObjectPtr getChild(int which) override {
92  if (which == 0)
93  return lhs.getPtr();
94  if (which == 1)
95  return rhs.getPtr();
96  return nullptr;
97  }
98 
99 
101  TupleSpec& attsToOperateOn,
102  TupleSpec& attsToIncludeInOutput) override {
103 
104  // create the output tuple set
105  TupleSetPtr output = std::make_shared<TupleSet>();
106 
107  // create the machine that is going to setup the output tuple set, using the input tuple set
108  TupleSetSetupMachinePtr myMachine =
109  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
110 
111  // these are the input attributes that we will process
112  std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
113  int firstAtt = inputAtts[0];
114  int secondAtt = inputAtts[1];
115 
116  // this is the output attribute
117  auto outAtt = (int) attsToIncludeInOutput.getAtts().size();
118 
119  return std::make_shared<SimpleComputeExecutor>(
120  output,
121  [=](TupleSetPtr input) {
122 
123  // set up the output tuple set
124  myMachine->setup(input, output);
125 
126  // get the columns to operate on
127  std::vector<LeftType>& leftColumn = input->getColumn<LeftType>(firstAtt);
128  std::vector<RightType>& rightColumn = input->getColumn<RightType>(secondAtt);
129 
130  // create the output attribute, if needed
131  if (!output->hasColumn(outAtt)) {
132  auto outColumn = new std::vector<bool>;
133  output->addColumn(outAtt, outColumn, true);
134  }
135 
136  // get the output column
137  std::vector<bool>& outColumn = output->getColumn<bool>(outAtt);
138 
139  // loop down the columns, setting the output
140  auto numTuples = leftColumn.size();
141  outColumn.resize(numTuples);
142  for (int i = 0; i < numTuples; i++) {
143  outColumn[i] = checkAnd(leftColumn[i], rightColumn[i]);
144  }
145  return output;
146  },
147 
148  "andLambda");
149  }
150 
151 
152  std::string toTCAPString(std::vector<std::string>& inputTupleSetNames,
153  std::vector<std::string>& inputColumnNames,
154  std::vector<std::string>& inputColumnsToApply,
155  std::vector<std::string>& childrenLambdaNames,
156  int lambdaLabel,
157  std::string computationName,
158  int computationLabel,
159  std::string& outputTupleSetName,
160  std::vector<std::string>& outputColumns,
161  std::string& outputColumnName,
162  std::string& myLambdaName,
163  MultiInputsBase* multiInputsComp = nullptr,
164  bool amIPartOfJoinPredicate = false,
165  bool amILeftChildOfEqualLambda = false,
166  bool amIRightChildOfEqualLambda = false,
167  std::string parentLambdaName = "",
168  bool isSelfJoin = false) override {
169 
170  if ((multiInputsComp != nullptr) && amIPartOfJoinPredicate) {
171  std::string tcapString;
172  std::string myComputationName = computationName + "_" + std::to_string(computationLabel);
173  // Step 1. get list of input names in LHS
174  unsigned int leftIndex = lhs.getInputIndex(0);
175  std::vector<std::string> lhsColumnNames =
176  multiInputsComp->getInputColumnsForIthInput(leftIndex);
177  std::vector<std::string> lhsInputNames;
178  for (const auto &curColumnName : lhsColumnNames) {
179  for (int j = 0; j < multiInputsComp->getNumInputs(); j++) {
180  if (multiInputsComp->getNameForIthInput(j) == curColumnName) {
181  lhsInputNames.push_back(curColumnName);
182  break;
183  }
184  }
185  }
186 
187  // Step 2. get list of input names in RHS
188  unsigned int rightIndex = rhs.getInputIndex(0);
189  std::vector<std::string> rhsColumnNames =
190  multiInputsComp->getInputColumnsForIthInput(rightIndex);
191  std::vector<std::string> rhsInputNames;
192  for (const auto &curColumnName : rhsColumnNames) {
193  for (int j = 0; j < multiInputsComp->getNumInputs(); j++) {
194  if (multiInputsComp->getNameForIthInput(j) == curColumnName) {
195  rhsInputNames.push_back(curColumnName);
196  break;
197  }
198  }
199  }
200 
201  // Step 3. if two lists are disjoint do a cartesian join, otherwise return ""
202  std::vector<std::string> inputNamesIntersection;
203 
204  for (const auto &lhsInputName : lhsInputNames) {
205  for (const auto &rhsInputName : rhsInputNames) {
206  if (lhsInputName == rhsInputName) {
207  inputNamesIntersection.push_back(lhsInputName);
208  }
209  }
210  }
211 
212  if (!inputNamesIntersection.empty()) {
213  return "";
214  } else {
215  // we need a cartesian join
216  // hashone for lhs
217  std::string leftTupleSetName =
218  multiInputsComp->getTupleSetNameForIthInput(leftIndex);
219  std::string leftColumnToApply = lhsInputNames[0];
220  std::vector<std::string> leftColumnsToApply;
221  leftColumnsToApply.push_back(leftColumnToApply);
222  std::string leftOutputTupleSetName = "hashOneFor_" + leftColumnToApply + "_" +
223  std::to_string(computationLabel) + "_" + std::to_string(lambdaLabel);
224  std::string leftOutputColumnName = "OneFor_left_" +
225  std::to_string(computationLabel) + "_" + std::to_string(lambdaLabel);
226  std::vector<std::string> leftOutputColumns;
227  for (const auto &lhsColumnName : lhsColumnNames) {
228  leftOutputColumns.push_back(lhsColumnName);
229  }
230  leftOutputColumns.push_back(leftOutputColumnName);
231  tcapString += this->getTCAPString(leftTupleSetName,
232  lhsColumnNames,
233  leftColumnsToApply,
234  leftOutputTupleSetName,
235  leftOutputColumns,
236  leftOutputColumnName,
237  "HASHONE",
238  myComputationName,
239  "",
240  std::map<std::string, std::string>());
241 
242  // hashone for rhs
243  std::string rightTupleSetName = multiInputsComp->getTupleSetNameForIthInput(rightIndex);
244  std::string rightColumnToApply = rhsInputNames[0];
245  std::vector<std::string> rightColumnsToApply;
246  rightColumnsToApply.push_back(rightColumnToApply);
247  std::string rightOutputTupleSetName = "hashOneFor_" + rightColumnToApply + "_" + std::to_string(computationLabel) + "_" + std::to_string(lambdaLabel);
248  std::string rightOutputColumnName = "OneFor_right_" + std::to_string(computationLabel) + "_" + std::to_string(lambdaLabel);
249  std::vector<std::string> rightOutputColumns;
250  for (const auto &rhsColumnName : rhsColumnNames) {
251  rightOutputColumns.push_back(rhsColumnName);
252  }
253  rightOutputColumns.push_back(rightOutputColumnName);
254  tcapString += this->getTCAPString(rightTupleSetName,
255  rhsColumnNames,
256  rightColumnsToApply,
257  rightOutputTupleSetName,
258  rightOutputColumns,
259  rightOutputColumnName,
260  "HASHONE",
261  myComputationName,
262  "",
263  std::map<std::string, std::string>());
264 
265  // cartesian join lhs and rhs
266  tcapString += "\n/* CartesianJoin ( " + lhsInputNames[0];
267 
268  outputTupleSetName = "CartesianJoined__" + lhsInputNames[0];
269  for (unsigned int i = 1; i < lhsInputNames.size(); i++) {
270  outputTupleSetName += "_" + lhsInputNames[i];
271  tcapString += " " + lhsInputNames[i];
272  }
273  outputTupleSetName += "___" + rhsInputNames[0];
274  tcapString += " ) and ( " + rhsInputNames[0];
275  for (unsigned int i = 1; i < rhsInputNames.size(); i++) {
276  outputTupleSetName += "_" + rhsInputNames[i];
277  tcapString += " " + rhsInputNames[i];
278  }
279  outputTupleSetName += "_";
280  tcapString += " ) */\n";
281 
282  // TODO: push down projection here
283  outputColumns.clear();
284  tcapString += outputTupleSetName + "(" + lhsColumnNames[0];
285  outputColumns.push_back(lhsColumnNames[0]);
286  for (unsigned int i = 1; i < lhsColumnNames.size(); i++) {
287  tcapString += ", " + lhsColumnNames[i];
288  outputColumns.push_back(lhsColumnNames[i]);
289  }
290  tcapString += ", " + rhsColumnNames[0];
291  outputColumns.push_back(rhsColumnNames[0]);
292  for (unsigned int i = 1; i < rhsColumnNames.size(); i++) {
293  tcapString += ", " + rhsColumnNames[i];
294  outputColumns.push_back(rhsColumnNames[i]);
295  }
296  tcapString += ") <= JOIN (" + leftOutputTupleSetName + "(" + leftOutputColumnName +
297  "), " + leftOutputTupleSetName + "(" + lhsColumnNames[0];
298  for (unsigned int i = 1; i < lhsColumnNames.size(); i++) {
299  tcapString += ", " + lhsColumnNames[i];
300  }
301  tcapString += "), " + rightOutputTupleSetName + "(" + rightOutputColumnName +
302  "), " + rightOutputTupleSetName + "(" + rhsColumnNames[0];
303  for (unsigned int i = 1; i < rhsColumnNames.size(); i++) {
304  tcapString += ", " + rhsColumnNames[i];
305  }
306  tcapString += "), '" + myComputationName + "')\n";
307 
308  // update multiInputsComp
309  for (unsigned int i = 0; i < multiInputsComp->getNumInputs(); i++) {
310  std::string curInput = multiInputsComp->getNameForIthInput(i);
311  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
312  if (iter != outputColumns.end()) {
313  multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
314  multiInputsComp->setInputColumnsForIthInput(i, outputColumns);
315  multiInputsComp->setInputColumnsToApplyForIthInput(i, outputColumns);
316  }
317  }
318  return tcapString;
319  }
320 
321  } else {
322  return "";
323  }
324  }
325 
330  std::map<std::string, std::string> getInfo() override {
331 
332  // fill in the info
333  return std::map<std::string, std::string>{
334  std::make_pair ("lambdaType", getTypeOfLambda())
335  };
336  };
337 
338 };
339 }
340 
341 #endif
std::map< std::string, std::string > getInfo() override
Definition: AndLambda.h:330
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
Definition: AndLambda.h:100
int getNumChildren() override
Definition: AndLambda.h:87
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
unsigned int getNumInputs() override
Definition: AndLambda.h:83
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
AndLambda(LambdaTree< LeftType > lhsIn, LambdaTree< RightType > rhsIn)
Definition: AndLambda.h:68
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
#define PDB_COUT
Definition: PDBDebug.h:31
GenericLambdaObjectPtr getChild(int which) override
Definition: AndLambda.h:91
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
LambdaTree< LeftType > lhs
Definition: AndLambda.h:64
std::enable_if_t< std::is_base_of< PtrBase, LHS >::value &&std::is_base_of< PtrBase, RHS >::value, bool > checkAnd(LHS lhs, RHS rhs)
Definition: AndLambda.h:35
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override
Definition: AndLambda.h:152
LambdaTree< RightType > rhs
Definition: AndLambda.h:65
void setInputIndex(int i, unsigned int index)
std::string getTypeOfLambda() override
Definition: AndLambda.h:79