18 #ifndef AGGREGATION_PROCESSOR_CC
19 #define AGGREGATION_PROCESSOR_CC
26 template <
class KeyType,
class ValueType>
37 template <
class KeyType,
class ValueType>
42 template <
class KeyType,
class ValueType>
44 curMap = unsafeCast<AggregationMap<KeyType, ValueType>,
Object>(objectToProcess);
46 if (curMap->getHashPartitionId() == id) {
48 if (begin !=
nullptr) {
65 template <
class KeyType,
class ValueType>
67 PDB_COUT <<
"AggregationProcessor-" <<
id <<
": Loading input page" << std::endl;
71 int numPartitions = inputData->size();
73 for (i = 0; i < numPartitions; i++) {
74 curMap = (*inputData)[i];
76 PDB_COUT << i <<
"-th map's partitionId is " << hashIdForCurrentMap << std::endl;
77 if (curMap->getHashPartitionId() == id) {
78 PDB_COUT <<
"this map has my id = " <<
id << std::endl;
80 if (begin !=
nullptr) {
81 PDB_COUT <<
"we delete the begin iterator of last input page" << std::endl;
85 PDB_COUT <<
"we delete the end iterator of last input page" << std::endl;
100 template <
class KeyType,
class ValueType>
102 size_t numBytesInPage) {
103 PDB_COUT <<
"AggregationProcessor-" <<
id <<
": Loading output page" << std::endl;
105 blockPtr = std::make_shared<UseTemporaryAllocationBlock>(pageToWriteTo, numBytesInPage);
106 outputData = makeObject<Map<KeyType, ValueType>>();
109 template <
class KeyType,
class ValueType>
119 if (curMap ==
nullptr) {
120 PDB_COUT <<
"this page doesn't have my map with id = " <<
id << std::endl;
131 if (!((*begin) != (*end))) {
135 KeyType curKey = (*(*begin)).key;
136 ValueType curValue = (*(*begin)).value;
138 if (outputData->count(curKey) == 0) {
139 ValueType* temp =
nullptr;
140 temp = &((*outputData)[curKey]);
148 outputData->setUnused(curKey);
154 ValueType& temp = (*outputData)[curKey];
155 ValueType copy = temp;
160 temp = copy + curValue;
179 template <
class KeyType,
class ValueType>
184 template <
class KeyType,
class ValueType>
187 outputData =
nullptr;
190 template <
class KeyType,
class ValueType>
196 template <
class KeyType,
class ValueType>
198 if (curMap ==
nullptr) {
void initialize() override
Handle< ObjType > getRootObject()
void loadInputObject(Handle< Object > &objectToProcess) override
unsigned int HashPartitionID
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
bool needsProcessInput() override
void loadInputPage(void *pageToProcess) override
bool fillNextOutputPage() override
void loadOutputPage(void *pageToWriteTo, size_t numBytesInPage) override
void clearInputPage() override
void clearOutputPage() override