A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
JoinCompBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_JOINCOMPBASE_H
20 #define PDB_JOINCOMPBASE_H
21 
22 #include "JoinTuple.h"
23 #include "DataProxy.h"
24 #include "AbstractJoinComp.h"
25 #include "ComputeInfo.h"
26 #include "ComputePlan.h"
27 
28 namespace pdb {
29 
33 class JoinArg : public ComputeInfo {
34 
35  public:
36  // this is the compute plan that we are part of
38 
39  // the location of the hash table
41 
42  JoinArg(ComputePlan& plan, void* pageWhereHashTableIs) : plan(plan), pageWhereHashTableIs(pageWhereHashTableIs) {}
43 
44  ~JoinArg() override = default;
45 };
46 
47 
51 typedef enum {
52 
55 
56 } JoinType;
57 
58 template <typename Out, typename In1, typename In2, typename... Rest>
60  private:
61  // JiaNote: this is used to pass to lambda tree to update pipeline information for each input
63 
64  // JiaNote: this is to specify the JoinType, by default we use broadcast join
66 
67  // JiaNote: partition number in the cluster, used by hash partition join
68  int numPartitions = 0;
69 
70  // JiaNote: number of nodes, used by hash partition join
71  int numNodes = 0;
72 
73  // JiaNote: partitionId for JoinSource, used by hash partition join
74  size_t myPartitionId = 0;
75 
76  // JiaNote: the iterator for retrieving TupleSets from JoinMaps in pages
77  // be careful here that we put PageCircularBufferIteratorPtr and DataProxyPtr in a pdb object.
79 
80  // JiaNote: the data proxy for accessing pages in frontend storage server.
81  DataProxyPtr proxy = nullptr;
82 
83  // batch size
84  int batchSize = -1;
85 
86  public:
87 
// Destructor: resets the iterator and proxy members to nullptr.
// NOTE(review): the delete below is only reached when multiInputsBase is already nullptr,
// so it never releases anything. This may be an inverted check (a leak) or a deliberate
// guard against deleting a pointer that is not valid here - confirm who owns
// multiInputsBase before changing the condition.
88  virtual ~JoinCompBase() {
89  if (multiInputsBase == nullptr) {
90  delete (multiInputsBase);
91  }
92  this->iterator = nullptr;
93  this->proxy = nullptr;
94  }
95 
96  // set join type
98  this->joinType = joinType;
99  }
100 
101  // get join type
103  return this->joinType;
104  }
105 
106  // set number of partitions (used in hash partition join)
108  this->numPartitions = numPartitions;
109  }
110 
111  // return my number of partitions (used in hash partition join)
113  return numPartitions;
114  }
115 
116  // set number of nodes (used in hash partition join)
117  void setNumNodes(int numNodes) {
118  this->numNodes = numNodes;
119  }
120 
121  // return my number of nodes (used in hash partition join)
122  int getNumNodes() {
123  return numNodes;
124  }
125 
126  // set my partition id for obtaining JoinSource for one partition (used in hash partition join)
128  this->myPartitionId = myPartitionId;
129  }
130 
131  // return my partition id for obtaining JoinSource for one partition (used in hash partition
132  // join)
133  size_t getPartitionId() {
134  return this->myPartitionId;
135  }
136 
137  // JiaNote: be careful here that we put PageCircularBufferIteratorPtr and DataProxyPtr in a pdb
138  // object (used in hash partition join)
140  this->iterator = iterator;
141  }
142 
143  // to set proxy for communicating with frontend storage server (used in hash partition join)
145  this->proxy = proxy;
146  }
147 
148  // to set chunk size for JoinSource (used in hash partition join)
149  void setBatchSize(int batchSize) override {
150  this->batchSize = batchSize;
151  }
152 
153  // to get batch size for JoinSource (used in hash partition join)
154  int getBatchSize() {
155  return this->batchSize;
156  }
157 
159  if (multiInputsBase == nullptr) {
161  }
162  return multiInputsBase;
163  }
164 
166  if (multiInputsBase == nullptr) {
167  delete (multiInputsBase);
168  }
169  multiInputsBase = nullptr;
170  }
171 
172  void analyzeInputSets(std::vector<std::string>& inputNames) {
173  if (multiInputsBase == nullptr) {
175  }
176  // Step 1. setup all input names (the column name corresponding to input in tuple set)
177  for (int i = 0; i < inputNames.size(); i++) {
178  this->multiInputsBase->setNameForIthInput(i, inputNames[i]);
179  }
180 
181  // Step 2. analyze selectionLambda to find all inputs in predicates
182  Lambda<bool> selectionLambda = callGetSelection(*this);
183  std::vector<std::string> inputsInPredicates =
184  selectionLambda.getAllInputs(this->multiInputsBase);
185  for (auto &inputsInPredicate : inputsInPredicates) {
186  this->multiInputsBase->addInputNameToPredicates(inputsInPredicate);
187  }
188  // Step 3. analyze projectionLambda to find all inputs in projection
189  Lambda<Handle<Out>> projectionLambda = callGetProjection(*this);
190  std::vector<std::string> inputsInProjection =
191  projectionLambda.getAllInputs(this->multiInputsBase);
192  for (auto &i : inputsInProjection) {
194  }
195  }
196 
197  // the computation returned by this method is called to see if a data item should be returned in
198  // the output set
200  Handle<In2> in2,
201  Handle<Rest>... otherArgs) = 0;
202 
203  // the computation returned by this method is called to produce output tuples from this method
205  Handle<In2> in2,
206  Handle<Rest>... otherArgs) = 0;
207 
208  // calls getProjection and getSelection to extract the lambdas
209  void extractLambdas(std::map<std::string, GenericLambdaObjectPtr>& returnVal) override {
210  int suffix = 0;
211  Lambda<bool> selectionLambda = callGetSelection(*this);
212  Lambda<Handle<Out>> projectionLambda = callGetProjection(*this);
213  selectionLambda.toMap(returnVal, suffix);
214  projectionLambda.toMap(returnVal, suffix);
215  }
216 
217  // return the output type
218  std::string getOutputType() override {
219  return getTypeName<Out>();
220  }
221 
222  // count the number of inputs
223  int getNumInputs() final {
224  const int extras = sizeof...(Rest);
225  return extras + 2;
226  }
227 
228  template <typename First, typename... Others>
229  typename std::enable_if<sizeof...(Others) == 0, std::string>::type getIthInputType(int i) {
230  if (i == 0) {
231  return getTypeName<First>();
232  } else {
233  std::cout << "Asked for an input type that didn't exist!";
234  exit(1);
235  }
236  }
237 
238  // helper function to get a particular input type
239  template <typename First, typename... Others>
240  typename std::enable_if<sizeof...(Others) != 0, std::string>::type getIthInputType(int i) {
241  if (i == 0) {
242  return getTypeName<First>();
243  } else {
244  return getIthInputType<Others...>(i - 1);
245  }
246  }
247 
248  // from the interface: get the i^th input type
249  std::string getIthInputType(int i) final {
250  return getIthInputType<In1, In2, Rest...>(i);
251  }
252 
253  // JiaNote: TODO: encapsulate and reuse code for getting correctJoinTuple
254 
255  // JiaNote: this gets a sink merger
257  TupleSpec& attsToOpOn,
258  TupleSpec& projection,
259  ComputePlan& plan) override {
260 
261  // loop through each of the attributes that we are supposed to accept, and for each of them,
262  // find the type
263  std::vector<std::string> typeList;
264  AtomicComputationPtr producer = plan.getPlan()->getComputations().getProducingAtomicComputation(consumeMe.getSetName());
265 
266  for (auto& a : projection.getAtts()) {
267 
268  // find the identity of the producing computation
269  std::pair<std::string, std::string> res = producer->findSource(a, plan.getPlan()->getComputations());
270 
271  // and find its type... in the first case, there is not a particular lambda that we need to ask for
272  if (res.second.empty()) {
273  typeList.push_back("pdb::Handle<" + plan.getPlan()->getNode(res.first).getComputation().getOutputType() + ">");
274  } else {
275  std::string myType =
276  plan.getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
277  // std :: cout << "my type is " << myType << std :: endl;
278 
279  if (myType.find_first_of("pdb::Handle<") == 0) {
280  typeList.push_back(myType);
281  } else {
282  typeList.push_back("pdb::Handle<" + myType + ">");
283  }
284  }
285  }
286 
287  // now we get the correct join tuple, that will allow us to pack tuples from the join in a
288  // hash table
289  std::vector<int> whereEveryoneGoes;
290  JoinTuplePtr correctJoinTuple = findCorrectJoinTuple<In1, In2, Rest...>(typeList, whereEveryoneGoes);
291 
292  return correctJoinTuple->getMerger();
293  }
294 
295  // JiaNote: this gets a sink shuffler
297  TupleSpec& attsToOpOn,
298  TupleSpec& projection,
299  ComputePlan& plan) override {
300 
301  // loop through each of the attributes that we are supposed to accept, and for each of them,
302  // find the type
303  std::vector<std::string> typeList;
304  AtomicComputationPtr producer = plan.getPlan()->getComputations().getProducingAtomicComputation(consumeMe.getSetName());
305 
306  for (auto& a : projection.getAtts()) {
307 
308  // find the identity of the producing computation
309  std::pair<std::string, std::string> res = producer->findSource(a, plan.getPlan()->getComputations());
310 
311  // and find its type... in the first case, there is not a particular lambda that we need to ask for
312  if (res.second.empty()) {
313  typeList.push_back("pdb::Handle<" + plan.getPlan()->getNode(res.first).getComputation().getOutputType() + ">");
314  } else {
315  std::string myType = plan.getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
316 
317  if (myType.find_first_of("pdb::Handle<") == 0) {
318  typeList.push_back(myType);
319  } else {
320  typeList.push_back("pdb::Handle<" + myType + ">");
321  }
322  }
323  }
324 
325  // now we get the correct join tuple, that will allow us to pack tuples from the join in a hash table
326  std::vector<int> whereEveryoneGoes;
327  JoinTuplePtr correctJoinTuple = findCorrectJoinTuple<In1, In2, Rest...>(typeList, whereEveryoneGoes);
328 
329  return correctJoinTuple->getShuffler();
330  }
331 
332 
333  // this gets a compute sink
335  TupleSpec& attsToOpOn,
336  TupleSpec& projection,
337  ComputePlan& plan) override {
338 
339  // loop through each of the attributes that we are supposed to accept, and for each of them, find the type
340  std::vector<std::string> typeList;
341  AtomicComputationPtr producer = plan.getPlan()->getComputations().getProducingAtomicComputation(consumeMe.getSetName());
342 
343  for (auto& a : projection.getAtts()) {
344 
345  // find the identity of the producing computation
346 
347  std::pair<std::string, std::string> res = producer->findSource(a, plan.getPlan()->getComputations());
348 
349  // and find its type... in the first case, there is not a particular lambda that we need to ask for
350  if (res.second.empty()) {
351  typeList.push_back(
352  "pdb::Handle<" +
353  plan.getPlan()->getNode(res.first).getComputation().getOutputType() + ">");
354  } else {
355 
356  std::string myType = plan.getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
357 
358  if (myType.find_first_of("pdb::Handle<") == 0) {
359  typeList.push_back(myType);
360  } else {
361  typeList.push_back("pdb::Handle<" + myType + ">");
362  }
363  }
364  }
365 
366  // now we get the correct join tuple, that will allow us to pack tuples from the join in a hash table
367  std::vector<int> whereEveryoneGoes;
368  JoinTuplePtr correctJoinTuple = findCorrectJoinTuple<In1, In2, Rest...>(typeList, whereEveryoneGoes);
369 
370  if (this->joinType == BroadcastJoin) {
371  return correctJoinTuple->getSink(consumeMe, attsToOpOn, projection, whereEveryoneGoes);
372  } else if (this->joinType == HashPartitionedJoin) {
373  return correctJoinTuple->getPartitionedSink(numPartitions / numNodes,
374  numNodes,
375  consumeMe,
376  attsToOpOn,
377  projection,
378  whereEveryoneGoes);
379  }
380 
381  return nullptr;
382  }
383 
384  // JiaNote: to get compute source for HashPartitionedJoin
385  ComputeSourcePtr getComputeSource(TupleSpec& outputScheme, ComputePlan& plan) override {
386 
387  if (this->joinType != HashPartitionedJoin) {
388  return nullptr;
389  }
390  // loop through each of the attributes that we are supposed to accept, and for each of them, find the type
391  std::vector<std::string> typeList;
392  AtomicComputationPtr producer = plan.getPlan()->getComputations().getProducingAtomicComputation(outputScheme.getSetName());
393 
394  for (auto& a : outputScheme.getAtts()) {
395  if (a.find("hash") != std::string::npos) {
396  continue;
397  }
398 
399  // find the identity of the producing computation
400  std::pair<std::string, std::string> res = producer->findSource(a, plan.getPlan()->getComputations());
401 
402  // and find its type... in the first case, there is not a particular lambda that we need to ask for
403  if (res.second.empty()) {
404  typeList.push_back("pdb::Handle<" + plan.getPlan()->getNode(res.first).getComputation().getOutputType() + ">");
405  } else {
406 
407  std::string myType = plan.getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
408 
409  if (myType.find_first_of("pdb::Handle<") == 0) {
410  typeList.push_back(myType);
411  } else {
412  typeList.push_back("pdb::Handle<" + myType + ">");
413  }
414  }
415  }
416 
417 
418  // now we get the correct join tuple, that will allow us to pack tuples from the join in a hash table
419  std::vector<int> whereEveryoneGoes;
420  JoinTuplePtr correctJoinTuple = findCorrectJoinTuple<In1, In2, Rest...>(typeList, whereEveryoneGoes);
421 
422  return correctJoinTuple->getPartitionedSource(
423  this->myPartitionId,
424 
425  [&]() -> PDBPagePtr {
426 
427  if (this->iterator == nullptr) {
428  std::cout << "Error in JoinComp: partitioned join source has a null iterator"
429  << std::endl;
430  return nullptr;
431  }
432 
433  while (this->iterator->hasNext()) {
434 
435  PDBPagePtr page = this->iterator->next();
436  if (page != nullptr) {
437  return page;
438  }
439  }
440 
441  return nullptr;
442 
443  },
444 
445  [&](PDBPagePtr freeMe) -> void {
446  if (this->proxy != nullptr) {
447  char* curBytes = freeMe->getRawBytes();
448  NodeID nodeId = (NodeID)(*((NodeID*)(curBytes)));
449  curBytes = curBytes + sizeof(NodeID);
450  DatabaseID dbId = (DatabaseID)(*((DatabaseID*)(curBytes)));
451  curBytes = curBytes + sizeof(DatabaseID);
452  UserTypeID typeId = (UserTypeID)(*((UserTypeID*)(curBytes)));
453  curBytes = curBytes + sizeof(UserTypeID);
454  SetID setId = (SetID)(*((SetID*)(curBytes)));
455  freeMe->decRefCount();
456  if (freeMe->getRefCount() == 0) {
457 #ifdef PROFILING_CACHE
458  std::cout << "To unpin Join source page with DatabaseID=" << dbId
459  << ", UserTypeID=" << typeId << ", SetID=" << setId
460  << ", PageID=" << freeMe->getPageID() << std::endl;
461 #endif
462  try {
463  this->proxy->unpinUserPage(nodeId, dbId, typeId, setId, freeMe, false);
464  } catch (NotEnoughSpace& n) {
465  makeObjectAllocatorBlock(4096, true);
466  this->proxy->unpinUserPage(nodeId, dbId, typeId, setId, freeMe, false);
467  throw n;
468  }
469  }
470 #ifdef PROFILING_CACHE
471  else {
472  std::cout << "Can't unpin Join source page with DatabaseID=" << dbId
473  << ", UserTypeID=" << typeId << ", SetID=" << setId
474  << ", PageID=" << freeMe->getPageID()
475  << ", reference count=" << freeMe->getRefCount() << std::endl;
476  }
477 #endif
478  }
479  },
480 
481  (size_t)this->batchSize,
482 
483  whereEveryoneGoes
484 
485  );
486  }
487 
488 
489  // this is a join computation
490  std::string getComputationType() override {
491  return std::string("JoinComp");
492  }
493 
494  // returns the type ID of this computation
496  return JoinCompTypeID;
497  }
498 
499  // JiaNote: Returning a TCAP string for this Join computation
500  std::string toTCAPString(std::vector<InputTupleSetSpecifier>& inputTupleSets,
501  int computationLabel,
502  std::string& outputTupleSetName,
503  std::vector<std::string>& outputColumnNames,
504  std::string& addedOutputColumnName) override {
505 
506  if (inputTupleSets.size() == getNumInputs()) {
507  std::string tcapString;
508  if (multiInputsBase == nullptr) {
510  }
512  std::vector<std::string> inputNames;
513 
514  // update tupleset name for input sets
515  for (unsigned int i = 0; i < inputTupleSets.size(); i++) {
516  this->multiInputsBase->setTupleSetNameForIthInput(i, inputTupleSets[i].getTupleSetName());
517  this->multiInputsBase->setInputColumnsForIthInput(i, inputTupleSets[i].getColumnNamesToKeep());
518  this->multiInputsBase->setInputColumnsToApplyForIthInput(i, inputTupleSets[i].getColumnNamesToApply());
519  inputNames.push_back(inputTupleSets[i].getColumnNamesToApply()[0]);
520  }
521 
522  analyzeInputSets(inputNames);
523  Lambda<bool> selectionLambda = callGetSelection(*this);
524  std::string inputTupleSetName;
525  std::vector<std::string> inputColumnNames;
526  std::vector<std::string> inputColumnsToApply;
527  std::vector<std::string> childrenLambdaNames;
528  int lambdaLabel = 0;
529  std::string myLambdaName;
530  MultiInputsBase* multiInputsComp = this->getMultiInputsBase();
531  tcapString += selectionLambda.toTCAPString(inputTupleSetName,
532  inputColumnNames,
533  inputColumnsToApply,
534  childrenLambdaNames,
535  lambdaLabel,
536  "JoinComp",
537  computationLabel,
538  outputTupleSetName,
539  outputColumnNames,
540  addedOutputColumnName,
541  myLambdaName,
542  false,
543  multiInputsComp,
544  true);
545 
546  std::vector<std::string> inputsInProjection = multiInputsComp->getInputsInProjection();
547  tcapString += "\n/* run Join projection on ( " + inputsInProjection[0];
548  for (unsigned int i = 1; i < inputsInProjection.size(); i++) {
549  tcapString += " " + inputsInProjection[i];
550  }
551  tcapString += " )*/\n";
552  Lambda<Handle<Out>> projectionLambda = callGetProjection(*this);
553  inputTupleSetName = outputTupleSetName;
554  inputColumnNames.clear();
555  inputColumnsToApply.clear();
556  childrenLambdaNames.clear();
557  for (unsigned int index = 0; index < multiInputsComp->getNumInputs(); index++) {
558  multiInputsComp->setInputColumnsForIthInput(index, inputColumnNames);
559  }
560 
561  tcapString += projectionLambda.toTCAPString(inputTupleSetName,
562  inputColumnNames,
563  inputColumnsToApply,
564  childrenLambdaNames,
565  lambdaLabel,
566  "JoinComp",
567  computationLabel,
568  outputTupleSetName,
569  outputColumnNames,
570  addedOutputColumnName,
571  myLambdaName,
572  true,
573  multiInputsComp,
574  false);
575 
576  this->setOutputTupleSetName(outputTupleSetName);
577  this->setOutputColumnToApply(addedOutputColumnName);
579  return tcapString;
580 
581  } else {
582  std::cout << "ERROR: inputTupleSet size is " << inputTupleSets.size()
583  << " and not equivalent with Join's inputs " << getNumInputs() << std::endl;
584  return "";
585  }
586  }
587 
588 
589  // gets an execute that can run a scan join... needToSwapAtts is true if the atts that are
590  // currently stored in the hash table need to
591  // come SECOND in the output tuple sets... hashedInputSchema tells us the schema for the
592  // attributes that are currently stored in the
593  // hash table... pipelinedInputSchema tells us the schema for the attributes that will be coming
594  // through the pipeline...
595  // pipelinedAttsToOperateOn is the identity of the hash attribute...
596  // pipelinedAttsToIncludeInOutput tells us the set of attributes
597  // that are coming through the pipeline that we actually have to write to the output stream
598  ComputeExecutorPtr getExecutor(bool needToSwapAtts,
599  TupleSpec& hashedInputSchema,
600  TupleSpec& pipelinedInputSchema,
601  TupleSpec& pipelinedAttsToOperateOn,
602  TupleSpec& pipelinedAttsToIncludeInOutput,
603  ComputeInfoPtr arg) override {
604  // get the argument to the join
605  JoinArg& joinArg = *((JoinArg*)arg.get());
606 
607 
608  // loop through each of the attributes that we are supposed to accept, and for each of them,
609  // find the type
610  std::vector<std::string> typeList;
611  AtomicComputationPtr producer = joinArg.plan.getPlan()->getComputations().getProducingAtomicComputation(hashedInputSchema.getSetName());
612  for (auto& a : (hashedInputSchema.getAtts())) {
613 
614  // find the identity of the producing computation
615  std::pair<std::string, std::string> res = producer->findSource(a, joinArg.plan.getPlan()->getComputations());
616 
617  // and find its type... in the first case, there is not a particular lambda that we need to ask for
618  if (res.second.empty()) {
619  typeList.push_back("pdb::Handle<" + joinArg.plan.getPlan()->getNode(res.first).getComputation().getOutputType() + ">");
620  } else {
621 
622  std::string myType = joinArg.plan.getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
623 
624  if (myType.find_first_of("pdb::Handle<") == 0) {
625  typeList.push_back(myType);
626  } else {
627  typeList.push_back("pdb::Handle<" + myType + ">");
628  }
629  }
630  }
631 
632  // now we get the correct join tuple, that will allow us to pack tuples from the join in a hash table
633  std::vector<int> whereEveryoneGoes;
634  JoinTuplePtr correctJoinTuple = findCorrectJoinTuple<In1, In2, Rest...>(typeList, whereEveryoneGoes);
635 
636  // and return the correct probing code
637  return correctJoinTuple->getProber(joinArg.pageWhereHashTableIs,
638  whereEveryoneGoes,
639  pipelinedInputSchema,
640  pipelinedAttsToOperateOn,
641  pipelinedAttsToIncludeInOutput,
642  needToSwapAtts);
643  }
644 
645  ComputeExecutorPtr getExecutor(bool needToSwapAtts,
646  TupleSpec& hashedInputSchema,
647  TupleSpec& pipelinedInputSchema,
648  TupleSpec& pipelinedAttsToOperateOn,
649  TupleSpec& pipelinedAttsToIncludeInOutput) override {
650  std::cout << "Currently, no pipelined version of the join doesn't take an arg.\n";
651  exit(1);
652  }
653 };
654 
655 }
656 
657 
658 #endif //PDB_JOINCOMPBASE_H
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
Definition: JoinCompBase.h:385
ComputationTypeID
Definition: Computation.h:39
void setNumInputs(int numInputs)
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
ComputePlan & plan
Definition: JoinCompBase.h:37
ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput, ComputeInfoPtr arg) override
Definition: JoinCompBase.h:598
void * pageWhereHashTableIs
Definition: JoinCompBase.h:40
void setInputColumnsForIthInput(int i, std::vector< std::string > &columns)
LogicalPlanPtr getPlan()
Definition: ComputePlan.cc:39
SinkMergerPtr getSinkMerger(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
Definition: JoinCompBase.h:256
std::shared_ptr< JoinTupleSingleton > JoinTuplePtr
Definition: JoinTuple.h:1251
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
JoinArg(ComputePlan &plan, void *pageWhereHashTableIs)
Definition: JoinCompBase.h:42
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Definition: SinkShuffler.h:31
ComputationTypeID getComputationTypeID() override
Definition: JoinCompBase.h:495
void analyzeInputSets(std::vector< std::string > &inputNames)
Definition: JoinCompBase.h:172
std::enable_if< sizeof...(Others)!=0, std::string >::type getIthInputType(int i)
Definition: JoinCompBase.h:240
void setInputColumnsToApplyForIthInput(int i, std::vector< std::string > &columnsToApply)
DataProxyPtr proxy
Definition: JoinCompBase.h:81
unsigned int NodeID
Definition: DataTypes.h:27
std::shared_ptr< SinkMerger > SinkMergerPtr
Definition: SinkMerger.h:30
void setTupleSetNameForIthInput(int i, std::string name)
PageCircularBufferIteratorPtr iterator
Definition: JoinCompBase.h:78
void addInputNameToProjection(std::string name)
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
Definition: Lambda.h:73
std::shared_ptr< ComputeInfo > ComputeInfoPtr
Definition: ComputeInfo.h:33
MultiInputsBase * getMultiInputsBase()
Definition: JoinCompBase.h:158
std::string & getSetName()
Definition: TupleSpec.h:56
size_t getPartitionId()
Definition: JoinCompBase.h:133
std::string getOutputType() override
Definition: JoinCompBase.h:218
void setBatchSize(int batchSize) override
Definition: JoinCompBase.h:149
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
Definition: JoinCompBase.h:334
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
Definition: JoinCompBase.h:500
unsigned int DatabaseID
Definition: DataTypes.h:29
MultiInputsBase * multiInputsBase
Definition: JoinCompBase.h:62
ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput) override
Definition: JoinCompBase.h:645
SinkShufflerPtr getSinkShuffler(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
Definition: JoinCompBase.h:296
virtual ~JoinCompBase()
Definition: JoinCompBase.h:88
void setPartitionId(size_t myPartitionId)
Definition: JoinCompBase.h:127
std::enable_if< std::is_base_of< JoinTupleBase, In1 >::value, JoinTuplePtr >::type findCorrectJoinTuple(std::vector< std::string > &typeList, std::vector< int > &whereEveryoneGoes)
Definition: JoinTuple.h:1301
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
auto callGetProjection(TypeToCallMethodOn &a, decltype(HasTwoArgs::test(&a))*arg=nullptr)
Definition: JoinTests.h:93
void addInputNameToPredicates(std::string name)
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
std::string getIthInputType(int i) final
Definition: JoinCompBase.h:249
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
virtual Lambda< bool > getSelection(Handle< In1 > in1, Handle< In2 > in2, Handle< Rest >...otherArgs)=0
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
auto callGetSelection(TypeToCallMethodOn &a, decltype(HasTwoArgs::test(&a))*arg=nullptr)
Definition: JoinTests.h:59
void setMultiInputsBaseToNull()
Definition: JoinCompBase.h:165
void setJoinType(JoinType joinType)
Definition: JoinCompBase.h:97
std::vector< std::string > getAllInputs(MultiInputsBase *multiInputsBase)
Definition: Lambda.h:77
void setNumPartitions(int numPartitions)
Definition: JoinCompBase.h:107
std::enable_if< sizeof...(Others)==0, std::string >::type getIthInputType(int i)
Definition: JoinCompBase.h:229
void setNameForIthInput(int i, std::string name)
std::string getComputationType() override
Definition: JoinCompBase.h:490
JoinType
Definition: JoinCompBase.h:51
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
~JoinArg() override=default
void setIterator(PageCircularBufferIteratorPtr iterator)
Definition: JoinCompBase.h:139
JoinType getJoinType()
Definition: JoinCompBase.h:102
void setNumNodes(int numNodes)
Definition: JoinCompBase.h:117
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
virtual Lambda< Handle< Out > > getProjection(Handle< In1 > in1, Handle< In2 > in2, Handle< Rest >...otherArgs)=0
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
Definition: Lambda.h:101
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
int getNumInputs() final
Definition: JoinCompBase.h:223
void setProxy(DataProxyPtr proxy)
Definition: JoinCompBase.h:144
unsigned int UserTypeID
Definition: DataTypes.h:25
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
Definition: JoinCompBase.h:209
std::vector< std::string > getInputsInProjection()
String outputTupleSetName
Definition: Computation.h:379