18 #ifndef COMBINER_PROCESSOR_CC
19 #define COMBINER_PROCESSOR_CC
27 template <
class KeyType,
class ValueType>
29 PDB_COUT <<
"running CombinerProcessor constructor" << std::endl;
30 this->numNodePartitions = partitions.size();
34 for (i = 0; i < partitions.size(); i++) {
35 PDB_COUT << i <<
":" << partitions[i] << std::endl;
36 nodePartitionIds.push_back(partitions[i]);
45 template <
class KeyType,
class ValueType>
51 template <
class KeyType,
class ValueType>
53 PDB_COUT <<
"CombinerProcessor: to load a new input page" << std::endl;
59 curPartId = nodePartitionIds[curPartPos];
60 curMap = (*inputData)[curPartId];
61 if (begin !=
nullptr) {
67 PDB_COUT <<
"CombinerProcessor: loaded a page with first partition id=" << curPartId
68 <<
" and size=" << curMap->size() << std::endl;
74 template <
class KeyType,
class ValueType>
76 size_t numBytesInPage) {
79 blockPtr = std::make_shared<UseTemporaryAllocationBlock>(pageToWriteTo, numBytesInPage);
81 makeObject<Vector<Handle<AggregationMap<KeyType, ValueType>>>>(this->numNodePartitions);
83 for (i = 0; i < numNodePartitions; i++) {
84 PDB_COUT <<
"to create the " << i <<
"-th partition on this node" << std::endl;
86 makeObject<AggregationMap<KeyType, ValueType>>();
88 PDB_COUT <<
"currentPartitionId=" << currentPartitionId << std::endl;
90 currentMap->setHashPartitionId(i);
91 outputData->push_back(currentMap);
93 curOutputMap = (*outputData)[curPartPos];
96 template <
class KeyType,
class ValueType>
101 for (
int i = 0; i < numNodePartitions; i++) {
102 PDB_COUT <<
"outputData[" << i <<
"].size()=" << (*outputData)[i]->size() << std::endl;
103 PDB_COUT <<
"count=" << count << std::endl;
116 if (!((*begin) != (*end))) {
117 if (curPartPos < numNodePartitions - 1) {
119 PDB_COUT <<
"curPartPos=" << curPartPos << std::endl;
120 curPartId = nodePartitionIds[curPartPos];
121 PDB_COUT <<
"curPartId=" << curPartId << std::endl;
122 curMap = (*inputData)[curPartId];
123 PDB_COUT <<
"(*inputData)[" << curPartId <<
"].size()=" << curMap->size()
125 if (curMap->size() > 0) {
129 if ((*begin) != (*end)) {
130 curOutputMap = (*outputData)[curPartPos];
132 PDB_COUT <<
"this is strage: map size > 0 but begin == end"
143 curOutputMap = (*outputData)[0];
147 KeyType curKey = (*(*begin)).key;
148 ValueType curValue = (*(*begin)).value;
149 if (curOutputMap->count(curKey) == 0) {
152 ValueType* temp =
nullptr;
154 temp = &((*curOutputMap)[curKey]);
167 curOutputMap->setUnused(curKey);
175 ValueType& temp = (*curOutputMap)[curKey];
176 ValueType copy = temp;
181 temp = copy + curValue;
195 for (
int i = 0; i < numNodePartitions; i++) {
196 PDB_COUT <<
"outputData[" << i <<
"].size()=" << (*outputData)[i]->size() << std::endl;
203 template <
class KeyType,
class ValueType>
208 template <
class KeyType,
class ValueType>
211 outputData =
nullptr;
212 curOutputMap =
nullptr;
215 template <
class KeyType,
class ValueType>
void clearInputPage() override
Handle< ObjType > getRootObject()
void loadOutputPage(void *pageToWriteTo, size_t numBytesInPage) override
unsigned int HashPartitionID
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
void initialize() override
void clearOutputPage() override
bool fillNextOutputPage() override
CombinerProcessor(std::vector< HashPartitionID > &partitions)
void loadInputPage(void *pageToProcess) override