A C++ platform for developing high-performance distributed tools and libraries. It can be deployed in two cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ShuffleSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef SHUFFLE_SINK_H
20 #define SHUFFLE_SINK_H
21 
22 
23 #include "ComputeSink.h"
24 #include "TupleSetMachine.h"
25 #include "TupleSet.h"
26 #include "DataTypes.h"
27 #include <vector>
28 
29 namespace pdb {
30 
31 // runs hashes all of the tuples, and stores aggregated results to a container that is partitioned
32 // by node partitions.
33 template <class KeyType, class ValueType>
34 class ShuffleSink : public ComputeSink {
35 
36 private:
37  // the attributes to operate on
41 
42 public:
43  ShuffleSink(int numPartitions, TupleSpec& inputSchema, TupleSpec& attsToOperateOn) {
44 
45  // to setup the output tuple set
46  TupleSpec empty;
47  TupleSetSetupMachine myMachine(inputSchema, empty);
48 
49  // this is the input attribute that we will process
50  std::vector<int> matches = myMachine.match(attsToOperateOn);
51  whichAttToHash = matches[0];
52  whichAttToAggregate = matches[1];
53  this->numPartitions = numPartitions;
54  }
55 
57 
58  // we create a node-partitioned map to store the output
60  makeObject<Vector<Handle<Map<KeyType, ValueType>>>>(numPartitions);
61  int i;
62  for (i = 0; i < numPartitions; i++) {
63  Handle<Map<KeyType, ValueType>> curMap = makeObject<Map<KeyType, ValueType>>();
64  returnVal->push_back(curMap);
65  }
66  return returnVal;
67  }
68 
69  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
70 
71  // get the map we are adding to
73  unsafeCast<Vector<Handle<Map<KeyType, ValueType>>>>(writeToMe);
74  size_t hashVal;
75 
76 
77  // get the input columns
78  std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(whichAttToHash);
79  std::vector<ValueType>& valueColumn = input->getColumn<ValueType>(whichAttToAggregate);
80 
81  // and aggregate everyone
82  size_t length = keyColumn.size();
83  for (size_t i = 0; i < length; i++) {
84 
85  hashVal = Hasher<KeyType>::hash(keyColumn[i]);
86 #ifndef NO_MOD_PARTITION
87  Map<KeyType, ValueType>& myMap = *((*writeMe)[(hashVal) % numPartitions]);
88 #else
90  *((*writeMe)[(hashVal / numPartitions) % numPartitions]);
91 #endif
92  // if this key is not already there...
93  if (myMap.count(keyColumn[i]) == 0) {
94 
95  // this point will record where the value is located
96  ValueType* temp = nullptr;
97 
98  // try to add the key... this will cause an allocation for a new key/val pair
99  try {
100  // get the location that we need to write to...
101  temp = &(myMap[keyColumn[i]]);
102 
103  // if we get an exception, then we could not fit a new key/value pair
104  } catch (NotEnoughSpace& n) {
105  // if we got here, then we ran out of space, and so we need to delete the
106  // already-processed
107  // std :: cout << "not enough space in shuffle sink to get new value" << std ::
108  // endl;
109  // data so that we can try again...
110  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
111  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
112  throw n;
113  }
114 
115  // we were able to fit a new key/value pair, so copy over the value
116  try {
117  *temp = valueColumn[i];
118  // if we could not fit the value...
119  } catch (NotEnoughSpace& n) {
120 
121  // std :: cout << "not enough space in shuffle sink to set value" << std ::
122  // endl;
123  // then we need to erase the key from the map
124  myMap.setUnused(keyColumn[i]);
125 
126  // and erase all of these guys from the tuple set since they were processed
127  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
128  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
129  throw n;
130  }
131 
132  // the key is there
133  } else {
134 
135  // get the value and a copy of it
136  ValueType& temp = myMap[keyColumn[i]];
137  ValueType copy = temp;
138 
139  // and add to the old value, producing a new one
140  try {
141  temp = copy + valueColumn[i];
142  // if we got here, then it means that we ram out of RAM when we were trying
143  // to put the new value into the hash table
144  } catch (NotEnoughSpace& n) {
145 
146  // std :: cout << "not enough space in shuffle sink to update value" << std ::
147  // endl;
148  // restore the old value
149  temp = copy;
150 
151  // and erase all of the guys who were processed
152  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
153  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
154  throw n;
155  }
156  }
157  }
158  }
159 
161 };
162 }
163 
164 #endif
std::vector< int > match(TupleSpec &attsToMatch)
int count(const KeyType &which)
Definition: PDBMap.cc:129
Handle< Object > createNewOutputContainer() override
Definition: ShuffleSink.h:56
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
ShuffleSink(int numPartitions, TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
Definition: ShuffleSink.h:43
void setUnused(const KeyType &clearMe)
Definition: PDBMap.cc:106
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
Definition: PairArray.cc:85
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
Definition: ShuffleSink.h:69