A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
HashPartitionSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef HASH_PARTITION_SINK_H
20 #define HASH_PARTITION_SINK_H
21 
22 
23 #include "ComputeSink.h"
24 #include "TupleSetMachine.h"
25 #include "TupleSet.h"
26 #include "DataTypes.h"
27 #include <vector>
28 
29 namespace pdb {
30 
31 // hashes the key of every tuple and stores the tuple's value in a container that is
32 // partitioned by node.
33 template <class KeyType, class ValueType>
35 
36 
37 public:
38 
// Constructs the sink and works out which input attributes it will read.
//
// numPartitions   - stored as-is; used later to map a hash value to a node
// numNodes        - stored as-is; also the number of per-node vectors in the output container
// inputSchema     - schema handed to TupleSetSetupMachine together with an empty output spec
// attsToOperateOn - matched against inputSchema; the first match is the attribute to store,
//                   the second match is the attribute to hash
//
// Side effect: prints the two resolved attribute positions, numPartitions and numNodes to std::cout.
//
// NOTE(review): matches[0] and matches[1] are read without checking matches.size(); presumably
// attsToOperateOn always names at least two attributes (value first, key second) -- confirm.
// NOTE(review): std::cout / std::endl are used, but <iostream> is not among the includes shown
// in this listing; presumably it arrives transitively -- confirm.
45  HashPartitionSink(int numPartitions, int numNodes, TupleSpec& inputSchema, TupleSpec& attsToOperateOn) {
46 
47  // set up the machine used to locate attributes in the input schema (no output attributes)
48  TupleSpec empty;
49  TupleSetSetupMachine myMachine(inputSchema, empty);
50 
51  // these are the input attributes that we will process: [0] is stored, [1] is hashed
52  std::vector<int> matches = myMachine.match(attsToOperateOn);
53  whichAttToStore = matches[0];
54  whichAttToHash = matches[1];
    // diagnostic output; each std::endl also flushes the stream
55  std::cout << "whichAttToStore=" << whichAttToStore << std::endl;
56  std::cout << "whichAttToHash=" << whichAttToHash << std::endl;
57  this->numPartitions = numPartitions;
58  this->numNodes = numNodes;
59  std::cout << "numPartitions=" << numPartitions << std::endl;
60  std::cout << "numNodes=" << numNodes << std::endl;
61  }
62 
68 
69  // we create a node-partitioned vector to store the output
71  makeObject<Vector<Handle<Vector<Handle<ValueType>>>>>(numNodes);
72  for (int i = 0; i < numNodes; i++) {
74  = makeObject<Vector<Handle<ValueType>>>();
75  returnVal->push_back(curNodeVec);
76  }
77  return returnVal;
78  }
79 
80 
// Appends every value in `input` to the per-node vector selected by the hash of its key.
//
// input     - tuple set; the key column is read at whichAttToHash and the value column at
//             whichAttToStore. Both columns are obtained by reference and are modified
//             (their processed prefix is erased) if the output runs out of space.
// writeToMe - output container, cast to a Vector of Handles to per-node Vectors of
//             Handle<ValueType>.
//
// Rethrows NotEnoughSpace when push_back on the output fails. Before rethrowing, the rows
// already written (indices 0 .. i-1) are erased from both columns so that only unprocessed
// rows remain in `input`.
//
// NOTE(review): `writeMe` is used below but its declaration is not visible in this listing
// (the embedded numbering jumps from 89 to 91); presumably the unsafeCast result is assigned
// to it on the missing line -- confirm against the real header.
87  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
88 
89  // get the partitioned vector we are adding to
91  unsafeCast<Vector<Handle<Vector<Handle<ValueType>>>>>(writeToMe);
92  size_t hashVal;
93 
94 
95  // get the key column
96  std::vector<KeyType>& keyColumn = input->getColumn<KeyType>(whichAttToHash);
97 
98  // get the value column
99  std::vector<Handle<ValueType>>& valueColumn = input->getColumn<Handle<ValueType>>(whichAttToStore);
100 
101  // and allocate every row to a node
102  size_t length = keyColumn.size();
103  for (size_t i = 0; i < length; i++) {
104 
105  hashVal = Hasher<KeyType>::hash(keyColumn[i]);
    // partition id = hashVal % numPartitions; node id = partition id / (partitions per node).
    // NOTE(review): numPartitions/numNodes is integer division. It is 0 when
    // numPartitions < numNodes (division by zero), and when numPartitions is not a multiple
    // of numNodes the result can be >= numNodes (e.g. 10 partitions, 4 nodes: 9 / 2 == 4).
    // Presumably callers guarantee numPartitions is a positive multiple of numNodes -- confirm.
106  int nodeId = (hashVal % (numPartitions))/(numPartitions/numNodes);
107  Vector<Handle<ValueType>>& myVec = *((*writeMe)[nodeId]);
108 
109  try {
110  // add the value to that node's vector
111  myVec.push_back(valueColumn[i]);
112 
113  } catch (NotEnoughSpace & n) {
114 
115  /* if we got here then we ran out of space and we need to delete the already-processed
116  * data, then rethrow the exception so that new space can be allocated by whoever handles it
117  * and the remaining unprocessed data can be tried again */
118  keyColumn.erase(keyColumn.begin(), keyColumn.begin() + i);
119  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
    // NOTE(review): `throw n;` throws a copy of n with static type NotEnoughSpace;
    // a bare `throw;` would rethrow the original exception object instead.
120  throw n;
121 
122  }
123  }
124  }
125 
127 
128 
129 private:
130  // the attribute to operate on
132 
133  // the attribute to store
135 
136  // number of partitions in the cluster
138 
139  // number of nodes in the cluster
140  int numNodes;
141 
142 };
143 }
144 
145 #endif
Handle< Object > createNewOutputContainer() override
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
std::vector< int > match(TupleSpec &attsToMatch)
HashPartitionSink(int numPartitions, int numNodes, TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
Definition: PairArray.cc:85
void push_back(const TypeContained &val)
Definition: PDBVector.cc:95