A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CombinedShuffleSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef COMBINED_SHUFFLE_SINK_H
20 #define COMBINED_SHUFFLE_SINK_H
21 
22 
23 #include "ComputeSink.h"
24 #include "TupleSetMachine.h"
25 #include "TupleSet.h"
26 #include "DataTypes.h"
27 #include "AggregationMap.h"
28 #include <vector>
29 
30 namespace pdb {
31 
32 // hashes all of the tuples and stores the aggregated results in a container that is partitioned
33 // first by node and then by the partitions within each node.
34 template <class KeyType, class ValueType>
36 
37 private:
38  // the attributes to operate on
41  int numNodes;
43 
44 public:
46  int numNodes,
47  TupleSpec& inputSchema,
48  TupleSpec& attsToOperateOn) {
49 
50 
51  // used to set up the output tuple set
52  TupleSpec empty;
53  TupleSetSetupMachine myMachine(inputSchema, empty);
54 
55  // this is the input attribute that we will process
56  std::vector<int> matches = myMachine.match(attsToOperateOn);
57  whichAttToHash = matches[0];
58  whichAttToAggregate = matches[1];
59  this->numNodes = numNodes;
60  this->numPartitionsPerNode = numPartitionsPerNode;
61  }
62 
63 
65  HashPartitionID myHashID,
67 
68  int nodeId = myHashID / numPartitionsPerNode;
69  int partitionId = myHashID % numPartitionsPerNode;
70  return *((*((*outputData)[nodeId]))[partitionId]);
71  }
72 
74 
75  // we create a node-partitioned map to store the output
77  makeObject<Vector<Handle<Vector<Handle<AggregationMap<KeyType, ValueType>>>>>>(
78  numNodes);
79  int i, j;
80  for (i = 0; i < numNodes; i++) {
82  makeObject<Vector<Handle<AggregationMap<KeyType, ValueType>>>>(
83  this->numPartitionsPerNode);
84  for (j = 0; j < this->numPartitionsPerNode; j++) {
86  makeObject<AggregationMap<KeyType, ValueType>>();
87  curMap->setHashPartitionId(j);
88  nodeData->push_back(curMap);
89  }
90  returnVal->push_back(nodeData);
91  }
92  return returnVal;
93  }
94 
95  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
96  // aggregates every (key, value) row of the input tuple set into the map selected by the key's hash; on NotEnoughSpace the already-processed rows are erased from the input columns and the exception is rethrown
97  // get the node-partitioned container of maps we are adding to
99  unsafeCast<Vector<Handle<Vector<Handle<AggregationMap<KeyType, ValueType>>>>>>(
100  writeToMe);
101  size_t hashVal;
102 
103 
104  // get the input columns: the keys to hash and the values to aggregate
105  std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(whichAttToHash);
106  std::vector<ValueType>& valueColumn = input->getColumn<ValueType>(whichAttToAggregate);
107 
108  // and aggregate every (key, value) pair
109  size_t length = keyColumn.size();
110  for (size_t i = 0; i < length; i++) {
111 
112  hashVal = Hasher<KeyType>::hash(keyColumn[i]);
113 
115  getMap(hashVal % (numNodes * numPartitionsPerNode), writeMe);
116  // if this key is not already in the map...
117  if (myMap.count(keyColumn[i]) == 0) {
118 
119  // this pointer will record where the value is located
120  ValueType* temp = nullptr;
121 
122  // try to add the key... this will cause an allocation for a new key/val pair
123  try {
124  // get the location that we need to write to...
125  temp = &(myMap[keyColumn[i]]);
126 
127  // if we get an exception, then we could not fit a new key/value pair
128  } catch (NotEnoughSpace& n) {
129  // if we got here, then we ran out of space, so we need to delete the
130  // already-processed rows (indices 0 to i-1) from both input columns
131  // so that we can try again starting from the row that failed...
132  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
133  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
134  throw n;  // NOTE(review): this throws a copy of n; a bare `throw;` would rethrow the original exception object -- confirm intent
135  }
136 
137  // we were able to fit a new key/value pair, so copy over the value
138  try {
139  *temp = valueColumn[i];
140  // if we could not fit the value...
141  } catch (NotEnoughSpace& n) {
142 
143  // then we need to erase the key from the map
144  myMap.setUnused(keyColumn[i]);
145 
146  // and erase all of the already-processed rows from the tuple set
147  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
148  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
149  throw n;
150  }
151 
152  // the key is already in the map
153  } else {
154 
155  // get a reference to the stored value and keep a copy of it
156  ValueType& temp = myMap[keyColumn[i]];
157  ValueType copy = temp;
158 
159  // and add to the old value, producing a new one
160  try {
161  temp = copy + valueColumn[i];
162  // if we got here, then it means that we ran out of RAM when we were trying
163  // to put the new value into the hash table
164  } catch (NotEnoughSpace& n) {
165 
166  // restore the old value
167  temp = copy;
168 
169  // and erase all of the rows that were already processed
170  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
171  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
172  throw n;
173  }
174  }
175  }
176  }
177 
179 };
180 }
181 
182 #endif
CombinedShuffleSink(int numPartitionsPerNode, int numNodes, TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
Handle< Object > createNewOutputContainer() override
unsigned int HashPartitionID
Definition: DataTypes.h:28
std::vector< int > match(TupleSpec &attsToMatch)
int count(const KeyType &which)
Definition: PDBMap.cc:129
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
void setUnused(const KeyType &clearMe)
Definition: PDBMap.cc:106
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
Definition: PairArray.cc:85
AggregationMap< KeyType, ValueType > & getMap(HashPartitionID myHashID, Handle< Vector< Handle< Vector< Handle< AggregationMap< KeyType, ValueType >>>>>> outputData)