A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Computation.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef COMPUTATION_H
20 #define COMPUTATION_H
21 
22 #include "Object.h"
23 #include "Lambda.h"
24 #include "ComputeSource.h"
25 #include "ComputeSink.h"
26 #include "SinkMerger.h"
27 #include "SinkShuffler.h"
28 #include "InputTupleSetSpecifier.h"
29 #include "PDBString.h"
30 #include <map>
31 
32 namespace pdb {
33 
34 class ComputePlan;
35 
54 };
55 
59 class Computation : public Object {
60 
61 public:
62 
67  virtual void extractLambdas(std::map<std::string, GenericLambdaObjectPtr>& returnVal) {}
68 
79  return nullptr;
80  }
81 
89  TupleSpec& projection,
90  ComputePlan& plan) {
91  return nullptr;
92  }
93 
95  TupleSpec& whichAttsToOpOn,
96  TupleSpec& projection,
97  ComputePlan& plan) {
98  return getComputeSink(consumeMe, projection, plan);
99  }
100 
105  TupleSpec& projection,
106  ComputePlan& plan) {
107  return nullptr;
108  }
109 
110 
112  TupleSpec& whichAttsToOpOn,
113  TupleSpec& projection,
114  ComputePlan& plan) {
115  return getSinkMerger(consumeMe, projection, plan);
116  }
117 
119 
121  TupleSpec& projection,
122  ComputePlan& plan) {
123  return nullptr;
124  }
125 
126 
128  TupleSpec& whichAttsToOpOn,
129  TupleSpec& projection,
130  ComputePlan& plan) {
131  return getSinkShuffler(consumeMe, projection, plan);
132  }
133 
134  // returns the type of this Computation
135  virtual std::string getComputationType() = 0;
136 
137  // JiaNote: below function returns a TCAP string for this Computation
138  virtual std::string toTCAPString(std::vector<InputTupleSetSpecifier>& inputTupleSets,
139  int computationLabel,
140  std::string& outputTupleSetName,
141  std::vector<std::string>& outputColumnNames,
142  std::string& addedOutputColumnName) = 0;
143 
149  virtual std::string getIthInputType(int i) = 0;
150 
151  bool hasInput() {
152  return !inputs.isNullPtr();
153  }
154 
161  return (*inputs)[i];
162  }
163 
168  virtual int getNumInputs() = 0;
169 
174  virtual std::string getOutputType() = 0;
175 
181 
187  return numConsumers;
188  }
189 
191  this->numConsumers = numConsumers;
192  }
193 
200  return setInput(0, toMe);
201  }
202 
209  bool setInput(int whichSlot, Handle<Computation> toMe) {
210 
211  // set the array of inputs if it is a nullptr
212  if (inputs == nullptr) {
213  inputs = makeObject<Vector<Handle<Computation>>>(getNumInputs());
214  for (int i = 0; i < getNumInputs(); i++) {
215  inputs->push_back(nullptr);
216  }
217  }
218 
219  // if we are adding this query to a valid slot
220  if (whichSlot < getNumInputs()) {
221 
222 
223  // make sure the output type of the guy we are accepting meets the input type
224  if (getIthInputType(whichSlot) != toMe->getOutputType()) {
225  std::cout << "Cannot set output of query node with output of type "
226  << toMe->getOutputType() << " to be the input";
227  std::cout << " of a query with input type " << getIthInputType(whichSlot) << ".\n";
228  return false;
229  }
230  (*inputs)[whichSlot] = toMe;
231  toMe->setNumConsumers(toMe->getNumConsumers() + 1);
232 
233  } else {
234 
235  return false;
236  }
237  return true;
238  }
239 
240 
245  bool isTraversed() {
246 
247  return traversed;
248  }
249 
254  void setTraversed(bool traversed) {
255 
256  this->traversed = traversed;
257  }
258 
263  std::string getOutputTupleSetName() {
264 
265  if (traversed) {
266  return outputTupleSetName;
267  }
268  return "";
269  }
270 
276  this->outputTupleSetName = outputTupleSetName;
277  }
278 
283  std::string getOutputColumnToApply() {
284 
285  if (traversed) {
286  return outputColumnToApply;
287  }
288  return "";
289  }
290 
295  this->outputColumnToApply = outputColumnToApply;
296  }
297 
303  virtual void setOutput(std::string dbName, std::string setName) {}
304 
305  virtual std::string getDatabaseName() {
306  return "";
307  }
308 
309  virtual std::string getSetName() {
310  return "";
311  }
312 
313  virtual bool needsMaterializeOutput() {
314  return false;
315  }
316 
317  virtual void setBatchSize(int batchSize) {}
318 
319  virtual bool isUsingCombiner() {
320  std::cout << "Only aggregation needs to set flag for combiner" << std::endl;
321  return false;
322  }
323 
324  virtual void setUsingCombiner(bool useCombinerOrNot) {
325  std::cout << "Only aggregation needs to set flag for combiner" << std::endl;
326  }
327 
328 
330  this->myAllocatorPolicy = myPolicy;
331  }
332 
333  void setObjectPolicy(ObjectPolicy myPolicy) {
334  this->myObjectPolicy = myPolicy;
335  }
336 
338  return this->myAllocatorPolicy;
339  }
340 
342  return this->myObjectPolicy;
343  }
344 
349  virtual void setCollectAsMap(bool collectAsMapOrNot){};
350 
355  virtual bool isCollectAsMap() {
356  return false;
357  }
358 
363  virtual int getNumNodesToCollect() {
364  return 0;
365  }
366 
371  virtual void setNumNodesToCollect(int numNodesToCollect) {}
372 
373 private:
374 
376 
377  bool traversed = false;
378 
380 
382 
383  int numConsumers = 0;
384 
386 
388 };
389 }
390 
391 #endif
ComputationTypeID
Definition: Computation.h:39
virtual SinkMergerPtr getSinkMerger(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan)
Definition: Computation.h:104
virtual void setUsingCombiner(bool useCombinerOrNot)
Definition: Computation.h:324
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
void setAllocatorPolicy(AllocatorPolicy myPolicy)
Definition: Computation.h:329
ObjectPolicy
Definition: Object.h:34
AllocatorPolicy getAllocatorPolicy()
Definition: Computation.h:337
virtual int getNumNodesToCollect()
Definition: Computation.h:363
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Definition: SinkShuffler.h:31
virtual bool isUsingCombiner()
Definition: Computation.h:319
AllocatorPolicy
Definition: Allocator.h:130
std::shared_ptr< SinkMerger > SinkMergerPtr
Definition: SinkMerger.h:30
virtual void setBatchSize(int batchSize)
Definition: Computation.h:317
Handle< Computation > & getIthInput(int i) const
Definition: Computation.h:160
void setObjectPolicy(ObjectPolicy myPolicy)
Definition: Computation.h:333
String outputColumnToApply
Definition: Computation.h:381
bool setInput(int whichSlot, Handle< Computation > toMe)
Definition: Computation.h:209
virtual void setNumNodesToCollect(int numNodesToCollect)
Definition: Computation.h:371
AllocatorPolicy myAllocatorPolicy
Definition: Computation.h:385
virtual void setCollectAsMap(bool collectAsMapOrNot)
Definition: Computation.h:349
Handle< Vector< Handle< Computation > > > inputs
Definition: Computation.h:375
void setNumConsumers(int numConsumers)
Definition: Computation.h:190
virtual ComputationTypeID getComputationTypeID()=0
virtual std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName)=0
ObjectPolicy myObjectPolicy
Definition: Computation.h:387
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
virtual void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal)
Definition: Computation.h:67
virtual std::string getIthInputType(int i)=0
virtual std::string getOutputType()=0
virtual SinkMergerPtr getSinkMerger(TupleSpec &consumeMe, TupleSpec &whichAttsToOpOn, TupleSpec &projection, ComputePlan &plan)
Definition: Computation.h:111
virtual ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &whichAttsToOpOn, TupleSpec &projection, ComputePlan &plan)
Definition: Computation.h:94
virtual SinkShufflerPtr getSinkShuffler(TupleSpec &consumeMe, TupleSpec &whichAttsToOpOn, TupleSpec &projection, ComputePlan &plan)
Definition: Computation.h:127
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
virtual ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan)
Definition: Computation.h:88
virtual ComputeSourcePtr getComputeSource(TupleSpec &produceMe, ComputePlan &plan)
Definition: Computation.h:78
virtual SinkShufflerPtr getSinkShuffler(TupleSpec &consumeMe, TupleSpec &projection, ComputePlan &plan)
JiaNote: added the interface below for shuffling multiple join-map sinks in a hash-partitioned join...
Definition: Computation.h:120
virtual int getNumInputs()=0
virtual std::string getSetName()
Definition: Computation.h:309
std::string getOutputColumnToApply()
Definition: Computation.h:283
virtual std::string getComputationType()=0
virtual bool isCollectAsMap()
Definition: Computation.h:355
bool setInput(Handle< Computation > toMe)
Definition: Computation.h:199
virtual std::string getDatabaseName()
Definition: Computation.h:305
void setTraversed(bool traversed)
Definition: Computation.h:254
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
virtual bool needsMaterializeOutput()
Definition: Computation.h:313
virtual void setOutput(std::string dbName, std::string setName)
Definition: Computation.h:303
ObjectPolicy getObjectPolicy()
Definition: Computation.h:341
std::string getOutputTupleSetName()
Definition: Computation.h:263
String outputTupleSetName
Definition: Computation.h:379