A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CombinerProcessor.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef COMBINER_PROCESSOR_CC
19 #define COMBINER_PROCESSOR_CC
20 
21 #include "CombinerProcessor.h"
22 #include "Employee.h"
23 
24 namespace pdb {
25 
26 
27 template <class KeyType, class ValueType>
28 CombinerProcessor<KeyType, ValueType>::CombinerProcessor(std::vector<HashPartitionID>& partitions) {
29  PDB_COUT << "running CombinerProcessor constructor" << std::endl;
30  this->numNodePartitions = partitions.size();
31  finalized = false;
32 
33  int i;
34  for (i = 0; i < partitions.size(); i++) {
35  PDB_COUT << i << ":" << partitions[i] << std::endl;
36  nodePartitionIds.push_back(partitions[i]);
37  }
38  count = 0;
39  curPartPos = 0;
40  begin = nullptr;
41  end = nullptr;
42 }
43 
44 // initialize
45 template <class KeyType, class ValueType>
47  finalized = false;
48 }
49 
50 // loads up another input page to process
// NOTE(review): this listing has lost the function signature and the declaration of
// `myRec` (listing lines 52, 54 and 55). The page's member index lists
// `void loadInputPage(void *pageToProcess) override`, so this is presumably that
// function and `myRec` is presumably the input page viewed as a Record -- confirm
// against the real source before editing.
//
// What the visible statements do: take the root object of `myRec` as inputData, reset
// curPartPos and count to zero, select the map stored under the first id in
// nodePartitionIds, free the iterators left over from a previous page, and create a
// fresh begin/end iterator pair over the selected map's array.
51 template <class KeyType, class ValueType>
53  PDB_COUT << "CombinerProcessor: to load a new input page" << std::endl;
56  inputData = myRec->getRootObject();
// restart from the first partition id for every new input page
57  curPartPos = 0;
58  count = 0;
59  curPartId = nodePartitionIds[curPartPos];
60  curMap = (*inputData)[curPartId];
// begin/end are raw owning pointers; release the previous pair before replacing it
61  if (begin != nullptr) {
62  delete begin;
63  }
64  if (end != nullptr) {
65  delete end;
66  }
67  PDB_COUT << "CombinerProcessor: loaded a page with first partition id=" << curPartId
68  << " and size=" << curMap->size() << std::endl;
// presumably the two-argument form (flag `true`) yields the iterator at the first
// entry and the one-argument form yields the end sentinel -- confirm in PDBMapIterator
69  begin = new PDBMapIterator<KeyType, ValueType>(curMap->getArray(), true);
70  end = new PDBMapIterator<KeyType, ValueType>(curMap->getArray());
71 }
72 
73 // loads up another output page to write results to
74 template <class KeyType, class ValueType>
76  size_t numBytesInPage) {
77 
78  blockPtr = nullptr;
79  blockPtr = std::make_shared<UseTemporaryAllocationBlock>(pageToWriteTo, numBytesInPage);
80  outputData =
81  makeObject<Vector<Handle<AggregationMap<KeyType, ValueType>>>>(this->numNodePartitions);
82  int i;
83  for (i = 0; i < numNodePartitions; i++) {
84  PDB_COUT << "to create the " << i << "-th partition on this node" << std::endl;
86  makeObject<AggregationMap<KeyType, ValueType>>();
87  HashPartitionID currentPartitionId = nodePartitionIds[i];
88  PDB_COUT << "currentPartitionId=" << currentPartitionId << std::endl;
89  // however we only use the relative/local hash partition id
90  currentMap->setHashPartitionId(i);
91  outputData->push_back(currentMap);
92  }
93  curOutputMap = (*outputData)[curPartPos];
94 }
95 
96 template <class KeyType, class ValueType>
98 
99  // if we are finalized, see if there are some left over records
100  if (finalized) {
101  for (int i = 0; i < numNodePartitions; i++) {
102  PDB_COUT << "outputData[" << i << "].size()=" << (*outputData)[i]->size() << std::endl;
103  PDB_COUT << "count=" << count << std::endl;
104  }
105 
106  getRecord(outputData);
107  return false;
108  }
109 
110  // we are not finalized, so process the page
111  try {
112 
113  // see if there are any more items in current map to iterate over
114  while (true) {
115 
116  if (!((*begin) != (*end))) {
117  if (curPartPos < numNodePartitions - 1) {
118  curPartPos++;
119  PDB_COUT << "curPartPos=" << curPartPos << std::endl;
120  curPartId = nodePartitionIds[curPartPos];
121  PDB_COUT << "curPartId=" << curPartId << std::endl;
122  curMap = (*inputData)[curPartId];
123  PDB_COUT << "(*inputData)[" << curPartId << "].size()=" << curMap->size()
124  << std::endl;
125  if (curMap->size() > 0) {
126  begin = new PDBMapIterator<KeyType, ValueType>(curMap->getArray(), true);
127  end = new PDBMapIterator<KeyType, ValueType>(curMap->getArray());
128 
129  if ((*begin) != (*end)) {
130  curOutputMap = (*outputData)[curPartPos];
131  } else {
132  PDB_COUT << "this is strage: map size > 0 but begin == end"
133  << std::endl;
134  continue;
135  }
136  } else {
137  continue;
138  }
139  } else {
140  // JiaNote: this is important, we need make sure when we load next page, we
141  // start output from the 0-th partition
142  curPartPos = 0;
143  curOutputMap = (*outputData)[0];
144  return false;
145  }
146  }
147  KeyType curKey = (*(*begin)).key;
148  ValueType curValue = (*(*begin)).value;
149  if (curOutputMap->count(curKey) == 0) {
150  // if the key is not there
151 
152  ValueType* temp = nullptr;
153  try {
154  temp = &((*curOutputMap)[curKey]);
155 
156  } catch (NotEnoughSpace& n) {
157 
158  throw n;
159  }
160  try {
161 
162  *temp = curValue;
163  ++(*begin);
164  count++;
165  // if we couldn't fit the value
166  } catch (NotEnoughSpace& n) {
167  curOutputMap->setUnused(curKey);
168 
169  throw n;
170  }
171  // the key is there
172  } else {
173 
174  // get the value and copy of it
175  ValueType& temp = (*curOutputMap)[curKey];
176  ValueType copy = temp;
177 
178  // and add to old value, producing a new one
179  try {
180 
181  temp = copy + curValue;
182  ++(*begin);
183  count++;
184 
185  // if we got here, it means we run out of RAM and we need to restore the old
186  // value in the destination hash map
187  } catch (NotEnoughSpace& n) {
188  temp = copy;
189  throw n;
190  }
191  }
192  }
193 
194  } catch (NotEnoughSpace& n) {
195  for (int i = 0; i < numNodePartitions; i++) {
196  PDB_COUT << "outputData[" << i << "].size()=" << (*outputData)[i]->size() << std::endl;
197  }
198  getRecord(outputData);
199  return true;
200  }
201 }
202 
// NOTE(review): the signature line of this definition is missing from this listing
// and its name does not appear in the page's member index; presumably this is the
// processor's finalize step -- confirm against CombinerProcessor.h.
//
// Sets the `finalized` flag. Code in this file that tests `if (finalized)` then stops
// consuming input and only calls getRecord(outputData) before returning false.
203 template <class KeyType, class ValueType>
205  finalized = true;
206 }
207 
208 template <class KeyType, class ValueType>
210  blockPtr = nullptr;
211  outputData = nullptr;
212  curOutputMap = nullptr;
213 }
214 
215 template <class KeyType, class ValueType>
217  inputData = nullptr;
218 }
219 }
220 
221 
222 #endif
void clearInputPage() override
Handle< ObjType > getRootObject()
Definition: Record.cc:46
void loadOutputPage(void *pageToWriteTo, size_t numBytesInPage) override
unsigned int HashPartitionID
Definition: DataTypes.h:28
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
void initialize() override
void clearOutputPage() override
bool fillNextOutputPage() override
CombinerProcessor(std::vector< HashPartitionID > &partitions)
#define PDB_COUT
Definition: PDBDebug.h:31
void loadInputPage(void *pageToProcess) override