A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
MapTupleSetIterator.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef MAP_TUPLESET_ITER_H
20 #define MAP_TUPLESET_ITER_H
21 
22 namespace pdb {
23 
24 // this class iterates over a pdb::Map, handing back its key/value pairs in TupleSet chunks
25 template <typename KeyType, typename ValueType, typename OutputType>
27 
28 private:
29  // the map we are iterating over
31 
32  // the number of items to put in each chunk that we produce
33  size_t chunkSize;
34 
35  // the tuple set we return
37 
38  // the iterator for the map
41 
42 public:
43  // the first param is a handle to the map to iterate over; it arrives as a Handle<Object> and
44  // is unsafeCast to a Handle<Map<KeyType, ValueType>>, with no type check visible here.
45  // The second param tells us how many objects to put into a tuple set
46  MapTupleSetIterator(Handle<Object> iterateOverMeIn, size_t chunkSize)
47  : iterateOverMe(unsafeCast<Map<KeyType, ValueType>>(iterateOverMeIn)),
48  chunkSize(chunkSize),
  // NOTE(review): this listing omits source line 49, which presumably initializes `begin`
  // - confirm against the real header.
50  end(iterateOverMe->end()) {
51 
  // create the tuple set that is handed back to callers and give it a single column
  // (index 0) of Handle<OutputType>.
52  output = std::make_shared<TupleSet>();
53  std::vector<Handle<OutputType>>* inputColumn = new std::vector<Handle<OutputType>>;
  // NOTE(review): presumably the `true` flag hands ownership of the heap-allocated vector to
  // the TupleSet - confirm against TupleSet::addColumn, otherwise this allocation leaks.
54  output->addColumn(0, inputColumn, true);
55  }
56 
57  // JiaNote: so that we can tune chunk size automatically in the Pipeline class.
  // Overwrites the stored chunkSize, i.e. the upper bound on how many items the fill loop
  // puts into a tuple set.
58  void setChunkSize(size_t chunkSize) override {
59  this->chunkSize = chunkSize;
60  }
61 
62 
63  // returns the next tuple set to process, or nullptr if there is not one to process.
  // The member `output` is reused: column 0 is refilled in place with up to chunkSize
  // key/value pairs taken from the map, and grows only when it is too short.
  // NOTE(review): this listing omits source lines 64-66, which hold the function signature
  // and presumably the declarations of beginToRecover / endToRecover - confirm against the
  // real header.
67  // see if there are no more items in the map to iterate over
68  if (!(begin != end)) {
69  return nullptr;
70  }
71 
72  std::vector<Handle<OutputType>>& inputColumn = output->getColumn<Handle<OutputType>>(0);
  // number of slots already present in the column before this call adds any
73  int limit = inputColumn.size();
  // NOTE(review): `i` is int while chunkSize is size_t, so this is a signed/unsigned comparison.
74  for (int i = 0; i < chunkSize; i++) {
75  try {
  // only allocate a new output object when the column has no slot at position i yet
76  if (i >= limit) {
77  Handle<OutputType> temp = (makeObject<OutputType>());
78  inputColumn.push_back(temp);
79  }
80  // copy the key/value pair
81  inputColumn[i]->getKey() = (*begin).key;
82  inputColumn[i]->getValue() = (*begin).value;
83  } catch (NotEnoughSpace& n) {
  // rewind both iterators to the saved positions, empty the column, and propagate
84  begin = beginToRecover;
85  end = endToRecover;
86  inputColumn.clear();
  // NOTE(review): `throw n;` throws a copy of n; a bare `throw;` would rethrow the
  // original exception object.
87  throw n;
88  }
89  // move on to the next item
90  ++begin;
91 
92  // and exit if we are done
93  if (!(begin != end)) {
  // drop slots beyond the last one written in this call
94  if (i + 1 < limit) {
95  inputColumn.resize(i + 1);
96  }
97  return output;
98  }
99  }
100 
  // NOTE(review): on this path the column is not shrunk, so if it held more than chunkSize
  // slots on entry the extra slots remain - confirm whether that is intended.
101  return output;
102  }
103 
105 };
106 }
107 
108 #endif
void setChunkSize(size_t chunkSize) override
PDBMapIterator< KeyType, ValueType > begin
Handle< Map< KeyType, ValueType > > iterateOverMe
PDBMapIterator< KeyType, ValueType > end
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
Handle< OutObjType > unsafeCast(Handle< InObjType > &castMe)
MapTupleSetIterator(Handle< Object > iterateOverMeIn, size_t chunkSize)
TupleSetPtr getNextTupleSet() override