A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
HashPartitionTransformationSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef HASH_PARTITION_TRANSFORMATION_SINK_H
20 #define HASH_PARTITION_TRANSFORMATION_SINK_H
21 
22 
23 #include "ComputeSink.h"
24 #include "TupleSetMachine.h"
25 #include "TupleSet.h"
26 #include "DataTypes.h"
27 #include <vector>
28 
29 namespace pdb {
30 
31 // hashes each input tuple on the chosen attribute and stores it in a container that is
32 // partitioned first by node and then by partition within that node.
33 template <class ValueType>
35 
36 
37 public:
38 
45  HashPartitionTransformationSink(int numPartitions, int numNodes, TupleSpec& inputSchema, TupleSpec& attToOperateOn) {
46 
47  // to setup the output tuple set
48  TupleSpec empty;
49  TupleSetSetupMachine myMachine(inputSchema, empty);
50 
51  // this is the input attribute that we will process
52  std::vector<int> matches = myMachine.match(attToOperateOn);
53  whichAttToHash = matches[0];
54  this->numPartitions = numPartitions;
55  this->numNodes = numNodes;
56  }
57 
63 
64  // we create a node-partitioned vector to store the output: one inner vector per node,
65  // each holding numPartitions/numNodes (integer division) empty partition vectors
    // NOTE(review): because of the integer division, leftover partitions are dropped when
    // numPartitions is not a multiple of numNodes, and each node gets no partition vectors
    // when numPartitions < numNodes -- confirm callers guarantee numPartitions >= numNodes.
67  makeObject<Vector<Handle<Vector<Handle<Vector<ValueType>>>>>>(numNodes);
68  int i, j;
69  for (i = 0; i < numNodes; i++) {
71  = makeObject<Vector<Handle<Vector<ValueType>>>>(numPartitions/numNodes);
72  for (j = 0; j < numPartitions/numNodes; j++) {
    // one (initially empty) vector per partition on this node
73  Handle<Vector<ValueType>> curVec = makeObject<Vector<ValueType>>();
74  curNodeVec->push_back(curVec);
75  }
76  returnVal->push_back(curNodeVec);
77  }
78  return returnVal;
79 
80  }
81 
82 
89  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
90 
91  // get the partitioned vector we are adding to
93  unsafeCast<Vector<Handle<Vector<Handle<Vector<ValueType>>>>>>(writeToMe);
94  size_t hashVal;
95 
96 
97  // get the input columns
98  std::vector<ValueType>& valueColumn = input->getColumn<ValueType>(whichAttToHash);
99 
100  // and allocate everyone to a partition
101  size_t length = valueColumn.size();
102  for (size_t i = 0; i < length; i++) {
103 
104  hashVal = Hasher<ValueType>::hash(valueColumn[i]);
105  int nodeId = (hashVal % (numPartitions/numNodes))/(numPartitions/numNodes);
106  int partitionId = (hashVal % (numPartitions/numNodes)) % (numPartitions/numNodes);
107  Vector<ValueType>& myVec = *((*((*writeMe)[nodeId]))[partitionId]);
108 
109  try {
110  //to add the value to the partition
111  myVec.push_back(valueColumn[i]);
112 
113  } catch (NotEnoughSpace & n) {
114 
115  /* if we got here then we run out of space and we need delete the already-processed
116  * data, throw an exception so that new space can be allocated by handling the exception,
117  * and try to process the remaining unprocessed data again */
118  valueColumn.erase(valueColumn.begin(), valueColumn.begin() + i);
119  throw n;
120 
121  }
122  }
123  }
124 
126 
127 
128 private:
129  // index of the input column (attribute) whose values are hashed
131 
132  // number of partitions in the cluster
134 
135  // number of nodes in the cluster
136  int numNodes;
137 
138 };
139 }
140 
141 #endif
std::vector< int > match(TupleSpec &attsToMatch)
HashPartitionTransformationSink(int numPartitions, int numNodes, TupleSpec &inputSchema, TupleSpec &attToOperateOn)
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
static auto hash(const KeyType &k) -> decltype(hash_impl(k, 0))
Definition: PairArray.cc:85
void push_back(const TypeContained &val)
Definition: PDBVector.cc:95
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override