A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
VectorSink.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef VECTOR_SINK_H
20 #define VECTOR_SINK_H
21 
22 #include "ComputeSink.h"
23 #include "TupleSetMachine.h"
24 #include "TupleSet.h"
25 #include <vector>
26 
27 namespace pdb {
28 
29 // writes out all of the items in a tuple set to a vector
30 template <typename DataType>
31 class VectorSink : public ComputeSink {
32 
33 private:
34  // the attribute to store
37 
38 public:
39  VectorSink(TupleSpec& inputSchema, TupleSpec& attsToOperateOn) {
40 
41  // to setup the output tuple set
42  TupleSpec empty;
43  TupleSetSetupMachine myMachine(inputSchema, empty);
44 
45  // this is the input attribute that we will process
46  std::vector<int> matches = myMachine.match(attsToOperateOn);
47  whichAttToStore = matches[0];
48  }
49 
51 
52  // we simply create a new vector to store the output
53  Handle<Vector<Handle<DataType>>> returnVal = makeObject<Vector<Handle<DataType>>>();
54  return returnVal;
55  }
56 
57  void writeOut(TupleSetPtr input, Handle<Object>& writeToMe) override {
58 
59  // get the map we are adding to
60  Handle<Vector<Handle<DataType>>> writeMe = unsafeCast<Vector<Handle<DataType>>>(writeToMe);
61  auto& myVec = *writeMe;
62 
63  // get the input column
64  std::vector<Handle<DataType>>& inputColumn =
65  input->getColumn<Handle<DataType>>(whichAttToStore);
66 
67  // and aggregate everyone
68  int length = inputColumn.size();
69  for (int i = 0; i < length; i++) {
70  try {
71  myVec.push_back(inputColumn[i]);
72  } catch (NotEnoughSpace& n) {
73 
74  // if we got here, we need to erase all of the input that has been processed
75  inputColumn.erase(inputColumn.begin(), inputColumn.begin() + i);
76  throw n;
77  }
78  }
79  }
80 
82 };
83 }
84 
85 #endif
int whichAttToAggregate
Definition: VectorSink.h:36
void writeOut(TupleSetPtr input, Handle< Object > &writeToMe) override
Definition: VectorSink.h:57
Handle< Object > createNewOutputContainer() override
Definition: VectorSink.h:50
VectorSink(TupleSpec &inputSchema, TupleSpec &attsToOperateOn)
Definition: VectorSink.h:39
std::vector< int > match(TupleSpec &attsToMatch)
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64