19 #ifndef SHUFFLE_SINK_H
20 #define SHUFFLE_SINK_H
33 template <
class KeyType,
class ValueType>
50 std::vector<int> matches = myMachine.
match(attsToOperateOn);
60 makeObject<Vector<Handle<Map<KeyType, ValueType>>>>(
numPartitions);
64 returnVal->push_back(curMap);
73 unsafeCast<Vector<Handle<Map<KeyType, ValueType>>>>(writeToMe);
78 std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(
whichAttToHash);
82 size_t length = keyColumn.size();
83 for (
size_t i = 0; i < length; i++) {
86 #ifndef NO_MOD_PARTITION
93 if (myMap.
count(keyColumn[i]) == 0) {
96 ValueType* temp =
nullptr;
101 temp = &(myMap[keyColumn[i]]);
110 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
111 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
117 *temp = valueColumn[i];
127 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
128 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
136 ValueType& temp = myMap[keyColumn[i]];
137 ValueType copy = temp;
141 temp = copy + valueColumn[i];
152 keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
153 valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
std::vector< int > match(TupleSpec &attsToMatch)
int count(const KeyType &which)
Handle< Object > createNewOutputContainer() override
std::shared_ptr< TupleSet > TupleSetPtr
ShuffleSink(int numPartitions, TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
void setUnused(const KeyType &clearMe)
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override