A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AttAccessLambda.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef ATT_ACCESS_LAM_H
20 #define ATT_ACCESS_LAM_H
21 
22 #include "Handle.h"
23 #include <string>
24 #include "Ptr.h"
25 #include "TupleSet.h"
26 #include <vector>
27 #include "mustache.h"
28 #include "SimpleComputeExecutor.h"
29 #include "TupleSetMachine.h"
30 
31 namespace pdb {
32 
33 template<class Out, class ClassType>
34 class AttAccessLambda : public TypedLambdaObject<pdb::Ptr<Out>> {
35  public:
37  std::string inputTypeName;
38  std::string attName;
39  std::string attTypeName;
40 
41  // create an att access lambda; offset is the position in the input object where we are going to
42  // find the input att
43  AttAccessLambda(std::string inputTypeNameIn,
44  std::string attNameIn,
45  std::string attType,
46  Handle<ClassType> &input,
47  size_t offset)
48  : offsetOfAttToProcess(offset),
49  inputTypeName(inputTypeNameIn),
50  attName(attNameIn),
51  attTypeName(attType) {
52 
53  this->setInputIndex(0, -(input.getExactTypeInfoValue() + 1));
54  }
55 
56  std::string getTypeOfLambda() override {
57  return std::string("attAccess");
58  }
59 
60  std::string typeOfAtt() {
61  return attTypeName;
62  }
63 
64  std::string whichAttWeProcess() {
65  return attName;
66  }
67 
68  unsigned int getNumInputs() override {
69  return 1;
70  }
71 
72  std::string getInputType() {
73  return inputTypeName;
74  }
75 
76  std::string toTCAPString(std::vector<std::string> &inputTupleSetNames,
77  std::vector<std::string> &inputColumnNames,
78  std::vector<std::string> &inputColumnsToApply,
79  std::vector<std::string> &childrenLambdaNames,
80  int lambdaLabel,
81  std::string computationName,
82  int computationLabel,
83  std::string &outputTupleSetName,
84  std::vector<std::string> &outputColumns,
85  std::string &outputColumnName,
86  std::string &lambdaName,
87  MultiInputsBase *multiInputsComp = nullptr,
88  bool amIPartOfJoinPredicate = false,
89  bool amILeftChildOfEqualLambda = false,
90  bool amIRightChildOfEqualLambda = false,
91  std::string parentLambdaName = "",
92  bool isSelfJoin = false) override {
93 
94  // create the data for the lambda
95  mustache::data lambdaData;
96  lambdaData.set("computationName", computationName);
97  lambdaData.set("computationLabel", std::to_string(computationLabel));
98  lambdaData.set("typeOfLambda", getTypeOfLambda());
99  lambdaData.set("lambdaLabel", std::to_string(lambdaLabel));
100 
101  // create the computation name with label
102  mustache::mustache computationNameWithLabelTemplate{"{{computationName}}_{{computationLabel}}"};
103  std::string computationNameWithLabel = computationNameWithLabelTemplate.render(lambdaData);
104 
105  // create the lambda name
106  mustache::mustache lambdaNameTemplate{"{{typeOfLambda}}_{{lambdaLabel}}"};
107  lambdaName = lambdaNameTemplate.render(lambdaData);
108 
109  // things we need to figure out in the next step
110  int index;
111  std::string inputTupleSetName;
112  std::string tupleSetMidTag;
113  std::string originalInputColumnToApply;
114 
115  if (multiInputsComp == nullptr) {
116  tupleSetMidTag = "OutFor";
117  inputTupleSetName = inputTupleSetNames[0];
118  } else {
119  tupleSetMidTag = "ExtractedFor";
120  index = this->getInputIndex(0);
121  PDB_COUT << lambdaName << ": myIndex=" << index << std::endl;
122  inputTupleSetName = multiInputsComp->getTupleSetNameForIthInput(index);
123  PDB_COUT << "inputTupleSetName=" << inputTupleSetName << std::endl;
124  inputColumnNames = multiInputsComp->getInputColumnsForIthInput(index);
125  inputColumnsToApply.clear();
126  inputColumnsToApply.push_back(multiInputsComp->getNameForIthInput(index));
127  originalInputColumnToApply = multiInputsComp->getNameForIthInput(index);
128  PDB_COUT << "originalInputColumnToApply=" << originalInputColumnToApply << std::endl;
129  }
130 
131  // set the lambda data
132  lambdaData.set("tupleSetMidTag", tupleSetMidTag);
133  lambdaData.set("attName", attName);
134 
135  // create the output tuple set name
136  mustache::mustache outputTupleSetNameTemplate{"attAccess_{{lambdaLabel}}{{tupleSetMidTag}}{{computationName}}{{computationLabel}}"};
137  outputTupleSetName = outputTupleSetNameTemplate.render(lambdaData);
138 
139  // create the output column name
140  mustache::mustache outputColumnNameTemplate{"att_{{lambdaLabel}}{{tupleSetMidTag}}_{{attName}}"};
141  outputColumnName = outputColumnNameTemplate.render(lambdaData);
142 
143  // initialize the output columns
144  outputColumns.clear();
145  for (const auto &inputColumnName : inputColumnNames) {
146  outputColumns.push_back(inputColumnName);
147  }
148  outputColumns.push_back(outputColumnName);
149 
150  // generate the TCAP string for the lambda
151  std::string tcapString;
152 
153  // the additional info about this attribute access lambda
154  std::map<std::string, std::string> info;
155 
156  tcapString += this->getTCAPString(inputTupleSetName,
157  inputColumnNames,
158  inputColumnsToApply,
159  outputTupleSetName,
160  outputColumns,
161  outputColumnName,
162  "APPLY",
163  computationNameWithLabel,
164  lambdaName,
165  getInfo());
166 
167  if (multiInputsComp != nullptr) {
168  if (amILeftChildOfEqualLambda || amIRightChildOfEqualLambda) {
169  inputTupleSetName = outputTupleSetName;
170  inputColumnNames.clear();
171  for (const auto &outputColumn : outputColumns) {
172  // we want to remove the extracted value column from here
173  if (outputColumn != outputColumnName) {
174  inputColumnNames.push_back(outputColumn);
175  }
176  }
177  inputColumnsToApply.clear();
178  inputColumnsToApply.push_back(outputColumnName);
179 
180  std::string hashOperator = amILeftChildOfEqualLambda ? "HASHLEFT" : "HASHRIGHT";
181  outputTupleSetName = outputTupleSetName.append("_hashed");
182  outputColumnName = outputColumnName.append("_hash");
183  outputColumns.clear();
184 
185  std::copy(inputColumnNames.begin(), inputColumnNames.end(), std::back_inserter(outputColumns));
186  outputColumns.push_back(outputColumnName);
187 
188  tcapString += this->getTCAPString(inputTupleSetName,
189  inputColumnNames,
190  inputColumnsToApply,
191  outputTupleSetName,
192  outputColumns,
193  outputColumnName,
194  hashOperator,
195  computationNameWithLabel,
196  parentLambdaName,
197  std::map<std::string, std::string>());
198  }
199 
200  if (!isSelfJoin) {
201  for (unsigned int i = 0; i < multiInputsComp->getNumInputs(); i++) {
202  std::string curInput = multiInputsComp->getNameForIthInput(i);
203  auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
204  if (iter != outputColumns.end()) {
205  PDB_COUT << "MultiInputBase for index=" << i << " is updated" << std::endl;
206  multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
207  multiInputsComp->setInputColumnsForIthInput(i, outputColumns);
208  multiInputsComp->setInputColumnsToApplyForIthInput(i, outputColumnName);
209  }
210  if (originalInputColumnToApply == curInput) {
211  PDB_COUT << "MultiInputBase for index=" << i << " is updated" << std::endl;
212  multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
213  multiInputsComp->setInputColumnsForIthInput(i, outputColumns);
214  multiInputsComp->setInputColumnsToApplyForIthInput(i, outputColumnName);
215  }
216  }
217  } else {
218  // only update myIndex
219  multiInputsComp->setTupleSetNameForIthInput(index, outputTupleSetName);
220  multiInputsComp->setInputColumnsForIthInput(index, outputColumns);
221  multiInputsComp->setInputColumnsToApplyForIthInput(index, outputColumnName);
222  }
223  }
224  return tcapString;
225  }
226 
232  std::map<std::string, std::string> getInfo() override {
233 
234  // fill in the info
235  return std::map<std::string, std::string>{
236 
237  std::make_pair ("lambdaType", getTypeOfLambda()),
238  std::make_pair ("inputTypeName", inputTypeName),
239  std::make_pair ("attName", attName),
240  std::make_pair ("attTypeName", attTypeName)
241  };
242  };
243 
244  int getNumChildren() override {
245  return 0;
246  }
247 
248  GenericLambdaObjectPtr getChild(int which) override {
249  return nullptr;
250  }
251 
253  TupleSpec &attsToOperateOn,
254  TupleSpec &attsToIncludeInOutput) override {
255 
256  // create the output tuple set
257  TupleSetPtr output = std::make_shared<TupleSet>();
258 
259  // create the machine that is going to setup the output tuple set, using the input tuple set
260  TupleSetSetupMachinePtr myMachine =
261  std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
262 
263  // this is the input attribute that we will process
264  std::vector<int> matches = myMachine->match(attsToOperateOn);
265  int whichAtt = matches[0];
266 
267  // this is the output attribute
268  int outAtt = attsToIncludeInOutput.getAtts().size();
269 
270  return std::make_shared<SimpleComputeExecutor>(
271  output,
272  [=](TupleSetPtr input) {
273 
274  // set up the output tuple set
275  myMachine->setup(input, output);
276 
277  // get the columns to operate on
278  std::vector<Handle<ClassType>> &inputColumn =
279  input->getColumn<Handle<ClassType>>(whichAtt);
280 
281  // setup the output column, if it is not already set up
282  if (!output->hasColumn(outAtt)) {
283  std::vector<Ptr<Out>> *outputCol = new std::vector<Ptr<Out>>;
284  output->addColumn(outAtt, outputCol, true);
285  }
286 
287  // get the output column
288  std::vector<Ptr<Out>> &outColumn = output->getColumn<Ptr<Out>>(outAtt);
289 
290  // loop down the columns, setting the output
291  int numTuples = inputColumn.size();
292  outColumn.resize(numTuples);
293  for (int i = 0; i < numTuples; i++) {
294  outColumn[i] = (Out *) ((char *) &(*(inputColumn[i])) + offsetOfAttToProcess);
295  }
296 
297  return output;
298  },
299  "attAccessLambda");
300  }
301 };
302 }
303 
304 #endif
std::string getInputType()
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
GenericLambdaObjectPtr getChild(int which) override
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
std::string inputTypeName
AttAccessLambda(std::string inputTypeNameIn, std::string attNameIn, std::string attType, Handle< ClassType > &input, size_t offset)
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
std::string getTypeOfLambda() override
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
#define PDB_COUT
Definition: PDBDebug.h:31
virtual unsigned int getInputIndex(int i)
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
unsigned int getNumInputs() override
std::string whichAttWeProcess()
std::string typeOfAtt()
void setInputIndex(int i, unsigned int index)
std::map< std::string, std::string > getInfo() override
Definition: Ptr.h:32
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &lambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override
int getNumChildren() override