A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
VectorTupleSetIterator.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef VECTOR_TUPLESET_ITER_H
20 #define VECTOR_TUPLESET_ITER_H
21 
22 namespace pdb {
23 
24 // this class iterates over an input pdb :: Vector, breaking it up into a series of TupleSet objects
26 
27 private:
28  // function to call to get another vector to process
29  std::function<void*()> getAnotherVector;
30 
31  // function to call to free the vector
32  std::function<void(void*)> doneWithVector;
33 
34  // this is the vector to process
36 
37  // the pointer to the current page holding the vector, and the last page that we previously
38  // processed
40 
41  // how many objects to put into a chunk
42  size_t chunkSize;
43 
44  // where we are in the chunk
45  size_t pos;
46 
47  // and the tuple set we return
49 
50 public:
51  // the first param is a callback function that the iterator will call in order to obtain the
52  // page holding the next vector to iterate
53  // over. The second param is a callback that the iterator will call when the specified page is
54  // done being processed and can be
55  // freed. The third param tells us how many objects to put into a tuple set
56  VectorTupleSetIterator(std::function<void*()> getAnotherVector,
57  std::function<void(void*)> doneWithVector,
58  size_t chunkSize)
60 
61  // create the tuple set that we'll return during iteration
62  output = std::make_shared<TupleSet>();
63 
64  // extract the vector from the input page
66 
67  if (myRec != nullptr) {
68 
69  iterateOverMe = myRec->getRootObject();
70  PDB_COUT << "Got iterateOverMe" << std::endl;
71  // create the output vector and put it into the tuple set
72  std::vector<Handle<Object>>* inputColumn = new std::vector<Handle<Object>>;
73  output->addColumn(0, inputColumn, true);
74  } else {
75 
76  iterateOverMe = nullptr;
77  output = nullptr;
78  }
79 
80  // we are at position zero
81  pos = 0;
82 
83  // and we have no data so far
84  lastRec = nullptr;
85  }
86 
87  void setChunkSize(size_t chunkSize) override {  // overrides a virtual from a base class that is not visible in this listing
88  this->chunkSize = chunkSize;  // overwrite the member recording how many objects to put into a chunk; this-> distinguishes it from the same-named parameter
89  }
90 
91 
92  // returns the next tuple set to process, or nullptr if there is not one to process
94 
95  // JiaNote: the check below is necessary to fix a bug where iterateOverMe may be nullptr
96  // the first time we get here
97  if (iterateOverMe == nullptr) {
98  return nullptr;
99  }
100 
101  // if we made it here with lastRec being a valid pointer, then it means
102  // that we have gone through an entire cycle, and so all of the data that
103  // we will ever reference stored in lastRec has been flushed through the
104  // pipeline; hence, we can kill it
105 
106  if (lastRec != nullptr) {
108  lastRec = nullptr;
109  }
110 
111  size_t mySize = iterateOverMe->size();
112  if (mySize == 0) {
113  return nullptr;
114  }
115  // see if there are no more items in the vector to iterate over
116  if (pos == mySize) {
117 
118  // this means that we got to the end of the vector
119  lastRec = myRec;
120 
121  // try to get another vector
123 
124  // if we could not, then we are outta here
125  if (myRec == nullptr)
126  return nullptr;
127 
128  // and reset everything
129  iterateOverMe = myRec->getRootObject();
130  // JiaNote: we also need to reset mySize
131  mySize = iterateOverMe->size();
132  if (mySize == 0) {
133  return nullptr;
134  }
135  pos = 0;
136  }
137 
138  // compute how many slots in the output vector we can fill
139  int numSlotsToIterate = chunkSize;
140  if (numSlotsToIterate + pos > mySize) {
141  numSlotsToIterate = mySize - pos;
142  }
143 
145 
146  // resize the output vector as appropriate
147  std::vector<Handle<Object>>& inputColumn = output->getColumn<Handle<Object>>(0);
148  inputColumn.resize(numSlotsToIterate);
149  // fill it up
150  for (int i = 0; i < numSlotsToIterate; i++) {
151  inputColumn[i] = myVec[pos];
152  pos++;
153  }
154 
155  // and return the output TupleSet
156  return output;
157  }
158 
160 
161  // if lastRec is not a nullptr, then it means that we have not yet freed it
162  if (lastRec != nullptr) {
163  makeObjectAllocatorBlock(4096, true);
165  }
166 
167  lastRec = nullptr;
168  }
169 };
170 }
171 
172 #endif
std::function< void(void *)> doneWithVector
TupleSetPtr getNextTupleSet() override
std::function< void *()> getAnotherVector
Record< Vector< Handle< Object > > > * lastRec
void setChunkSize(size_t chunkSize) override
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
Handle< Vector< Handle< Object > > > iterateOverMe
#define PDB_COUT
Definition: PDBDebug.h:31
VectorTupleSetIterator(std::function< void *()> getAnotherVector, std::function< void(void *)> doneWithVector, size_t chunkSize)
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
Record< Vector< Handle< Object > > > * myRec