A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
HashSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef HASH_SINK_H
20 #define HASH_SINK_H
21 
22 #include "ComputeSink.h"
23 #include "TupleSetMachine.h"
24 #include "TupleSet.h"
25 #include <vector>
26 
27 namespace pdb {
28 
29 // runs hashes all of the tuples
30 template <class KeyType, class ValueType>
31 class HashSink : public ComputeSink {
32 
33 private:
34  // the attributes to operate on
37 
38 public:
39  HashSink(TupleSpec& inputSchema, TupleSpec& attsToOperateOn) {
40 
41  // to setup the output tuple set
42  TupleSpec empty;
43  TupleSetSetupMachine myMachine(inputSchema, empty);
44 
45  // this is the input attribute that we will process
46  std::vector<int> matches = myMachine.match(attsToOperateOn);
47  whichAttToHash = matches[0];
48  whichAttToAggregate = matches[1];
49  }
50 
52 
53  // we simply create a new map to store the output
54  Handle<Map<KeyType, ValueType>> returnVal = makeObject<Map<KeyType, ValueType>>();
55  return returnVal;
56  }
57 
58  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
59 
60  // get the map we are adding to
61  Handle<Map<KeyType, ValueType>> writeMe = unsafeCast<Map<KeyType, ValueType>>(writeToMe);
62  Map<KeyType, ValueType>& myMap = *writeMe;
63 
64  // get the input columns
65  std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(whichAttToHash);
66  std::vector<ValueType>& valueColumn = input->getColumn<ValueType>(whichAttToAggregate);
67 
68  // and aggregate everyone
69  size_t length = keyColumn.size();
70  for (size_t i = 0; i < length; i++) {
71 
72  // if this key is not already there...
73  if (myMap.count(keyColumn[i]) == 0) {
74 
75  // this point will record where the value is located
76  ValueType* temp = nullptr;
77 
78  // try to add the key... this will cause an allocation for a new key/val pair
79  try {
80  // get the location that we need to write to...
81  temp = &(myMap[keyColumn[i]]);
82 
83  // if we get an exception, then we could not fit a new key/value pair
84  } catch (NotEnoughSpace& n) {
85 
86  // if we got here, then we ran out of space, and so we need to delete the
87  // already-processed
88  // data so that we can try again...
89  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
90  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
91  throw n;
92  }
93 
94  // we were able to fit a new key/value pair, so copy over the value
95  try {
96  *temp = valueColumn[i];
97 
98  // if we could not fit the value...
99  } catch (NotEnoughSpace& n) {
100 
101  // then we need to erase the key from the map
102  myMap.setUnused(keyColumn[i]);
103 
104  // and erase all of these guys from the tuple set since they were processed
105  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
106  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
107  throw n;
108  }
109 
110  // the key is there
111  } else {
112 
113  // get the value and a copy of it
114  ValueType& temp = myMap[keyColumn[i]];
115  ValueType copy = temp;
116 
117  // and add to the old value, producing a new one
118  try {
119  temp = copy + valueColumn[i];
120 
121  // if we got here, then it means that we ram out of RAM when we were trying
122  // to put the new value into the hash table
123  } catch (NotEnoughSpace& n) {
124 
125  // restore the old value
126  temp = copy;
127 
128  // and erase all of the guys who were processed
129  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
130  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
131  throw n;
132  }
133  }
134  }
135  }
136 
138 };
139 }
140 
141 #endif
Handle< Object > createNewOutputContainer() override
Definition: HashSink.h:51
int whichAttToHash
Definition: HashSink.h:35
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
Definition: HashSink.h:58
std::vector< int > match(TupleSpec &attsToMatch)
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
HashSink(TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
Definition: HashSink.h:39
int whichAttToAggregate
Definition: HashSink.h:36