A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
JoinTuple.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef JOIN_TUPLE_H
20 #define JOIN_TUPLE_H
21 
22 #include "JoinMap.h"
23 #include "SinkMerger.h"
24 #include "SinkShuffler.h"
25 #include "JoinTupleBase.h"
26 #include "PDBPage.h"
27 #include "RecordIterator.h"
28 
29 namespace pdb {
30 
31 template <typename T>
32 void copyFrom(T& out, Handle<T>& in) {
33  out = *in;
34 }
35 
36 template <typename T>
37 void copyFrom(T& out, T& in) {
38  out = in;
39 }
40 
41 template <typename T>
42 void copyFrom(Handle<T>& out, Handle<T>& in) {
43  out = in;
44 }
45 
46 template <typename T>
47 void copyFrom(Handle<T>& out, T& in) {
48  *out = in;
49 }
50 
51 template <typename T>
52 void copyTo(T& out, Handle<T>& in) {
53  char* location = (char*)&out;
54  location -= REF_COUNT_PREAMBLE_SIZE;
55  in = (RefCountedObject<T>*)location;
56 }
57 
58 template <typename T>
59 void copyTo(Handle<T>& out, Handle<T>& in) {
60  in = out;
61 }
62 
63 // this checks to see if the class is abstract
64 // used like: decltype (IsAbstract <Foo> :: val) a;
65 // the type of a will be Handle <Foo> if foo is abstract, and Foo otherwise.
66 //
67 template <typename T>
68 struct IsAbstract {
69  template <typename U>
70  static U test(U x, int);
71 
72  template <typename U, typename... Rest>
73  static Handle<U> test(U& x, Rest...);
74 
75  static decltype(test<T>(*((T*)0), 1)) val;
76 };
77 
78 /*// all join tuples descend from this
79 class JoinTupleBase {
80 
81 };
82 */
83 
84 // this template is used to hold a tuple made of one row from each of a number of columns in a
85 // TupleSet
86 template <typename HoldMe, typename MeTo>
87 class JoinTuple : public JoinTupleBase {
88 
89 public:
90  // this stores the base type
91  decltype(IsAbstract<HoldMe>::val) myData;
92 
93  // and this is the recursion
94  MeTo myOtherData;
95 
96  static void* allocate(TupleSet& processMe, int where) {
97  std::vector<Handle<HoldMe>>* me = new std::vector<Handle<HoldMe>>;
98  processMe.addColumn(where, me, true);
99  return me;
100  }
101 
102  // JiaNote: add the below two functions to facilitate JoinMap merging for broadcast join
104  pdb::copyFrom(myData, me);
105  }
106 
107  void copyDataFrom(HoldMe me) {
108  pdb::copyFrom(myData, me);
109  }
110 
111  void copyFrom(void* input, int whichPos) {
112  std::vector<Handle<HoldMe>>& me = *((std::vector<Handle<HoldMe>>*)input);
113  pdb::copyFrom(myData, me[whichPos]);
114  }
115 
116  void copyTo(void* input, int whichPos) {
117  std::vector<Handle<HoldMe>>& me = *((std::vector<Handle<HoldMe>>*)input);
118 
119  if (whichPos >= me.size()) {
120  Handle<HoldMe> temp;
121  pdb::copyTo(myData, temp);
122  me.push_back(temp);
123  } else {
124  pdb::copyTo(myData, me[whichPos]);
125  }
126  }
127 
128  static void truncate(void* input, int i) {
129  std::vector<Handle<HoldMe>>& valColumn = *((std::vector<Handle<HoldMe>>*)(input));
130  valColumn.erase(valColumn.begin(), valColumn.begin() + i);
131  }
132 
133  static void eraseEnd(void* input, int i) {
134  std::vector<Handle<HoldMe>>& valColumn = *((std::vector<Handle<HoldMe>>*)(input));
135  valColumn.resize(i);
136  }
137 };
138 
139 
140 /***** CODE TO CREATE A SET OF ATTRIBUTES IN A TUPLE SET *****/
141 
142 // this adds a new column to processMe of type TypeToCreate. This is added at position offset +
143 // positions[whichPos]
144 template <typename TypeToCreate>
145 typename std::enable_if<sizeof(TypeToCreate::myOtherData) == 0, void>::type createCols(
146  void** putUsHere, TupleSet& processMe, int offset, int whichPos, std::vector<int> positions) {
147  putUsHere[whichPos] = TypeToCreate::allocate(processMe, offset + positions[whichPos]);
148 }
149 
150 // recursive version of the above
151 template <typename TypeToCreate>
152 typename std::enable_if<sizeof(TypeToCreate::myOtherData) != 0, void>::type createCols(
153  void** putUsHere, TupleSet& processMe, int offset, int whichPos, std::vector<int> positions) {
154  putUsHere[whichPos] = TypeToCreate::allocate(processMe, offset + positions[whichPos]);
155  createCols<decltype(TypeToCreate::myOtherData)>(
156  putUsHere, processMe, offset, whichPos + 1, positions);
157 }
158 
159 // JiaNote: add the two functions below to copy a join tuple from another join tuple
160 /**** CODE TO COPY A JOIN TUPLE FROM ANOTHER JOIN TUPLE ****/
161 
// Copies every nesting level of the JoinTuple `data` into the matching level
// of `arg`, using each level's copyDataFrom().
//
// `data` is taken by const reference. A by-value parameter copies the whole
// remaining nested tuple at every recursion level: quadratic in the nesting
// depth, with Handle copies (and refcount updates on the source map's objects)
// each time.

// Terminal case: myOtherData has size 0 (the innermost level), so only
// myData is copied.
template <typename TypeToPackData>
typename std::enable_if<(sizeof(TypeToPackData::myOtherData) == 0) &&
                            (sizeof(TypeToPackData::myData) != 0),
                        void>::type
packData(TypeToPackData& arg, const TypeToPackData& data) {
    arg.copyDataFrom(data.myData);
}

// Recursive case: copy this level's myData, then recurse into myOtherData.
template <typename TypeToPackData>
typename std::enable_if<(sizeof(TypeToPackData::myOtherData) != 0) &&
                            (sizeof(TypeToPackData::myData) != 0),
                        void>::type
packData(TypeToPackData& arg, const TypeToPackData& data) {
    arg.copyDataFrom(data.myData);
    packData(arg.myOtherData, data.myOtherData);
}
183 
184 
185 /***** CODE TO PACK A JOIN TUPLE FROM A SET OF VALUES SPREAD ACROSS COLUMNS *****/
186 
// Fills a nested JoinTuple from one row of a set of columns. `us` is an array
// of pointers to column vectors (as produced by createCols). Level k of `arg`
// reads entry `whichPosInTupleSet` of column us[whichVec + k] via copyFrom().
//
// Terminal case: selected when the type's myOtherData member has size 0
// (the innermost level), so only this level is filled.
template <typename TypeToPack>
typename std::enable_if<sizeof(TypeToPack::myOtherData) == 0, void>::type pack(
    TypeToPack& arg, int whichPosInTupleSet, int whichVec, void** us) {
    void* column = us[whichVec];
    arg.copyFrom(column, whichPosInTupleSet);
}

// Recursive case: fill this level from column us[whichVec], then descend into
// myOtherData using the next column.
template <typename TypeToPack>
typename std::enable_if<sizeof(TypeToPack::myOtherData) != 0, void>::type pack(
    TypeToPack& arg, int whichPosInTupleSet, int whichVec, void** us) {
    arg.copyFrom(us[whichVec], whichPosInTupleSet);
    auto& inner = arg.myOtherData;
    pack(inner, whichPosInTupleSet, whichVec + 1, us);
}
210 
211 /***** CODE TO UNPACK A JOIN TUPLE FROM A SET OF VALUES SPREAD ACROSS COLUMNS *****/
212 
// Inverse of pack: writes each nesting level of `arg` into entry
// `whichPosInTupleSet` of column us[whichVec + k] via copyTo().
//
// Terminal case: myOtherData has size 0 (the innermost level), so only this
// level is written.
template <typename TypeToUnPack>
typename std::enable_if<sizeof(TypeToUnPack::myOtherData) == 0, void>::type unpack(
    TypeToUnPack& arg, int whichPosInTupleSet, int whichVec, void** us) {
    void* column = us[whichVec];
    arg.copyTo(column, whichPosInTupleSet);
}

// Recursive case: write this level to column us[whichVec], then descend into
// myOtherData using the next column.
template <typename TypeToUnPack>
typename std::enable_if<sizeof(TypeToUnPack::myOtherData) != 0, void>::type unpack(
    TypeToUnPack& arg, int whichPosInTupleSet, int whichVec, void** us) {
    arg.copyTo(us[whichVec], whichPosInTupleSet);
    auto& inner = arg.myOtherData;
    unpack(inner, whichPosInTupleSet, whichVec + 1, us);
}
228 
229 /***** CODE TO ERASE DATA FROM THE END OF A SET OF VECTORS *****/
230 
// Shrinks (or grows) each column us[whichVec + k], one per nesting level of
// TypeToTruncate, to exactly `i` entries via the level's static eraseEnd().
//
// Terminal case: myOtherData has size 0 (the innermost level), so only this
// level's column is resized.
template <typename TypeToTruncate>
typename std::enable_if<sizeof(TypeToTruncate::myOtherData) == 0, void>::type eraseEnd(int i,
                                                                                      int whichVec,
                                                                                      void** us) {
    void* column = us[whichVec];
    TypeToTruncate::eraseEnd(column, i);
}

// Recursive case: resize this level's column, then the ones for myOtherData.
template <typename TypeToTruncate>
typename std::enable_if<sizeof(TypeToTruncate::myOtherData) != 0, void>::type eraseEnd(int i,
                                                                                      int whichVec,
                                                                                      void** us) {
    using Inner = decltype(TypeToTruncate::myOtherData);
    TypeToTruncate::eraseEnd(us[whichVec], i);
    eraseEnd<Inner>(i, whichVec + 1, us);
}
249 
250 /***** CODE TO TRUNCATE A SET OF VECTORS *****/
251 
// Removes the first `i` entries from each column us[whichVec + k], one per
// nesting level of TypeToTruncate, via the level's static truncate().
//
// Terminal case: myOtherData has size 0 (the innermost level), so only this
// level's column is trimmed.
template <typename TypeToTruncate>
typename std::enable_if<sizeof(TypeToTruncate::myOtherData) == 0, void>::type truncate(int i,
                                                                                      int whichVec,
                                                                                      void** us) {
    void* column = us[whichVec];
    TypeToTruncate::truncate(column, i);
}

// Recursive case: trim this level's column, then the ones for myOtherData.
template <typename TypeToTruncate>
typename std::enable_if<sizeof(TypeToTruncate::myOtherData) != 0, void>::type truncate(int i,
                                                                                      int whichVec,
                                                                                      void** us) {
    using Inner = decltype(TypeToTruncate::myOtherData);
    TypeToTruncate::truncate(us[whichVec], i);
    truncate<Inner>(i, whichVec + 1, us);
}
271 
272 // this class is used to encapsulate the computation that is responsible for probing a hash table
273 template <typename RHSType>
274 class JoinProbe : public ComputeExecutor {
275 
276 private:
277  // this is the output TupleSet that we return
279 
280  // the attribute to operate on
281  int whichAtt;
282 
283  // to setup the output tuple set
285 
286  // the hash talbe we are processing
288 
289  // the list of counts for matches of each of the input tuples
290  std::vector<uint32_t> counts;
291 
292  // this is the list of all of the output columns in the output TupleSetPtr
293  void** columns;
294 
295  // used to create space of attributes in the case that the atts from attsToIncludeInOutput are
296  // not the first bunch of atts
297  // inside of the output tuple
298  int offset;
299 
300 public:
302  if (columns != nullptr)
303  delete[] columns;
304  }
305 
306  // when we probe a hash table, a subset of the atts that we need to put into the output stream
307  // are stored in the hash table... the positions
308  // of these packed atts are stored in typesStoredInHash, so that they can be extracted.
309  // inputSchema, attsToOperateOn, and attsToIncludeInOutput
310  // are standard for executors: they tell us the details of the input that are streaming in, as
311  // well as the identity of the has att, and
312  // the atts that will be streamed to the output, from the input. needToSwapLHSAndRhs is true if
313  // it's the case that theatts stored in the
314  // hash table need to come AFTER the atts being streamed through the join
315  JoinProbe(void* hashTable,
316  std::vector<int>& positions,
317  TupleSpec& inputSchema,
318  TupleSpec& attsToOperateOn,
319  TupleSpec& attsToIncludeInOutput,
320  bool needToSwapLHSAndRhs)
321  : myMachine(inputSchema, attsToIncludeInOutput) {
322 
323  // extract the hash table we've been given
324  Record<JoinMap<RHSType>>* input = (Record<JoinMap<RHSType>>*)hashTable;
325  inputTable = input->getRootObject();
326 
327  // set up the output tuple
328  output = std::make_shared<TupleSet>();
329  columns = new void*[positions.size()];
330  if (needToSwapLHSAndRhs) {
331  offset = positions.size();
332  createCols<RHSType>(columns, *output, 0, 0, positions);
333  } else {
334  offset = 0;
335  createCols<RHSType>(
336  columns, *output, attsToIncludeInOutput.getAtts().size(), 0, positions);
337  }
338 
339  // this is the input attribute that we will hash in order to try to find matches
340  std::vector<int> matches = myMachine.match(attsToOperateOn);
341  whichAtt = matches[0];
342  }
343 
344  std::string getType() override {
345  return "JoinProbe";
346  }
347 
348  TupleSetPtr process(TupleSetPtr input) override {
349 
350  std::vector<size_t> inputHash = input->getColumn<size_t>(whichAtt);
351  JoinMap<RHSType>& inputTableRef = *inputTable;
352 
353  // redo the vector of hash counts if it's not the correct size
354  if (counts.size() != inputHash.size()) {
355  counts.resize(inputHash.size());
356  }
357 
358  // now, run through and attempt to hash
359  int overallCounter = 0;
360  for (int i = 0; i < inputHash.size(); i++) {
361 
362  // deal with all of the matches
363  auto a = inputTableRef.lookup(inputHash[i]);
364  int numHits = a.size();
365 
366  for (int which = 0; which < numHits; which++) {
367  unpack(a[which], overallCounter, 0, columns);
368  overallCounter++;
369  }
370 
371  // remember how many matches we had
372  counts[i] = numHits;
373  }
374 
375  // truncate if we have extra
376  eraseEnd<RHSType>(overallCounter, 0, columns);
377 
378  // and finally, we need to relpicate the input data
379  myMachine.replicate(input, output, counts, offset);
380 
381  // outta here!
382  return output;
383  }
384 };
385 
386 
387 // JiaNote: this class is used to create a SinkMerger object that merges multiple JoinSinks for
388 // broadcast join
389 template <typename RHSType>
390 class JoinSinkMerger : public SinkMerger {
391 
392 
393 public:
395 
397 
399 
400  // we simply create a new map to store the output
401  Handle<JoinMap<RHSType>> returnVal = makeObject<JoinMap<RHSType>>();
402  return returnVal;
403  }
404 
405  void writeOut(Handle<Object> mergeMe, Handle<Object>& mergeToMe) override {
406 
407  // get the map we are adding to
408  Handle<JoinMap<RHSType>> mergedMap = unsafeCast<JoinMap<RHSType>>(mergeToMe);
409  JoinMap<RHSType>& myMap = *mergedMap;
410  Handle<JoinMap<RHSType>> mapToMerge = unsafeCast<JoinMap<RHSType>>(mergeMe);
411  JoinMap<RHSType>& theOtherMap = *mapToMerge;
412 
413  for (JoinMapIterator<RHSType> iter = theOtherMap.begin(); iter != theOtherMap.end();
414  ++iter) {
415  JoinRecordList<RHSType>* myList = *iter;
416  size_t mySize = myList->size();
417  size_t myHash = myList->getHash();
418  if (mySize > 0) {
419  for (size_t i = 0; i < mySize; i++) {
420  try {
421  RHSType* temp = &(myMap.push(myHash));
422  packData(*temp, ((*myList)[i]));
423  } catch (NotEnoughSpace& n) {
424  std::cout << "ERROR: join data is too large to be built in one map, "
425  "results are truncated!"
426  << std::endl;
427  delete (myList);
428  return;
429  }
430  }
431  }
432  delete (myList);
433  }
434  }
435 
436  void writeVectorOut(Handle<Object> mergeMe, Handle<Object>& mergeToMe) override {
437 
438  // get the map we are adding to
439  Handle<JoinMap<RHSType>> mergedMap = unsafeCast<JoinMap<RHSType>>(mergeToMe);
440  JoinMap<RHSType>& myMap = *mergedMap;
442  unsafeCast<Vector<Handle<JoinMap<RHSType>>>>(mergeMe);
443  Vector<Handle<JoinMap<RHSType>>>& theOtherMaps = *mapsToMerge;
444  for (int i = 0; i < theOtherMaps.size(); i++) {
445  JoinMap<RHSType>& theOtherMap = *(theOtherMaps[i]);
446  for (JoinMapIterator<RHSType> iter = theOtherMap.begin(); iter != theOtherMap.end();
447  ++iter) {
448  JoinRecordList<RHSType>* myList = *iter;
449  size_t mySize = myList->size();
450  size_t myHash = myList->getHash();
451  if (mySize > 0) {
452  for (size_t j = 0; j < mySize; j++) {
453  try {
454  RHSType* temp = &(myMap.push(myHash));
455  packData(*temp, ((*myList)[j]));
456  } catch (NotEnoughSpace& n) {
457  std::cout << "ERROR: join data is too large to be built in one map, "
458  "results are truncated!"
459  << std::endl;
460  delete (myList);
461  return;
462  }
463  }
464  }
465  delete (myList);
466  }
467  }
468  }
469 };
470 
471 
472 // JiaNote: this class is used to create a special JoinSource that will generate a stream of
473 // TupleSet from a series of JoinMaps
474 
475 template <typename RHSType>
477 
478 private:
479  // my partition id
481 
482  // function to call to get another vector to process
483  std::function<PDBPagePtr()> getAnotherVector;
484 
485  // function to call to free the vector
486  std::function<void(PDBPagePtr)> doneWithVector;
487 
488  // this is the vector to process
490 
491  // the pointer to current page holding the vector, and the last page that we previously
492  // processed
494 
495  // the page contains record
496  PDBPagePtr myPage, lastPage;
497 
498  // how many objects to put into a chunk
499  size_t chunkSize;
500 
501  // where we are in the Vector
502  size_t pos;
503 
504  // the current JoinMap
506 
507  // where we are in the JoinMap
509 
510  // end iterator
512 
513  // where we are in the Record list
515 
516  // and the tuple set we return
518 
519  // the hash column in the output TupleSet
520  std::vector<size_t>* hashColumn;
521 
522  // this is the list of output columns except the hash column in the output TupleSet
523  void** columns;
524 
525  // whether we have processed all pages
526  bool isDone;
527 
528  JoinRecordList<RHSType>* myList = nullptr;
529  size_t myListSize = 0;
530  size_t myHash = 0;
531 
532 
533  RecordIteratorPtr myIter = nullptr;
534 
535 public:
536  // the first param is a callback function that the iterator will call in order to obtain the
537  // page holding the next vector to iterate
538  // over. The secomd param is a callback that the iterator will call when the specified page is
539  // done being processed and can be
540  // freed. The third param tells us how many objects to put into a tuple set.
541  // The fourth param tells us positions of those packed columns.
543  std::function<PDBPagePtr()> getAnotherVector,
544  std::function<void(PDBPagePtr)> doneWithVector,
545  size_t chunkSize,
546  std::vector<int> positions)
547  : getAnotherVector(getAnotherVector), doneWithVector(doneWithVector), chunkSize(chunkSize) {
548 
549  // set my partition id
550  this->myPartitionId = myPartitionId;
551 
552  // create the tuple set that we'll return during iteration
553  output = std::make_shared<TupleSet>();
554  // extract the vector from the input page
555  myPage = getAnotherVector();
556  if (myPage != nullptr) {
557  myIter = make_shared<RecordIterator>(myPage);
558  if (myIter->hasNext() == true) {
559  myRec = (Record<Vector<Handle<JoinMap<RHSType>>>>*)(myIter->next());
560  } else {
561  myRec = nullptr;
562  }
563  } else {
564  myIter = nullptr;
565  myRec = nullptr;
566  }
567  if (myRec != nullptr) {
568 
569  iterateOverMe = myRec->getRootObject();
570  PDB_COUT << "Got iterateOverMe" << std::endl;
571  // create the output vector for objects and put it into the tuple set
572  columns = new void*[positions.size()];
573  createCols<RHSType>(columns, *output, 0, 0, positions);
574  // create the output vector for hash value and put it into the tuple set
575  hashColumn = new std::vector<size_t>;
576  output->addColumn(positions.size(), hashColumn, true);
577  isDone = false;
578 
579  } else {
580  iterateOverMe = nullptr;
581  output = nullptr;
582  isDone = true;
583  }
584 
585  // we are at position zero
586  pos = 0;
587  curJoinMap = nullptr;
588  posInRecordList = 0;
589  // and we have no data so far
590  lastRec = nullptr;
591  lastPage = nullptr;
592  }
593 
594  void setChunkSize(size_t chunkSize) override {
595  this->chunkSize = chunkSize;
596  }
597 
598  // returns the next tuple set to process, or nullptr if there is not one to process
600 
601  // JiaNote: below two lines are necessary to fix a bug that iterateOverMe may be nullptr
602  // when first time get to here
603  if ((iterateOverMe == nullptr) || (isDone == true)) {
604  return nullptr;
605  }
606 
607  size_t posToRecover = pos;
608  Handle<JoinMap<RHSType>> curJoinMapToRecover = curJoinMap;
609  JoinMapIterator<RHSType> curJoinMapIterToRecover = curJoinMapIter;
610  JoinMapIterator<RHSType> joinMapEndIterToRecover = joinMapEndIter;
611  size_t posInRecordListToRecover = posInRecordList;
612  JoinRecordList<RHSType>* myListToRecover = myList;
613  size_t myListSizeToRecover = myListSize;
614  size_t myHashToRecover = myHash;
615 
616  int overallCounter = 0;
617  hashColumn->clear();
618  while (true) {
619 
620  // if we made it here with lastRec being a valid pointer, then it means
621  // that we have gone through an entire cycle, and so all of the data that
622  // we will ever reference stored in lastRec has been fluhhed through the
623  // pipeline; hence, we can kill it
624 
625  if ((lastRec != nullptr) && (lastPage != nullptr)) {
626  doneWithVector(lastPage);
627  lastRec = nullptr;
628  lastPage = nullptr;
629  }
630 
631  while (curJoinMap == nullptr) {
632  curJoinMap = (*iterateOverMe)[pos];
633  pos++;
634  if (curJoinMap != nullptr) {
635  if ((curJoinMap->getPartitionId() % curJoinMap->getNumPartitions()) !=
636  myPartitionId) {
637  curJoinMap = nullptr;
638  } else {
639  curJoinMapIter = curJoinMap->begin();
640  joinMapEndIter = curJoinMap->end();
641  posInRecordList = 0;
642  }
643  }
644  if (curJoinMap == nullptr) {
645  if (pos == iterateOverMe->size()) {
646  break;
647  } else {
648  continue;
649  }
650  }
651  }
652  // there are two possibilities, first we find my map, second we come to end of this page
653  if (curJoinMap != nullptr) {
654  if (myList == nullptr) {
655  if (curJoinMapIter != joinMapEndIter) {
656  myList = *curJoinMapIter;
657  myListSize = myList->size();
658  myHash = myList->getHash();
659  posInRecordList = 0;
660  }
661  }
662  while (curJoinMapIter != joinMapEndIter) {
663  for (size_t i = posInRecordList; i < myListSize; i++) {
664  try {
665  unpack((*myList)[i], overallCounter, 0, columns);
666  } catch (NotEnoughSpace& n) {
667  pos = posToRecover;
668  curJoinMap = curJoinMapToRecover;
669  curJoinMapIter = curJoinMapIterToRecover;
670  joinMapEndIter = joinMapEndIterToRecover;
671  posInRecordList = posInRecordListToRecover;
672  myList = myListToRecover;
673  myListSize = myListSizeToRecover;
674  myHash = myHashToRecover;
675  throw n;
676  }
677  hashColumn->push_back(myHash);
678  posInRecordList++;
679  overallCounter++;
680  if (overallCounter == this->chunkSize) {
681  hashColumn->resize(overallCounter);
682  eraseEnd<RHSType>(overallCounter, 0, columns);
683  return output;
684  }
685  }
686  if (posInRecordList >= myListSize) {
687  posInRecordList = 0;
688  ++curJoinMapIter;
689  if (curJoinMapIter != joinMapEndIter) {
690  myList = *curJoinMapIter;
691  myListSize = myList->size();
692  myHash = myList->getHash();
693  } else {
694  myList = nullptr;
695  myListSize = 0;
696  myHash = 0;
697  }
698  }
699  }
700  curJoinMap = nullptr;
701  }
702  if ((curJoinMap == nullptr) && (pos == iterateOverMe->size())) {
703  // this means that we got to the end of the vector
704  lastRec = myRec;
705  if (myIter->hasNext() == true) {
706  myRec = (Record<Vector<Handle<JoinMap<RHSType>>>>*)myIter->next();
707  } else {
708  lastPage = myPage;
709  // try to get another vector
710  myPage = getAnotherVector();
711  if (myPage != nullptr) {
712  myIter = std::make_shared<RecordIterator>(myPage);
713  if (myIter->hasNext() == true) {
714  myRec = (Record<Vector<Handle<JoinMap<RHSType>>>>*)(myIter->next());
715  } else {
716  myRec = nullptr;
717  myIter = nullptr;
718  }
719  } else {
720  myRec = nullptr;
721  myIter = nullptr;
722  }
723  }
724  // if we could not, then we are outta here
725  if (myRec == nullptr) {
726  isDone = true;
727  iterateOverMe = nullptr;
728  if (overallCounter > 0) {
729 
730  hashColumn->resize(overallCounter);
731  eraseEnd<RHSType>(overallCounter, 0, columns);
732  return output;
733 
734  } else {
735  return nullptr;
736  }
737  }
738  // and reset everything
739  iterateOverMe = myRec->getRootObject();
740  pos = 0;
741  }
742  // our counter hasn't been full, so we continue the loop
743  }
744  isDone = true;
745  iterateOverMe = nullptr;
746  return nullptr;
747  }
748 
749 
751 
752  // if lastRec is not a nullptr, then it means that we have not yet freed it
753  if ((lastRec != nullptr) && (lastPage != nullptr)) {
754  makeObjectAllocatorBlock(4096, true);
755  doneWithVector(lastPage);
756  }
757  lastRec = nullptr;
758  lastPage = nullptr;
759  myRec = nullptr;
760  myPage = nullptr;
761  }
762 };
763 
764 // JiaNote: this class is used to create a Shuffler that picks all JoinMaps that belong to one node,
765 // and push back JoinMaps to another vector.
766 template <typename RHSType>
768 
769 private:
770  int nodeId;
771 
772 public:
774 
776 
777  void setNodeId(int nodeId) override {
778  this->nodeId = nodeId;
779  }
780 
781  int getNodeId() override {
782  return nodeId;
783  }
784 
786 
787  // we simply create a new map to store the output
789  makeObject<Vector<Handle<JoinMap<RHSType>>>>();
790  return returnVal;
791  }
792 
793  bool writeOut(Handle<Object> shuffleMe, Handle<Object>& shuffleToMe) override {
794 
795  // get the map we are adding to
796  Handle<JoinMap<RHSType>> theOtherMap = unsafeCast<JoinMap<RHSType>>(shuffleMe);
797  JoinMap<RHSType> mapToShuffle = *theOtherMap;
798 
799  Handle<Vector<Handle<JoinMap<RHSType>>>> shuffledMaps =
800  unsafeCast<Vector<Handle<JoinMap<RHSType>>>, Object>(shuffleToMe);
801  Vector<Handle<JoinMap<RHSType>>>& myMaps = *shuffledMaps;
802  Handle<JoinMap<RHSType>> thisMap;
803  try {
804  thisMap = makeObject<JoinMap<RHSType>>(mapToShuffle.size(),
805  mapToShuffle.getPartitionId(),
806  mapToShuffle.getNumPartitions());
807  } catch (NotEnoughSpace& n) {
808  std::cout << "ERROR: can't allocate for new map" << std::endl;
809  return false;
810  }
811  JoinMap<RHSType>& myMap = *thisMap;
812  int counter = 0;
813  int numPacked = 0;
814  for (JoinMapIterator<RHSType> iter = mapToShuffle.begin(); iter != mapToShuffle.end();
815  ++iter) {
816  JoinRecordList<RHSType>* myList = *iter;
817  size_t mySize = myList->size();
818  size_t myHash = myList->getHash();
819 
820  counter++;
821  if (mySize > 0) {
822  for (size_t i = 0; i < mySize; i++) {
823  if (myMap.count(myHash) == 0) {
824  try {
825  RHSType* temp = &(myMap.push(myHash));
826  packData(*temp, ((*myList)[i]));
827  numPacked++;
828  } catch (NotEnoughSpace& n) {
829  myMap.setUnused(myHash);
830  std::cout << "Run out of space in shuffling, to allocate a new page"
831  << std::endl;
832  delete (myList);
833  return false;
834  }
835  } else {
836  RHSType* temp;
837  try {
838  temp = &(myMap.push(myHash));
839  } catch (NotEnoughSpace& n) {
840  std::cout << "Run out of space in shuffling, to allocate a new page!"
841  << std::endl;
842  delete (myList);
843  return false;
844  }
845  try {
846  packData(*temp, ((*myList)[i]));
847  } catch (NotEnoughSpace& n) {
848  myMap.setUnused(myHash);
849  std::cout << "Run out of space in shuffling, to allocate a new page"
850  << std::endl;
851  delete (myList);
852  return false;
853  }
854  }
855  }
856  }
857  delete (myList);
858  }
859  // We push back only when the whole map has been copied. If it throws exception earlier than
860  // this point, the whole map will be discarded and will not add to the vector of maps.
861  myMaps.push_back(thisMap);
862  return true;
863  }
864 };
865 
866 // JiaNote: this class is used to create a special JoinSink that is partitioned into multiple
867 // JoinMaps
868 template <typename RHSType>
870 
871 private:
872  // number of partitions
874 
875  // number of nodes for shuffling
876  int numNodes;
877 
878  // tells us which attribute is the key
879  int keyAtt;
880 
881  // if useTheseAtts[i] = j, it means that the i^th attribute that we need to extract from the
882  // input tuple is j
883  std::vector<int> useTheseAtts;
884 
885  // if whereEveryoneGoes[i] = j, it means that the i^th entry in useTheseAtts goes in the j^th
886  // slot in the holder tuple
887  std::vector<int> whereEveryoneGoes;
888 
889  // this is the list of columns that we are processing
890  void** columns = nullptr;
891 
892 public:
894  if (columns != nullptr)
895  delete[] columns;
896  }
897 
898  PartitionedJoinSink(int numPartitionsPerNode,
899  int numNodes,
900  TupleSpec& inputSchema,
901  TupleSpec& attsToOperateOn,
902  TupleSpec& additionalAtts,
903  std::vector<int>& whereEveryoneGoes)
904  : whereEveryoneGoes(whereEveryoneGoes) {
905 
906  this->numPartitionsPerNode = numPartitionsPerNode;
907 
908  this->numNodes = numNodes;
909 
910  // used to manage attributes and set up the output
911  TupleSetSetupMachine myMachine(inputSchema);
912 
913  // figure out the key att
914  std::vector<int> matches = myMachine.match(attsToOperateOn);
915  keyAtt = matches[0];
916 
917  // now, figure out the attributes that we need to store in the hash table
918  useTheseAtts = myMachine.match(additionalAtts);
919  }
920 
922  // we create a vector of maps to store the output
924  makeObject<Vector<Handle<Vector<Handle<JoinMap<RHSType>>>>>>(numNodes);
925  for (int i = 0; i < numNodes; i++) {
927  makeObject<Vector<Handle<JoinMap<RHSType>>>>(numPartitionsPerNode);
928  for (int j = 0; j < numPartitionsPerNode; j++) {
929  Handle<JoinMap<RHSType>> myMap = makeObject<JoinMap<RHSType>>(
930  2, i * numPartitionsPerNode + j, numPartitionsPerNode);
931  myVector->push_back(myMap);
932  }
933  returnVal->push_back(myVector);
934  }
935  return returnVal;
936  }
937 
938  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
939  PDB_COUT << "PartitionedJoinSink: write out tuples in this tuple set" << std::endl;
940  // get the map we are adding to
942  unsafeCast<Vector<Handle<Vector<Handle<JoinMap<RHSType>>>>>>(writeToMe);
943  // get all of the columns
944  if (columns == nullptr)
945  columns = new void*[whereEveryoneGoes.size()];
946 
947  int counter = 0;
948  // before: for (auto &a: whereEveryoneGoes) {
949  for (counter = 0; counter < whereEveryoneGoes.size(); counter++) {
950  // before: columns[a] = (void *) &(input->getColumn <int> (useTheseAtts[counter]));
951  columns[counter] =
952  (void*)&(input->getColumn<int>(useTheseAtts[whereEveryoneGoes[counter]]));
953  // before: counter++;
954  }
955 
956  // this is where the hash attribute is located
957  std::vector<size_t>& keyColumn = input->getColumn<size_t>(keyAtt);
958 
959  size_t length = keyColumn.size();
960  for (size_t i = 0; i < length; i++) {
961 #ifndef NO_MOD_PARTITION
962  size_t index = keyColumn[i] % (this->numPartitionsPerNode * this->numNodes);
963 #else
964  size_t index = (keyColumn[i] / (this->numPartitionsPerNode * this->numNodes)) %
965  (this->numPartitionsPerNode * this->numNodes);
966 #endif
967  size_t nodeIndex = index / this->numPartitionsPerNode;
968  size_t partitionIndex = index % this->numPartitionsPerNode;
969  JoinMap<RHSType>& myMap = *((*((*writeMe)[nodeIndex]))[partitionIndex]);
970  // try to add the key... this will cause an allocation for a new key/val pair
971  if (myMap.count(keyColumn[i]) == 0) {
972  try {
973  RHSType& temp = myMap.push(keyColumn[i]);
974  pack(temp, i, 0, columns);
975 
976  // if we get an exception, then we could not fit a new key/value pair
977  } catch (NotEnoughSpace& n) {
978  std::cout << "we are running out of space in writing join sink" << std::endl;
979  // if we got here, then we ran out of space, and so we need to delete the
980  // already-processed
981  // data so that we can try again...
982  myMap.setUnused(keyColumn[i]);
983  truncate<RHSType>(i, 0, columns);
984  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
985  throw n;
986  }
987 
988  // the key is there
989  } else {
990  // and add the value
991  RHSType* temp;
992  try {
993 
994  temp = &(myMap.push(keyColumn[i]));
995 
996  // an exception means that we couldn't complete the addition
997  } catch (NotEnoughSpace& n) {
998 
999  std::cout << "we are running out of space in writing join sink" << std::endl;
1000  truncate<RHSType>(i, 0, columns);
1001  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
1002  throw n;
1003  }
1004 
1005  // now try to do the copy
1006  try {
1007 
1008  pack(*temp, i, 0, columns);
1009 
1010  // if the copy didn't work, pop the value off
1011  } catch (NotEnoughSpace& n) {
1012  std::cout << "we are running out of space in writing join sink" << std::endl;
1013  myMap.setUnused(keyColumn[i]);
1014  truncate<RHSType>(i, 0, columns);
1015  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
1016  throw n;
1017  }
1018  }
1019  }
1020  }
1021 };
1022 
1023 
1024 // this class is used to create a ComputeSink object that stores special objects that wrap up
1025 // multiple columns of a tuple
1026 template <typename RHSType>
1027 class JoinSink : public ComputeSink {
1028 
1029 private:
1030  // tells us which attribute is the key
1031  int keyAtt;
1032 
1033  // if useTheseAtts[i] = j, it means that the i^th attribute that we need to extract from the
1034  // input tuple is j
1035  std::vector<int> useTheseAtts;
1036 
1037  // if whereEveryoneGoes[i] = j, it means that the i^th entry in useTheseAtts goes in the j^th
1038  // slot in the holder tuple
1039  std::vector<int> whereEveryoneGoes;
1040 
1041  // this is the list of columns that we are processing
1042  void** columns = nullptr;
1043 
1044 public:
1046  if (columns != nullptr)
1047  delete[] columns;
1048  }
1049 
1050  JoinSink(TupleSpec& inputSchema,
1051  TupleSpec& attsToOperateOn,
1052  TupleSpec& additionalAtts,
1053  std::vector<int>& whereEveryoneGoes)
1054  : whereEveryoneGoes(whereEveryoneGoes) {
1055 
1056  // used to manage attributes and set up the output
1057  TupleSetSetupMachine myMachine(inputSchema);
1058 
1059  // figure out the key att
1060  std::vector<int> matches = myMachine.match(attsToOperateOn);
1061  keyAtt = matches[0];
1062 
1063  // now, figure out the attributes that we need to store in the hash table
1064  useTheseAtts = myMachine.match(additionalAtts);
1065  }
1066 
1068  PDB_COUT << "JoinSink: to create new JoinMap instance" << std::endl;
1069  // we simply create a new map to store the output
1070  Handle<JoinMap<RHSType>> returnVal = makeObject<JoinMap<RHSType>>();
1071  return returnVal;
1072  }
1073 
1074  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
1075  PDB_COUT << "JoinSink: write out tuples in this tuple set" << std::endl;
1076  // get the map we are adding to
1077  Handle<JoinMap<RHSType>> writeMe = unsafeCast<JoinMap<RHSType>>(writeToMe);
1078  JoinMap<RHSType>& myMap = *writeMe;
1079 
1080  // get all of the columns
1081  if (columns == nullptr)
1082  columns = new void*[whereEveryoneGoes.size()];
1083 
1084  int counter = 0;
1085  // before: for (auto &a: whereEveryoneGoes) {
1086  for (counter = 0; counter < whereEveryoneGoes.size(); counter++) {
1087  // before: columns[a] = (void *) &(input->getColumn <int> (useTheseAtts[counter]));
1088  columns[counter] =
1089  (void*)&(input->getColumn<int>(useTheseAtts[whereEveryoneGoes[counter]]));
1090  // before: counter++;
1091  }
1092 
1093  // this is where the hash attribute is located
1094  std::vector<size_t>& keyColumn = input->getColumn<size_t>(keyAtt);
1095 
1096  size_t length = keyColumn.size();
1097  for (size_t i = 0; i < length; i++) {
1098 
1099  // try to add the key... this will cause an allocation for a new key/val pair
1100  if (myMap.count(keyColumn[i]) == 0) {
1101 
1102  try {
1103  RHSType& temp = myMap.push(keyColumn[i]);
1104  pack(temp, i, 0, columns);
1105 
1106  // if we get an exception, then we could not fit a new key/value pair
1107  } catch (NotEnoughSpace& n) {
1108  // if we got here, then we ran out of space, and so we need to delete the
1109  // already-processed
1110  // data so that we can try again...
1111  myMap.setUnused(keyColumn[i]);
1112  truncate<RHSType>(i, 0, columns);
1113  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
1114  throw n;
1115  }
1116 
1117  // the key is there
1118  } else {
1119 
1120  // and add the value
1121  RHSType* temp;
1122  try {
1123 
1124  temp = &(myMap.push(keyColumn[i]));
1125 
1126  // an exception means that we couldn't complete the addition
1127  } catch (NotEnoughSpace& n) {
1128 
1129  truncate<RHSType>(i, 0, columns);
1130  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
1131  throw n;
1132  }
1133 
1134  // now try to do the copy
1135  try {
1136 
1137  pack(*temp, i, 0, columns);
1138 
1139  // if the copy didn't work, pop the value off
1140  } catch (NotEnoughSpace& n) {
1141  myMap.setUnused(keyColumn[i]);
1142  truncate<RHSType>(i, 0, columns);
1143  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
1144  throw n;
1145  }
1146  }
1147  }
1148  }
1149 };
1150 
1151 // all join singletons descend from this
1153 
1154 public:
1155  virtual ComputeExecutorPtr getProber(void* hashTable,
1156  std::vector<int>& positions,
1157  TupleSpec& inputSchema,
1158  TupleSpec& attsToOperateOn,
1159  TupleSpec& attsToIncludeInOutput,
1160  bool needToSwapLHSAndRhs) = 0;
1161 
1162  virtual ComputeSinkPtr getSink(TupleSpec& consumeMe,
1163  TupleSpec& attsToOpOn,
1164  TupleSpec& projection,
1165  std::vector<int>& whereEveryoneGoes) = 0;
1166 
1167  virtual ComputeSinkPtr getPartitionedSink(int numPartitionsPerNode,
1168  int numNodes,
1169  TupleSpec& consumeMe,
1170  TupleSpec& attsToOpOn,
1171  TupleSpec& projection,
1172  std::vector<int>& whereEveryoneGoes) = 0;
1173 
1174 
1175  virtual ComputeSourcePtr getPartitionedSource(size_t myPartitionId,
1176  std::function<PDBPagePtr()> getAnotherVector,
1177  std::function<void(PDBPagePtr)> doneWithVector,
1178  size_t chunkSize,
1179  std::vector<int>& whereEveryoneGoes) = 0;
1180 
1181  virtual SinkMergerPtr getMerger() = 0;
1182 
1183  virtual SinkShufflerPtr getShuffler() = 0;
1184 };
1185 
1186 // this is an actual class
1187 template <typename HoldMe>
1189 
1190  // the actual data that we hold
1191  HoldMe myData;
1192 
1193 public:
1194  // gets a hash table prober
1196  std::vector<int>& positions,
1197  TupleSpec& inputSchema,
1198  TupleSpec& attsToOperateOn,
1199  TupleSpec& attsToIncludeInOutput,
1200  bool needToSwapLHSAndRhs) override {
1201  return std::make_shared<JoinProbe<HoldMe>>(hashTable,
1202  positions,
1203  inputSchema,
1204  attsToOperateOn,
1205  attsToIncludeInOutput,
1206  needToSwapLHSAndRhs);
1207  }
1208 
1209  // creates a compute sink for this particular type
1211  TupleSpec& attsToOpOn,
1212  TupleSpec& projection,
1213  std::vector<int>& whereEveryoneGoes) override {
1214  return std::make_shared<JoinSink<HoldMe>>(
1215  consumeMe, attsToOpOn, projection, whereEveryoneGoes);
1216  }
1217 
1218  // JiaNote: create a partitioned sink for this particular type
1219  ComputeSinkPtr getPartitionedSink(int numPartitionsPerNode,
1220  int numNodes,
1221  TupleSpec& consumeMe,
1222  TupleSpec& attsToOpOn,
1223  TupleSpec& projection,
1224  std::vector<int>& whereEveryoneGoes) override {
1225  return std::make_shared<PartitionedJoinSink<HoldMe>>(
1226  numPartitionsPerNode, numNodes, consumeMe, attsToOpOn, projection, whereEveryoneGoes);
1227  }
1228 
1229  // JiaNote: create a partitioned source for this particular type
1231  std::function<PDBPagePtr()> getAnotherVector,
1232  std::function<void(PDBPagePtr)> doneWithVector,
1233  size_t chunkSize,
1234  std::vector<int>& whereEveryoneGoes) override {
1235  return std::make_shared<PartitionedJoinMapTupleSetIterator<HoldMe>>(
1236  myPartitionId, getAnotherVector, doneWithVector, chunkSize, whereEveryoneGoes);
1237  }
1238 
1239 
1240  // JiaNote: create a merger
1242  return std::make_shared<JoinSinkMerger<HoldMe>>();
1243  }
1244 
1245  // JiaNote: create a shuffler
1247  return std::make_shared<JoinSinkShuffler<HoldMe>>();
1248  }
1249 };
1250 
1251 typedef std::shared_ptr<JoinTupleSingleton> JoinTuplePtr;
1252 
/**
 * Finds the first entry of `typeList` equal to `findMe` and blanks it.
 *
 * The matched entry is overwritten with an empty string, so a repeated
 * search for the same name matches its next occurrence. The
 * findCorrectJoinTuple overloads put the name back after recursing.
 *
 * @param findMe    type name to look for.
 * @param typeList  candidate type names; the matched entry is cleared.
 * @return index of the match, or -1 if `findMe` does not occur.
 */
inline int findType(std::string& findMe, std::vector<std::string>& typeList) {
    // size_t index avoids the signed/unsigned comparison with size()
    for (size_t i = 0; i < typeList.size(); i++) {
        if (typeList[i] == findMe) {
            typeList[i].clear();
            return static_cast<int>(i);
        }
    }
    return -1;
}
1262 
1263 template <typename In1>
1264 typename std::enable_if<std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1265 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes);
1266 
1267 template <typename In1, typename... Rest>
1268 typename std::enable_if<sizeof...(Rest) != 0 && !std::is_base_of<JoinTupleBase, In1>::value,
1269  JoinTuplePtr>::type
1270 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes);
1271 
1272 template <typename In1, typename In2, typename... Rest>
1273 typename std::enable_if<std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1274 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes);
1275 
1276 template <typename In1>
1277 typename std::enable_if<!std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1278 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes);
1279 
1280 template <typename In1>
1281 typename std::enable_if<!std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1282 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes) {
1283 
1284  // we must always have one type...
1285  JoinTuplePtr returnVal;
1286  std::string in1Name = getTypeName<Handle<In1>>();
1287  int in1Pos = findType(in1Name, typeList);
1288 
1289  if (in1Pos != -1) {
1290  whereEveryoneGoes.push_back(in1Pos);
1291  typeList[in1Pos] = in1Name;
1292  return std::make_shared<JoinSingleton<JoinTuple<In1, char[0]>>>();
1293  } else {
1294  std::cout << "Why did we not find a type?\n";
1295  exit(1);
1296  }
1297 }
1298 
1299 template <typename In1>
1300 typename std::enable_if<std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1301 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes) {
1302  return std::make_shared<JoinSingleton<In1>>();
1303 }
1304 
1305 template <typename In1, typename... Rest>
1306 typename std::enable_if<sizeof...(Rest) != 0 && !std::is_base_of<JoinTupleBase, In1>::value,
1307  JoinTuplePtr>::type
1308 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes) {
1309 
1310  JoinTuplePtr returnVal;
1311  std::string in1Name = getTypeName<Handle<In1>>();
1312  int in1Pos = findType(in1Name, typeList);
1313 
1314  if (in1Pos != -1) {
1315  returnVal =
1316  findCorrectJoinTuple<JoinTuple<In1, char[0]>, Rest...>(typeList, whereEveryoneGoes);
1317  whereEveryoneGoes.push_back(in1Pos);
1318  typeList[in1Pos] = in1Name;
1319  } else {
1320  returnVal = findCorrectJoinTuple<Rest...>(typeList, whereEveryoneGoes);
1321  }
1322 
1323  return returnVal;
1324 }
1325 
1326 template <typename In1, typename In2, typename... Rest>
1327 typename std::enable_if<std::is_base_of<JoinTupleBase, In1>::value, JoinTuplePtr>::type
1328 findCorrectJoinTuple(std::vector<std::string>& typeList, std::vector<int>& whereEveryoneGoes) {
1329 
1330  JoinTuplePtr returnVal;
1331  std::string in2Name = getTypeName<Handle<In2>>();
1332  int in2Pos = findType(in2Name, typeList);
1333  if (in2Pos != -1) {
1334  returnVal = findCorrectJoinTuple<JoinTuple<In2, In1>, Rest...>(typeList, whereEveryoneGoes);
1335  whereEveryoneGoes.push_back(in2Pos);
1336  typeList[in2Pos] = in2Name;
1337  } else {
1338  returnVal = findCorrectJoinTuple<In1, Rest...>(typeList, whereEveryoneGoes);
1339  }
1340 
1341  return returnVal;
1342 }
1343 }
1344 
1345 #endif
std::enable_if< sizeof(TypeToCreate::myOtherData)==0, void >::type createCols(void **putUsHere, TupleSet &processMe, int offset, int whichPos, std::vector< int > positions)
Definition: JoinTuple.h:145
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
static U test(U x, int)
std::enable_if< sizeof(TypeToTruncate::myOtherData)==0, void >::type eraseEnd(int i, int whichVec, void **us)
Definition: JoinTuple.h:233
#define REF_COUNT_PREAMBLE_SIZE
static void eraseEnd(void *input, int i)
Definition: JoinTuple.h:133
std::vector< int > useTheseAtts
Definition: JoinTuple.h:883
std::vector< int > useTheseAtts
Definition: JoinTuple.h:1035
Handle< ObjType > getRootObject()
Definition: Record.cc:46
std::vector< int > whereEveryoneGoes
Definition: JoinTuple.h:1039
void ** columns
Definition: JoinTuple.h:293
Handle< JoinMap< RHSType > > inputTable
Definition: JoinTuple.h:287
PartitionedJoinMapTupleSetIterator(size_t myPartitionId, std::function< PDBPagePtr()> getAnotherVector, std::function< void(PDBPagePtr)> doneWithVector, size_t chunkSize, std::vector< int > positions)
Definition: JoinTuple.h:542
std::shared_ptr< JoinTupleSingleton > JoinTuplePtr
Definition: JoinTuple.h:1251
void copyDataFrom(HoldMe me)
Definition: JoinTuple.h:107
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
SinkMergerPtr getMerger() override
Definition: JoinTuple.h:1241
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Definition: SinkShuffler.h:31
TupleSetPtr getNextTupleSet() override
Definition: JoinTuple.h:599
void setUnused(const size_t &clearMe)
Definition: JoinMap.cc:68
std::shared_ptr< SinkMerger > SinkMergerPtr
Definition: SinkMerger.h:30
void setChunkSize(size_t chunkSize) override
Definition: JoinTuple.h:594
void copyTo(T &out, Handle< T > &in)
Definition: JoinTuple.h:52
void setNodeId(int nodeId) override
Definition: JoinTuple.h:777
std::enable_if<(sizeof(TypeToPackData::myOtherData)==0)&&(sizeof(TypeToPackData::myData)!=0), void >::type packData(TypeToPackData &arg, TypeToPackData data)
Definition: JoinTuple.h:169
ComputeSinkPtr getPartitionedSink(int numPartitionsPerNode, int numNodes, TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, std::vector< int > &whereEveryoneGoes) override
Definition: JoinTuple.h:1219
Handle< Object > createNewOutputContainer() override
Definition: JoinTuple.h:921
Record< Vector< Handle< JoinMap< RHSType > > > > * myRec
Definition: JoinTuple.h:493
std::vector< int > match(TupleSpec &attsToMatch)
std::enable_if< sizeof(TypeToTruncate::myOtherData)!=0, void >::type truncate(int i, int whichVec, void **us)
Definition: JoinTuple.h:264
JoinMapIterator< ValueType > begin()
Definition: JoinMap.cc:97
int findType(std::string &findMe, std::vector< std::string > &typeList)
Definition: JoinTuple.h:1253
int getNodeId() override
Definition: JoinTuple.h:781
Handle< Vector< Handle< JoinMap< RHSType > > > > iterateOverMe
Definition: JoinTuple.h:489
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
Definition: JoinTuple.h:938
std::function< void(PDBPagePtr)> doneWithVector
Definition: JoinTuple.h:486
PartitionedJoinSink(int numPartitionsPerNode, int numNodes, TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &additionalAtts, std::vector< int > &whereEveryoneGoes)
Definition: JoinTuple.h:898
std::string getType() override
Definition: JoinTuple.h:344
std::enable_if< sizeof(TypeToTruncate::myOtherData)==0, void >::type truncate(int i, int whichVec, void **us)
Definition: JoinTuple.h:254
JoinProbe(void *hashTable, std::vector< int > &positions, TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, bool needToSwapLHSAndRhs)
Definition: JoinTuple.h:315
TupleSetPtr process(TupleSetPtr input) override
Definition: JoinTuple.h:348
std::vector< size_t > * hashColumn
Definition: JoinTuple.h:520
Handle< JoinMap< RHSType > > curJoinMap
Definition: JoinTuple.h:505
TupleSetPtr output
Definition: JoinTuple.h:278
ComputeExecutorPtr getProber(void *hashTable, std::vector< int > &positions, TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, bool needToSwapLHSAndRhs) override
Definition: JoinTuple.h:1195
std::enable_if< std::is_base_of< JoinTupleBase, In1 >::value, JoinTuplePtr >::type findCorrectJoinTuple(std::vector< std::string > &typeList, std::vector< int > &whereEveryoneGoes)
Definition: JoinTuple.h:1301
std::shared_ptr< ComputeSink > ComputeSinkPtr
Definition: ComputeSink.h:27
std::enable_if< sizeof(TypeToUnPack::myOtherData)==0, void >::type unpack(TypeToUnPack &arg, int whichPosInTupleSet, int whichVec, void **us)
Definition: JoinTuple.h:215
std::function< PDBPagePtr()> getAnotherVector
Definition: JoinTuple.h:483
void copyTo(void *input, int whichPos)
Definition: JoinTuple.h:116
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
static decltype(test< T >(*((T *) 0), 1) val)
Definition: JoinTuple.h:75
JoinRecordList< ValueType > lookup(const size_t &which)
Definition: JoinMap.cc:82
#define PDB_COUT
Definition: PDBDebug.h:31
void copyDataFrom(Handle< HoldMe > me)
Definition: JoinTuple.h:103
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
ComputeSinkPtr getSink(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, std::vector< int > &whereEveryoneGoes) override
Definition: JoinTuple.h:1210
std::shared_ptr< RecordIterator > RecordIteratorPtr
Handle< Object > createNewOutputContainer() override
Definition: JoinTuple.h:785
void writeOut(Handle< Object > mergeMe, Handle< Object > &mergeToMe) override
Definition: JoinTuple.h:405
std::vector< uint32_t > counts
Definition: JoinTuple.h:290
void copyFrom(T &out, Handle< T > &in)
Definition: JoinTuple.h:32
static void truncate(void *input, int i)
Definition: JoinTuple.h:128
bool writeOut(Handle< Object > shuffleMe, Handle< Object > &shuffleToMe) override
Definition: JoinTuple.h:793
void copyFrom(void *input, int whichPos)
Definition: JoinTuple.h:111
TupleSetSetupMachine myMachine
Definition: JoinTuple.h:284
ValueType & push(const size_t &which)
Definition: JoinMap.cc:73
Handle< Object > createNewOutputContainer() override
Definition: JoinTuple.h:398
JoinMapIterator< RHSType > curJoinMapIter
Definition: JoinTuple.h:508
ComputeSourcePtr getPartitionedSource(size_t myPartitionId, std::function< PDBPagePtr()> getAnotherVector, std::function< void(PDBPagePtr)> doneWithVector, size_t chunkSize, std::vector< int > &whereEveryoneGoes) override
Definition: JoinTuple.h:1230
int count(const size_t &which)
Definition: JoinMap.cc:87
std::enable_if< sizeof(TypeToPack::myOtherData)==0, void >::type pack(TypeToPack &arg, int whichPosInTupleSet, int whichVec, void **us)
Definition: JoinTuple.h:191
std::enable_if< sizeof(TypeToTruncate::myOtherData)!=0, void >::type eraseEnd(int i, int whichVec, void **us)
Definition: JoinTuple.h:242
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
JoinMapIterator< RHSType > joinMapEndIter
Definition: JoinTuple.h:511
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
Definition: JoinTuple.h:1074
JoinMapIterator< ValueType > end()
Definition: JoinMap.cc:103
void writeVectorOut(Handle< Object > mergeMe, Handle< Object > &mergeToMe) override
Definition: JoinTuple.h:436
JoinSink(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &additionalAtts, std::vector< int > &whereEveryoneGoes)
Definition: JoinTuple.h:1050
SinkShufflerPtr getShuffler() override
Definition: JoinTuple.h:1246
std::vector< int > whereEveryoneGoes
Definition: JoinTuple.h:887
Handle< Object > createNewOutputContainer() override
Definition: JoinTuple.h:1067