A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TupleSet.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef TUPLE_SET_H
20 #define TUPLE_SET_H
21 
#include "Handle.h"
#include "PDBVector.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
25 
26 namespace pdb {
27 
// Dereferencing overload of tryDereference: applies unary operator* to `arg` and returns a
// reference to the result. It only takes part in overload resolution when the caller
// instantiates it with B == true (std::enable_if_t removes it otherwise).
template <bool B, typename InputType>
auto tryDereference(InputType& arg) -> std::enable_if_t<B, decltype(*arg)&> {
    return *arg;
}
33 
// Pass-through overload of tryDereference, selected when the caller instantiates it with
// B == false (the argument is not to be dereferenced): the argument itself is handed back
// by reference.
// Because `arg` is a forwarding reference, an rvalue argument yields a reference that is only
// valid until the end of the calling full-expression.
template <bool B, typename InputType>
auto tryDereference(InputType&& arg) -> std::enable_if_t<!B, InputType&> {
    return arg;
}
39 
// Returns the serialized size of an InputType by asking the type itself through its static
// getObjectSize() member. Selected when the caller instantiates it with B == true.
template <bool B, typename InputType>
auto getSerializedSize() -> std::enable_if_t<B, size_t> {
    return InputType::getObjectSize();
}
45 
// Fallback overload of getSerializedSize, selected when the caller instantiates it with
// B == false: the serialized size is simply sizeof(InputType).
template <bool B, typename InputType>
auto getSerializedSize() -> std::enable_if_t<!B, size_t> {
    return sizeof(InputType);
}
51 
// Address-taking overload of tryToObtainPointer: returns a pointer to `arg`. Selected when
// the caller instantiates it with B == true.
template <bool B, typename InputType>
auto tryToObtainPointer(InputType& arg) -> std::enable_if_t<B, InputType*> {
    return &arg;
}
57 
// Pass-through overload of tryToObtainPointer, selected when the caller instantiates it with
// B == false: no address is taken and the argument is returned by reference unchanged.
template <bool B, typename InputType>
auto tryToObtainPointer(InputType& arg) -> std::enable_if_t<!B, InputType&> {
    return arg;
}
63 
// forward declaration, so that the shared-pointer alias below can be named before the class
// itself is defined
class TupleSet;
using TupleSetPtr = std::shared_ptr<TupleSet>;
66 
67 // this structure contains type-specific information that will allow us to properly delete and/or
68 // fliter
69 // a column
71 
72  // this is a deleter for a particular column, stored as a void*
73  std::function<void(void*)> deleter;
74 
75  // this is a filter function for a particular column
76  std::function<void*(void*, std::vector<bool>&)> filter;
77 
78  // this replicates instances of a column to run a join
79  std::function<void*(void*, std::vector<uint32_t>&)> replicate;
80 
81  // JiaNote: this gets count for a particular column
82  std::function<size_t(void*)> getCount;
83 
84  // this is a function that creates and returns a pdb :: Vector for a column
85  std::function<Handle<Vector<Handle<Object>>>()> createPDBVector;
86 
87  // this function writes out the column to a pdb :: Vector
88  std::function<void(Handle<Vector<Handle<Object>>>&, void*, size_t&)> writeToVector;
89 
90  // this is the name of the type that we contain
91  std::string typeContained;
92 
93  // tells us if we need to delete
94  bool mustDelete;
95 
96  // tells us the serialized size of an object in this column
98 
99  // the last value that we wrote if we are writing out this column
100  size_t lastWritten = 0;
101 
102  // empty constructor
104 
105  // fill all of the fields
107  std::function<void(void*)> deleter,
108  std::function<void*(void*, std::vector<bool>&)> filter,
109  std::function<void*(void*, std::vector<uint32_t>&)> replicate,
110  std::function<size_t(void*)> getCount,
111  std::function<Handle<Vector<Handle<Object>>>()> createPDBVector,
112  std::function<void(Handle<Vector<Handle<Object>>>&, void*, size_t&)> writeToVector,
113  bool mustDelete,
114  std::string typeContained,
115  size_t serializedSize)
116  : deleter(deleter),
117  filter(filter),
122  typeContained(typeContained),
123  mustDelete(mustDelete),
124  serializedSize(serializedSize) {}
125 };
126 
127 // this is the basic type that it pushed through the system during query processing
128 class TupleSet {
129 
130 private:
131  // the key of this map is an integer (the identifier of the column)
132  // the value is a pair; the first is a pointer to the column, and the second is a pair of
133  // lambdas. The first is a deleter (which frees the column) and the second is a condenser
134  // (that filters rows from the column)
135  std::map<int, std::pair<void*, MaintenanceFuncs>> columns;
136 
137 public:
138  // get the number of columns in this TupleSet
140  return columns.size();
141  }
142 
143  /* TODO: this will be needed to be able to do joins!!!
144  // deep copies the set of serialized objects from the specified location, do the specified
145  location
146  void deepCopyAndDelete (std :: vector <void *> &source, std :: vector <void *> &dest) {
147  size_t offset = 0;
148  for (int i = 0; columns.count (i) != 0; i++) {
149  // columns[i].second.deepCopyAndDelete (source, dest, offset);
150  // offset += columns[i].second.serializedSize;
151  }
152  }*/
153 
154  // gets a list, in order, of the types of the columns in this tuple set
155  // this can be used at a later time to re-constitute the tuple set
156  std::vector<std::string> getTypeNames() {
157  std::vector<std::string> output;
158  for (int i = 0; columns.count(i) != 0; i++) {
159  output.push_back(columns[i].second.typeContained);
160  }
161  return output;
162  }
163 
164  /* TODO: this will be needed to be able to do joins!!!
165  // gets the serialized size of one row in this tuple set
166  size_t getSerializedSize () {
167  return columns[whichColumn].second.serializedSize;
168  size_t offset = 0;
169  for (int i = 0; columns.count (i) != 0; i++) {
170  offset += columns[i].second.serializedSize;
171  }
172  return offset;
173  } */
174 
175 
176  // this takes as input a vector of pointers to
177  // return a specified column
178  template <typename ColType>
179  std::vector<ColType>& getColumn(int whichColumn) {
180  if (columns.count(whichColumn) == 0) {
181  std::cout << "This is bad. Tried to get column " << whichColumn
182  << " but could not find it.\n";
183  }
184  return *((std::vector<ColType>*)columns[whichColumn].first);
185  }
186 
187  // writes out a specified column... the boolean argument is true when we want to start from
188  // scratch; false
189  // if we want to continue the last write
190  void writeOutColumn(int whichColumn,
191  Handle<Vector<Handle<Object>>>& writeToMe,
192  bool startFromScratch) {
193  if (columns.count(whichColumn) == 0) {
194  std::cout << "This is bad. Tried to write out column " << whichColumn
195  << " but could not find it.\n";
196  }
197  auto& which = columns[whichColumn];
198 
199  // if we we need to start over, then do do
200  if (startFromScratch)
201  which.second.lastWritten = 0;
202 
203  which.second.writeToVector(writeToMe, which.first, which.second.lastWritten);
204  }
205 
206  // use the specified column to build pdb :: Vector of the correct type to hold the output
207  // Note: this had better be a Vector <Handle <Something>> or we are going to have problems!!
209  return columns[whichColToOutput].second.createPDBVector();
210  }
211 
212  // see if we have the specified column
213  bool hasColumn(int whichColumn) {
214  return columns.count(whichColumn) != 0;
215  }
216 
218 
219  // delete all of the columns
220  for (auto& a : columns) {
221  auto& res = a.second;
222  if (res.second.mustDelete)
223  res.second.deleter(res.first);
224  }
225  }
226 
227  // filters a column
228  void filterColumn(int whichColToFilter, std::vector<bool>& usingMe) {
229 
230  // kill the old one so we don't have a memory leak
231  if (hasColumn(whichColToFilter)) {
232 
233  // filter the column, getting a new version
234  auto& value = columns[whichColToFilter];
235  auto res = value.second.filter(value.first, usingMe);
236 
237  // delete the old one, if necessary
238  if (value.second.mustDelete) {
239  value.second.deleter(value.first);
240  }
241 
242  // record the new column
243  value.first = res;
244 
245  // remember that we need to delete it
246  value.second.mustDelete = true;
247  return;
248  }
249 
250  std::cout << "This is really bad... trying to filter a non-existing column";
251  }
252 
253  // creates a replication of the column from another tuple set, copying each item a specified
254  // number of times and deleting the target, if necessary
255  void replicate(TupleSetPtr fromMe,
256  int whichColInFromMe,
257  int whichColToCopyTo,
258  std::vector<uint32_t>& replications) {
259 
260  // kill the old one so we don't have a memory leak
261  if (hasColumn(whichColToCopyTo)) {
262  // delete the existing column, if necessary
263  auto& value = columns[whichColToCopyTo];
264  if (value.second.mustDelete) {
265  value.second.deleter(value.first);
266  }
267  }
268 
269  // create a copy of the maintenance funcs
270  auto& value = fromMe->columns[whichColInFromMe];
271  MaintenanceFuncs temp = value.second;
272 
273  // remember that this is a deep copy... so we need to delete
274  temp.mustDelete = true;
275 
276  // and go ahead and replicate the column
277  void* newCol = temp.replicate(value.first, replications);
278 
279  // and go ahead and remember the column
280  columns[whichColToCopyTo] = std::make_pair(newCol, temp);
281  }
282 
283  // JiaNote: to get number of rows in a particular column
284  // returns -1 if column doesn't exist
285  int getNumRows(int whichColumn) {
286  if (hasColumn(whichColumn) == false) {
287  return -1;
288  }
289  return columns[whichColumn].second.getCount(columns[whichColumn].first);
290  }
291 
292 
293  // copies a column from another TupleSet, deleting the target, if necessary
294  void copyColumn(TupleSetPtr fromMe, int whichColInFromMe, int whichColToCopyTo) {
295 
296  // kill the old one so we don't have a memory leak
297  if (hasColumn(whichColToCopyTo)) {
298 
299  // delete the existing column, if necessary
300  auto& value = columns[whichColToCopyTo];
301  if (value.second.mustDelete) {
302  value.second.deleter(value.first);
303  }
304  }
305 
306  // create a copy of the maintenance funcs
307  auto& value = fromMe->columns[whichColInFromMe];
308  MaintenanceFuncs temp = value.second;
309 
310  // remember that this is a shallow copy... no need to delete
311  temp.mustDelete = false;
312 
313  // and go ahead and remember the column
314  columns[whichColToCopyTo] = std::make_pair(value.first, temp);
315  }
316 
317  // creates a new column, adding it to the tuple set
318  template <typename ColType>
319  void addColumn(int where, std::vector<ColType>* addMe, bool needToDelete) {
320 
321  // delete the old one, if needed
322  if (columns.count(where) != 0) {
323  auto& value = columns[where];
324  if (value.second.mustDelete) {
325  value.second.deleter(value.first);
326  }
327  }
328 
329  // now, add the new column... this reqires creating three lambdas to deal with
330  // column maintenance. The first lamba deletes the column, correctly taking into
331  // account the type of the column...
332  std::function<void(void*)> deleter;
333  deleter = [](void* deleteMe) {
334  std::vector<ColType>* killMe = (std::vector<ColType>*)deleteMe;
335  delete killMe;
336  };
337 
338  // and the second lambda filters the column, again correctly taking into account
339  // the type of the column
340  std::function<void*(void*, std::vector<bool>&)> filter;
341  filter = [](void* filter, std::vector<bool>& whichAreValid) {
342  std::vector<ColType>& filterMe = *((std::vector<ColType>*)filter);
343 
344  // count the number of rows that need to be retained
345  int counter = 0;
346  for (auto a : whichAreValid)
347  if (a)
348  counter++;
349 
350  // copy the ones that need to be retained over
351  std::vector<ColType>* newVec = new std::vector<ColType>(counter);
352  counter = 0;
353  for (int i = 0; i < filterMe.size(); i++) {
354  if (whichAreValid[i])
355  (*newVec)[counter++] = filterMe[i];
356  }
357 
358  // and return the result
359  return (void*)newVec;
360  };
361  std::function<void*(void*, std::vector<uint32_t>&)> replicate;
362  replicate = [](void* replicate, std::vector<uint32_t>& timesToReplicate) {
363 
364  std::vector<ColType>& replicateMe = *((std::vector<ColType>*)replicate);
365 
366  // count the number of rows that need to be retained
367  int counter = 0;
368  for (auto& a : timesToReplicate)
369  counter += a;
370 
371  // copy the ones that need to be retained over
372  std::vector<ColType>* newVec = new std::vector<ColType>(counter);
373  counter = 0;
374  for (int i = 0; i < timesToReplicate.size(); i++) {
375  for (int j = 0; j < timesToReplicate[i]; j++) {
376  (*newVec)[counter] = replicateMe[i];
377  counter++;
378  }
379  }
380 
381  // and return the result
382  return (void*)newVec;
383  };
384  // JiaNote: add getCount to get number of rows for a particular column at runtime
385  std::function<size_t(void*)> getCount;
386  getCount = [](void* countMe) {
387  std::vector<ColType>* toCountRowsOfMe = (std::vector<ColType>*)countMe;
388  return toCountRowsOfMe->size();
389  };
390 
391  // the third lambda is responsible for writing this column to an output vector
392  std::function<void(Handle<Vector<Handle<Object>>>&, void*, size_t&)> writeToVector;
393  if (std::is_base_of<PtrBase, ColType>::value)
394  writeToVector =
395  [](Handle<Vector<Handle<Object>>>& writeToMe, void* writeMe, size_t& lastWritten) {
396  std::vector<Ptr<Handle<Object>>>& writeMeOut =
397  *((std::vector<Ptr<Handle<Object>>>*)writeMe);
398  Vector<Handle<Object>>& outputToMe = *writeToMe;
399  for (; lastWritten < writeMeOut.size(); lastWritten++) {
400  Ptr<Handle<Object>> temp = writeMeOut[lastWritten];
401  outputToMe.push_back(*(writeMeOut[lastWritten]));
402  }
403  };
404  else
405  writeToVector = [](
406  Handle<Vector<Handle<Object>>>& writeToMe, void* writeMe, size_t& lastWritten) {
407  std::vector<Handle<Object>>& writeMeOut = *((std::vector<Handle<Object>>*)writeMe);
408  Vector<Handle<Object>>& outputToMe = *writeToMe;
409  for (; lastWritten < writeMeOut.size(); lastWritten++) {
410  outputToMe.push_back(writeMeOut[lastWritten]);
411  }
412  };
413 
414 
415  // finally, the sixth creates a pdb :: Vector to hold the column
416  std::function<Handle<Vector<Handle<Object>>>()> createPDBVector;
417  createPDBVector = []() {
418  Handle<Vector<Handle<ColType>>> returnVal = makeObject<Vector<Handle<ColType>>>();
419  return unsafeCast<Vector<Handle<Object>>>(returnVal);
420  };
421 
422  MaintenanceFuncs myFuncs(
423  deleter,
424  filter,
425  replicate,
426  getCount,
427  createPDBVector,
428  writeToVector,
429  needToDelete,
430  getTypeName<ColType>(),
431  getSerializedSize<std::is_base_of<PtrBase, ColType>::value, ColType>());
432  columns[where] = std::make_pair((void*)addMe, myFuncs);
433  }
434 };
435 }
436 
437 #endif
bool hasColumn(int whichColumn)
Definition: TupleSet.h:213
int getNumRows(int whichColumn)
Definition: TupleSet.h:285
Handle< Vector< Handle< Object > > > getOutputVector(int whichColToOutput)
Definition: TupleSet.h:208
std::function< void *(void *, std::vector< bool > &)> filter
Definition: TupleSet.h:76
std::vector< ColType > & getColumn(int whichColumn)
Definition: TupleSet.h:179
std::function< void(Handle< Vector< Handle< Object >>> &, void *, size_t &)> writeToVector
Definition: TupleSet.h:88
MaintenanceFuncs(std::function< void(void *)> deleter, std::function< void *(void *, std::vector< bool > &)> filter, std::function< void *(void *, std::vector< uint32_t > &)> replicate, std::function< size_t(void *)> getCount, std::function< Handle< Vector< Handle< Object >>>()> createPDBVector, std::function< void(Handle< Vector< Handle< Object >>> &, void *, size_t &)> writeToVector, bool mustDelete, std::string typeContained, size_t serializedSize)
Definition: TupleSet.h:106
void deleter(void *deleteMe, ObjType *dummy)
Definition: DeepCopy.h:48
auto tryDereference(InputType &arg) -> typename std::enable_if_t< B, decltype(*arg)& >
Definition: TupleSet.h:30
std::vector< std::string > getTypeNames()
Definition: TupleSet.h:156
void writeOutColumn(int whichColumn, Handle< Vector< Handle< Object >>> &writeToMe, bool startFromScratch)
Definition: TupleSet.h:190
std::function< void(void *)> deleter
Definition: TupleSet.h:73
PolicyList< OtherPolicies...>::type first()
int getNumColumns()
Definition: TupleSet.h:139
void addColumn(int where, std::vector< ColType > *addMe, bool needToDelete)
Definition: TupleSet.h:319
auto getSerializedSize() -> typename std::enable_if_t< B, size_t >
Definition: TupleSet.h:42
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
void filterColumn(int whichColToFilter, std::vector< bool > &usingMe)
Definition: TupleSet.h:228
std::function< size_t(void *)> getCount
Definition: TupleSet.h:82
std::function< void *(void *, std::vector< uint32_t > &)> replicate
Definition: TupleSet.h:79
std::string typeContained
Definition: TupleSet.h:91
void filter(std::string &r, const char *b)
Definition: TypeName.cc:20
std::function< Handle< Vector< Handle< Object > > >)> createPDBVector
Definition: TupleSet.h:85
void push_back(const TypeContained &val)
Definition: PDBVector.cc:95
void replicate(TupleSetPtr fromMe, int whichColInFromMe, int whichColToCopyTo, std::vector< uint32_t > &replications)
Definition: TupleSet.h:255
Definition: Ptr.h:32
std::map< int, std::pair< void *, MaintenanceFuncs > > columns
Definition: TupleSet.h:135
auto tryToObtainPointer(InputType &arg) -> typename std::enable_if_t< B, InputType * >
Definition: TupleSet.h:54
void copyColumn(TupleSetPtr fromMe, int whichColInFromMe, int whichColToCopyTo)
Definition: TupleSet.h:294