A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AggregationProcessor.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef AGGREGATION_PROCESSOR_CC
19 #define AGGREGATION_PROCESSOR_CC
20 
21 #include "AggregationProcessor.h"
22 
23 namespace pdb {
24 
25 
// Sets up a processor for one hash partition: stores the partition id and
// resets all per-page iteration state.
// NOTE(review): the signature line (listing line 27) is missing from this
// extracted listing; presumably this is the AggregationProcessor constructor
// taking `id` -- confirm against AggregationProcessor.h.
26 template <class KeyType, class ValueType>
28 
29  this->id = id;  // compared against each map's getHashPartitionId() when input is loaded
30  finalized = false;
31  count = 0;
32  begin = nullptr;  // iterators are only allocated once a matching map is loaded
33  end = nullptr;
34 }
35 
36 // initialize: clears the `finalized` flag; no other member is touched here.
// NOTE(review): the signature line (listing line 38) is missing from this
// extracted listing.
37 template <class KeyType, class ValueType>
39  finalized = false;
40 }
41 
// Loads a single aggregation map as the input to process.
//
// `objectToProcess` is cast (unchecked) to an AggregationMap<KeyType, ValueType>.
// If the map's hash partition id equals this processor's `id`, the entry
// counter is reset and the begin/end iterators are re-created over the map's
// array, replacing any iterators left from a previous input. Otherwise
// `curMap` is set to nullptr so there is nothing to process for this input.
//
// NOTE(review): the signature line (listing line 43) is missing from this
// extracted listing; the listing's reference footer declares it as
// `void loadInputObject(Handle<Object>& objectToProcess) override`.
42 template <class KeyType, class ValueType>
44  curMap = unsafeCast<AggregationMap<KeyType, ValueType>, Object>(objectToProcess);
45  HashPartitionID hashIdForCurrentMap = curMap->getHashPartitionId();
46  if (hashIdForCurrentMap == id) {  // reuse the id fetched above instead of calling the getter again
47  count = 0;
48  if (begin != nullptr) {
49  delete begin;
50  }
51  if (end != nullptr) {
52  delete end;
53  }
54  begin = new PDBMapIterator<KeyType, ValueType>(curMap->getArray(), true);
55  end = new PDBMapIterator<KeyType, ValueType>(curMap->getArray());
56 
57  } else {
58  // there is no hash partition for this thread
59  curMap = nullptr;
60  }
61 }
62 
63 
64 // loads up another input page to process
// Scans the maps stored in the page's root object and selects the first one
// whose hash partition id equals this processor's `id`. On a match the entry
// counter is reset and the begin/end iterators are re-created over that map's
// array; if no map matches, `curMap` ends up nullptr.
//
// NOTE(review): listing lines 66 (signature) and 68-69 (where `myRec` is
// declared) are missing from this extracted listing; the reference footer
// declares the method as `void loadInputPage(void* pageToProcess) override`.
// NOTE(review): when the root object holds zero maps the loop body never
// runs, so `curMap` keeps whatever value it had before this call -- confirm
// that this is intended.
65 template <class KeyType, class ValueType>
67  PDB_COUT << "AggregationProcessor-" << id << ": Loading input page" << std::endl;
70  inputData = myRec->getRootObject();
71  int numPartitions = inputData->size();
72  int i;
73  for (i = 0; i < numPartitions; i++) {
74  curMap = (*inputData)[i];
75  HashPartitionID hashIdForCurrentMap = curMap->getHashPartitionId();
76  PDB_COUT << i << "-th map's partitionId is " << hashIdForCurrentMap << std::endl;
77  if (curMap->getHashPartitionId() == id) {
78  PDB_COUT << "this map has my id = " << id << std::endl;
79  count = 0;
80  if (begin != nullptr) {
81  PDB_COUT << "we delete the begin iterator of last input page" << std::endl;
82  delete begin;
83  }
84  if (end != nullptr) {
85  PDB_COUT << "we delete the end iterator of last input page" << std::endl;
86  delete end;
87  }
88  begin = new PDBMapIterator<KeyType, ValueType>(curMap->getArray(), true);
89  end = new PDBMapIterator<KeyType, ValueType>(curMap->getArray());
90 
91  break;  // stop at the first matching map; later maps are not examined
92  } else {
93  // there is no hash partition for this thread
94  curMap = nullptr;
95  }
96  }
97 }
98 
99 // loads up another output page to write results to
// Installs a UseTemporaryAllocationBlock over the given page memory
// (`pageToWriteTo`, `numBytesInPage` bytes) and creates a fresh, empty output
// map in `outputData`.
//
// NOTE(review): the first signature line (listing line 101) is missing from
// this extracted listing; the reference footer declares the method as
// `void loadOutputPage(void* pageToWriteTo, size_t numBytesInPage) override`.
100 template <class KeyType, class ValueType>
102  size_t numBytesInPage) {
103  PDB_COUT << "AggregationProcessor-" << id << ": Loading output page" << std::endl;
104  blockPtr = nullptr;  // presumably drops the previous allocation block before the new one is constructed -- confirm
105  blockPtr = std::make_shared<UseTemporaryAllocationBlock>(pageToWriteTo, numBytesInPage);
106  outputData = makeObject<Map<KeyType, ValueType>>();  // presumably allocated inside the block installed above -- confirm
107 }
108 
109 template <class KeyType, class ValueType>
111 
112  // if we are finalized, see if there are some left over records
113  if (finalized) {
114  getRecord(outputData);
115  return false;
116  }
117 
118 
119  if (curMap == nullptr) {
120  PDB_COUT << "this page doesn't have my map with id = " << id << std::endl;
121  return false;
122  }
123 
124 
125  // we are not finalized, so process the page
126  try {
127 
128  // see if there are any more items in current map to iterate over
129  while (true) {
130 
131  if (!((*begin) != (*end))) {
132  count = 0;
133  return false;
134  }
135  KeyType curKey = (*(*begin)).key;
136  ValueType curValue = (*(*begin)).value;
137  // if the key is not there
138  if (outputData->count(curKey) == 0) {
139  ValueType* temp = nullptr;
140  temp = &((*outputData)[curKey]);
141  try {
142 
143  *temp = curValue;
144  ++(*begin);
145  count++;
146  // if we couldn't fit the value
147  } catch (NotEnoughSpace& n) {
148  outputData->setUnused(curKey);
149  throw n;
150  }
151  // the key is there
152  } else {
153  // get the value and copy of it
154  ValueType& temp = (*outputData)[curKey];
155  ValueType copy = temp;
156 
157  // and add to old value, producing a new one
158  try {
159 
160  temp = copy + curValue;
161  ++(*begin);
162  count++;
163 
164  // if we got here, it means we run out of RAM and we need to restore the old
165  // value in the destination hash map
166  } catch (NotEnoughSpace& n) {
167  temp = copy;
168  throw n;
169  }
170  }
171  }
172 
173  } catch (NotEnoughSpace& n) {
174  getRecord(outputData);
175  return true;
176  }
177 }
178 
// Sets the `finalized` flag. Once it is set, the output-filling routine in
// this file only calls getRecord(outputData) and returns false.
// NOTE(review): the signature line (listing line 180) is missing from this
// extracted listing; presumably `finalize()` -- confirm against the header.
179 template <class KeyType, class ValueType>
181  finalized = true;
182 }
183 
// Drops this processor's references to the output side: the allocation block
// pointer and the output map handle are both reset to nullptr.
// NOTE(review): the signature line (listing line 185) is missing from this
// extracted listing; presumably `clearOutputPage()` -- confirm against the header.
184 template <class KeyType, class ValueType>
186  blockPtr = nullptr;
187  outputData = nullptr;
188 }
189 
// Drops this processor's references to the input side: the current map and
// the input data handle are both reset to nullptr. The begin/end iterators
// are not touched here.
// NOTE(review): the signature line (listing line 191) is missing from this
// extracted listing; presumably `clearInputPage()` -- confirm against the header.
190 template <class KeyType, class ValueType>
192  curMap = nullptr;
193  inputData = nullptr;
194 }
195 
// Reports whether there is a current input map: returns false when `curMap`
// is nullptr and true otherwise. The load routines in this file set `curMap`
// to nullptr when a map's hash partition id does not equal this processor's
// `id`.
// NOTE(review): the signature line (listing line 197) is missing from this
// extracted listing; presumably `bool needsProcessInput()` -- confirm against the header.
196 template <class KeyType, class ValueType>
198  if (curMap == nullptr) {
199  return false;
200  } else {
201  return true;
202  }
203 }
204 }
205 
206 
207 #endif
Handle< ObjType > getRootObject()
Definition: Record.cc:46
void loadInputObject(Handle< Object > &objectToProcess) override
unsigned int HashPartitionID
Definition: DataTypes.h:28
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
#define PDB_COUT
Definition: PDBDebug.h:31
void loadInputPage(void *pageToProcess) override
void loadOutputPage(void *pageToWriteTo, size_t numBytesInPage) override