A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
FilterBlockQueryProcessor.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef FILTER_BLOCK_QUERY_PROCESSOR_CC
20 #define FILTER_BLOCK_QUERY_PROCESSOR_CC
21 #include "PDBDebug.h"
22 #include "InterfaceFunctions.h"
24 #include "SharedEmployee.h"
25 
26 namespace pdb {
27 
28 template <class Output, class Input>
30  // std :: cout << "running FilterBlockQueryProcessor destructor" << std :: endl;
31  this->inputBlock = nullptr;
32  this->outputBlock = nullptr;
33  this->context = nullptr;
34  this->inputObject = nullptr;
35 }
36 
37 
38 template <class Output, class Input>
40  Selection<Output, Input>& forMe) {
41 
42  // get a copy of the lambdas for query processing
43  filterPred = forMe.getProjectionSelection(inputObject);
44  finalized = false;
45 }
46 
47 template <class Output, class Input>
49 
50  // get a copy of the lambdas for filter processing
51  this->filterPred = filterPred;
52  finalized = false;
53 }
54 
55 // no need to do anything
56 template <class Output, class Input>
58  filterFunc = filterPred.getFunc();
59  finalized = false;
60 }
61 
62 // loads up another input page to process
63 template <class Output, class Input>
65  this->inputBlock = inputBlock;
66  this->batchSize = this->inputBlock->getBlock().size();
67  posInInput = 0;
68 }
69 
70 // load up another output page to process
71 template <class Output, class Input>
73 
74  // and here's where we write the ouput to
75  this->outputBlock = makeObject<GenericBlock>();
76  return this->outputBlock;
77 }
78 
79 template <class Output, class Input>
81  // std :: cout << "Filter processor is running" << std :: endl;
82  Vector<Handle<Output>>& myInVec = (inputBlock->getBlock());
83  Vector<Handle<Output>>& myOutVec = (outputBlock->getBlock());
84 
85  // if we are finalized, see if there are some left over records
86  if (finalized) {
87  return false;
88  }
89 
90  // int totalObjects = 0;
91  // we are not finalized, so process the page
92  try {
93  int vecSize = myInVec.size();
94  // std :: cout << "filterProcessor: posInInput=" << posInInput << ",input object num =" <<
95  // vecSize << std :: endl;
96  for (; posInInput < vecSize; posInInput++) {
97  inputObject = myInVec[posInInput];
98  // std :: cout << "posInInput=" << posInInput << std :: endl;
99  if (filterFunc()) {
100  // std :: cout << "to push back posInInput=" << posInInput << std :: endl;
101  myOutVec.push_back(inputObject);
102  // totalObjects ++;
103  // std :: cout << "push back posInInput=" << posInInput << std :: endl;
104  }
105  }
106  // std :: cout << "Filter processor processed an input block with "<< totalObjects << "
107  // output objects" << std :: endl;
108  return false;
109 
110  } catch (NotEnoughSpace& n) {
111  // std :: cout << "Filter processor consumed current page with "<< totalObjects << "
112  // objects" << std :: endl;
113  // std :: cout << "posInInput =" << posInInput << std :: endl;
114  if (this->context != nullptr) {
115  // getRecord (context->outputVec);
116  context->setOutputFull(true);
117  }
118  return true;
119  }
120 }
121 
122 // must be called repeately after all of the input pages have been sent in...
123 template <class Output, class Input>
125  finalized = true;
126 }
127 
128 // must be called before freeing the memory in output page
129 template <class Output, class Input>
131  outputBlock = nullptr;
132 }
133 
134 // must be called before freeing the memory in input page
135 template <class Output, class Input>
137  inputBlock = nullptr;
138  inputObject = nullptr;
139 }
140 }
141 
142 #endif
void loadInputBlock(Handle< GenericBlock > block) override
Handle< GenericBlock > & loadOutputBlock() override
size_t size() const
Definition: PDBVector.cc:67
void push_back(const TypeContained &val)
Definition: PDBVector.cc:95
FilterBlockQueryProcessor(Selection< Output, Input > &forMe)