A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
FlattenExecutor.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef FLATTEN_QUERY_EXEC_H
20 #define FLATTEN_QUERY_EXEC_H
21 
22 #include "ComputeExecutor.h"
23 #include "TupleSetMachine.h"
24 #include "TupleSet.h"
25 #include <vector>
26 
27 
28 namespace pdb {
29 
30 // runs a flatten operation: every element of a vector-valued input attribute becomes its own output row
32 
33 private:
34  // this is the output TupleSet that we return
36 
37  // the attribute to operate on
38  int whichAtt;
39 
40  // the output attribute
41  int outAtt;
42 
43  // to setup the output tuple set
45 
46  // per-input-row replication counts: counts[i] is the number of elements in the i-th input vector
47  std::vector<uint32_t> counts;
48 
49 public:
50  // currently, we just ignore the extra parameter to the filter if we get it
51  FlattenExecutor(TupleSpec& inputSchema,
52  TupleSpec& attsToOperateOn,
53  TupleSpec& attsToIncludeInOutput,
55  : myMachine(inputSchema, attsToIncludeInOutput) {
56 
57  // this is the input attribute that we will process
58  output = std::make_shared<TupleSet>();
59  std::vector<int> matches = myMachine.match(attsToOperateOn);
60  whichAtt = matches[0];
61  outAtt = attsToIncludeInOutput.getAtts().size();
62  }
63 
64  FlattenExecutor(TupleSpec& inputSchema,
65  TupleSpec& attsToOperateOn,
66  TupleSpec& attsToIncludeInOutput)
67  : myMachine(inputSchema, attsToIncludeInOutput) {
68 
69  // this is the input attribute that we will process
70  output = std::make_shared<TupleSet>();
71  std::vector<int> matches = myMachine.match(attsToOperateOn);
72  whichAtt = matches[0];
73  outAtt = attsToIncludeInOutput.getAtts().size();
74  }
75 
76  TupleSetPtr process(TupleSetPtr input) override {
77 
78  // std :: cout << "FlattenExecutor: to process a tuple set" << std :: endl;
79 
80  std::vector<Vector<Handle<Object>>> inputVecData =
81  input->getColumn<Vector<Handle<Object>>>(whichAtt);
82 
83  // redo the vector of counts if it's not the correct size
84  if (counts.size() != inputVecData.size()) {
85  counts.resize(inputVecData.size());
86  }
87 
88  // get counts for replication
89  int numFlattenedRows = 0;
90  for (int i = 0; i < inputVecData.size(); i++) {
91  Vector<Handle<Object>>& myVec = inputVecData[i];
92  int mySize = myVec.size();
93  // std :: cout << "mySize is" << mySize << std :: endl;
94  counts[i] = mySize;
95  numFlattenedRows += mySize;
96  }
97 
98  // replicates other columns
99  myMachine.replicate(input, output, counts, 0);
100 
101  // add new column
102  if (!output->hasColumn(outAtt)) {
103  std::vector<Handle<Object>>* outColumn = new std::vector<Handle<Object>>();
104  output->addColumn(outAtt, outColumn, true);
105  }
106 
107  // get the output column
108  std::vector<Handle<Object>>& outColumn = output->getColumn<Handle<Object>>(outAtt);
109  // loop over to set the rows
110  outColumn.resize(numFlattenedRows);
111  int overallCounter = 0;
112  size_t mySize = inputVecData.size();
113  for (int i = 0; i < mySize; i++) {
114  Vector<Handle<Object>>& myVec = inputVecData[i];
115  Handle<Object>* myRawData = myVec.c_ptr();
116  size_t myVecSize = myVec.size();
117  for (int j = 0; j < myVecSize; j++) {
118  outColumn[overallCounter] = myRawData[j];
119  overallCounter++;
120  }
121  }
122 
123  return output;
124  }
125 
126 
127  std::string getType() override {
128  return "FLATTEN";
129  }
130 };
131 }
132 
133 #endif
std::vector< std::string > & getAtts()
Definition: TupleSpec.h:60
TypeContained * c_ptr() const
Definition: PDBVector.cc:118
std::shared_ptr< ComputeInfo > ComputeInfoPtr
Definition: ComputeInfo.h:33
std::string getType() override
std::vector< int > match(TupleSpec &attsToMatch)
TupleSetPtr process(TupleSetPtr input) override
void replicate(TupleSetPtr input, TupleSetPtr output, std::vector< uint32_t > &counts, int offset)
std::vector< uint32_t > counts
std::shared_ptr< TupleSet > TupleSetPtr
Definition: TupleSet.h:64
size_t size() const
Definition: PDBVector.cc:67
TupleSetSetupMachine myMachine
FlattenExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput, ComputeInfoPtr)
FlattenExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput)