19 #ifndef COMBINED_SHUFFLE_SINK_H
20 #define COMBINED_SHUFFLE_SINK_H
34 template <
class KeyType,
class ValueType>
56 std::vector<int> matches = myMachine.
match(attsToOperateOn);
70 return *((*((*outputData)[nodeId]))[partitionId]);
77 makeObject<Vector<Handle<Vector<Handle<AggregationMap<KeyType, ValueType>>>>>>(
82 makeObject<Vector<Handle<AggregationMap<KeyType, ValueType>>>>(
86 makeObject<AggregationMap<KeyType, ValueType>>();
87 curMap->setHashPartitionId(j);
88 nodeData->push_back(curMap);
90 returnVal->push_back(nodeData);
99 unsafeCast<Vector<Handle<Vector<Handle<AggregationMap<KeyType, ValueType>>>>>>(
105 std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(
whichAttToHash);
109 size_t length = keyColumn.size();
110 for (
size_t i = 0; i < length; i++) {
117 if (myMap.
count(keyColumn[i]) == 0) {
120 ValueType* temp =
nullptr;
125 temp = &(myMap[keyColumn[i]]);
132 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
133 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
139 *temp = valueColumn[i];
147 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
148 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
156 ValueType& temp = myMap[keyColumn[i]];
157 ValueType copy = temp;
161 temp = copy + valueColumn[i];
170 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
171 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
CombinedShuffleSink(int numPartitionsPerNode, int numNodes, TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
Handle< Object > createNewOutputContainer() override
unsigned int HashPartitionID
std::vector< int > match(TupleSpec &attsToMatch)
int count(const KeyType &which)
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
std::shared_ptr< TupleSet > TupleSetPtr
void setUnused(const KeyType &clearMe)
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
AggregationMap< KeyType, ValueType > & getMap(HashPartitionID myHashID, Handle< Vector< Handle< Vector< Handle< AggregationMap< KeyType, ValueType >>>>>> outputData)