A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. This page documents the API for v0.5.0, released on June 13, 2018.
Pipeline.h
/*****************************************************************************
 *
 *  Copyright 2018 Rice University
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *****************************************************************************/

#ifndef PIPELINE_H
#define PIPELINE_H

#include "ComputeSource.h"
#include "ComputeSink.h"
#include "ComputeExecutor.h"
#include "Handle.h"
#include <queue>

#ifndef MIN_BATCH_SIZE
#define MIN_BATCH_SIZE 10
#endif

namespace pdb {

// this is used to buffer unwritten pages
struct MemoryHolder {

    // the output vector that this holder stores
    Handle<Object> outputSink;

    // its memory
    void* location;

    // the iteration in which it was last written...
    // we track this because we cannot delete the page right away
    int iteration;

    void setIteration(int iterationIn) {
        if (outputSink != nullptr)
            getRecord(outputSink);
        iteration = iterationIn;
    }

    MemoryHolder(std::pair<void*, size_t> buildMe) {
        location = buildMe.first;
        makeObjectAllocatorBlock(location, buildMe.second, true);
        outputSink = nullptr;
    }
};

typedef std::shared_ptr<MemoryHolder> MemoryHolderPtr;

// this is a prototype for the pipeline
class Pipeline {

private:
    // this is a function that the pipeline calls to obtain a new page to
    // write output to. The function returns a pair. The first item in
    // the pair is the page, the second is the number of bytes in the page
    std::function<std::pair<void*, size_t>()> getNewPage;

    // this is a function that the pipeline calls to write back (and free) a page;
    // the argument is the page to write back
    std::function<void(void*)> writeBackPage;

    // this is a function that the pipeline calls to free a page without
    // writing it back (because it has no useful data)
    std::function<void(void*)> discardPage;

    // this is the source of data in the pipeline
    ComputeSourcePtr dataSource;

    // this is where the pipeline goes to write the data
    ComputeSinkPtr dataSink;

    // here is our pipeline
    std::vector<ComputeExecutorPtr> pipeline;

    // and here are all of the pages we've not yet written back
    std::queue<MemoryHolderPtr> unwrittenPages;

public:
    // the first argument is a function to call that gets a new output page,
    // the second is a function that discards a page without writing it back,
    // the third is a function that writes back (and frees) a full output page,
    // the fourth is the source that creates the TupleSets to process, and the
    // fifth is the sink that the output is written to
    // (a brief usage sketch follows this listing)
    Pipeline(std::function<std::pair<void*, size_t>()> getNewPage,
             std::function<void(void*)> discardPage,
             std::function<void(void*)> writeBackPage,
             ComputeSourcePtr dataSource,
             ComputeSinkPtr tupleSink)
        : getNewPage(getNewPage),
          discardPage(discardPage),
          writeBackPage(writeBackPage),
          dataSource(dataSource),
          dataSink(tupleSink) {}

    // adds a stage to the pipeline
    void addStage(ComputeExecutorPtr addMe) {
        pipeline.push_back(addMe);
    }

    ~Pipeline() {

        // kill all of the pipeline stages
        while (pipeline.size())
            pipeline.pop_back();

        // first, reverse the queue so that the newest page is at the front;
        // this ensures that everything is deleted in the reverse order that it was created
        std::vector<MemoryHolderPtr> reverser;
        while (unwrittenPages.size() > 0) {
            reverser.push_back(unwrittenPages.front());
            unwrittenPages.pop();
        }

        while (reverser.size() > 0) {
            unwrittenPages.push(reverser.back());
            reverser.pop_back();
        }

        // write back all of the pages
        cleanPages(999999999);

        if (unwrittenPages.size() != 0)
            std::cout
                << "This is bad: in destructor for pipeline, still some pages with objects!!\n";
    }

    // writes back any unwritten pages
    void cleanPages(int iteration) {

        // take care of getting rid of any pages... but only get rid of those from two
        // iterations ago, since pages from the last iteration may still have pointers into them
        PDB_COUT << "to clean page for iteration-" << iteration << std::endl;
        PDB_COUT << "unwrittenPages.size() = " << unwrittenPages.size() << std::endl;

        while (unwrittenPages.size() > 0 && iteration > unwrittenPages.front()->iteration + 1) {
            PDB_COUT << "unwrittenPages.front()->iteration = " << unwrittenPages.front()->iteration
                     << std::endl;
            // in this case, the page did not have any output data written to it... it only had
            // intermediate results, and so we will just discard it
            if (unwrittenPages.front()->outputSink == nullptr) {
                if (getNumObjectsInAllocatorBlock(unwrittenPages.front()->location) != 0) {

                    // this is bad... there should not be any objects here because this memory
                    // chunk does not store an output vector
                    emptyOutContainingBlock(unwrittenPages.front()->location);

                    std::cout << "This is strange... how did I find a page with objects??\n";
                }

                discardPage(unwrittenPages.front()->location);
                unwrittenPages.pop();

            // in this case, the page DID have some data written to it
            } else {
                // force the reference count for this page's output to go to zero
                PDB_COUT << "to empty out containing block" << std::endl;
                unwrittenPages.front()->outputSink.emptyOutContainingBlock();

                // because we will have invalidated the current object allocator block, we
                // need to create a new one, or this could cause a lot of problems!!
                if (iteration == 999999999)
                    makeObjectAllocatorBlock(1024, true);

                // make sure the page is written back
                writeBackPage(unwrittenPages.front()->location);

                // and get rid of it
                unwrittenPages.pop();
            }
        }
    }

    // runs the pipeline
    void run() {
        // this is where we are outputting all of our results to
        MemoryHolderPtr myRAM = std::make_shared<MemoryHolder>(getNewPage());
        // Jia Note: it is not ideal to always create a container in every new page, but
        // doing this avoids a memory copy
        if (myRAM->location == nullptr) {
            std::cout << "ERROR: insufficient memory in heap" << std::endl;
            return;
        }
        if (myRAM->outputSink == nullptr) {
            myRAM->outputSink = dataSink->createNewOutputContainer();
        }

        // and here is the chunk
        TupleSetPtr curChunk;

        // the iteration counter
        int iteration = 0;

        // while there is still data
        // Jia Note: dataSource->getNextTupleSet() can throw an exception for certain data
        // sources like MapTupleSetIterator
        while (true) {

            try {
                curChunk = dataSource->getNextTupleSet();
            } catch (NotEnoughSpace& n) {
                myRAM->setIteration(iteration);
                unwrittenPages.push(myRAM);
                myRAM = std::make_shared<MemoryHolder>(getNewPage());
                if (myRAM->location == nullptr) {
                    std::cout << "ERROR: insufficient memory in heap" << std::endl;
                    return;
                }
                if (myRAM->outputSink == nullptr) {
                    myRAM->outputSink = dataSink->createNewOutputContainer();
                }
                // then try again
                try {
                    curChunk = dataSource->getNextTupleSet();
                } catch (NotEnoughSpace& n) {
                    // still not enough space, so shrink the batch size and retry
                    std::cout << "batch size tuned to be " << MIN_BATCH_SIZE << std::endl;
                    dataSource->setChunkSize(MIN_BATCH_SIZE);
                    try {
                        curChunk = dataSource->getNextTupleSet();
                    } catch (NotEnoughSpace& n) {
                        std::cout << "batch size tuned to be 1" << std::endl;
                        dataSource->setChunkSize(1);
                        try {
                            curChunk = dataSource->getNextTupleSet();
                        } catch (NotEnoughSpace& n) {
                            std::cout << "failed to get the next TupleSet even with batch size 1"
                                      << std::endl;
                            return;
                        }
                    }
                }
            }
            if (curChunk == nullptr) {
                break;
            }
            // go through all of the pipeline stages
            for (ComputeExecutorPtr& q : pipeline) {

                try {
                    curChunk = q->process(curChunk);

                } catch (NotEnoughSpace& n) {
                    // retire the current page and get a new one
                    myRAM->setIteration(iteration);
                    unwrittenPages.push(myRAM);
                    myRAM = std::make_shared<MemoryHolder>(getNewPage());
                    if (myRAM->location == nullptr) {
                        std::cout << "ERROR: insufficient memory in heap" << std::endl;
                        return;
                    }
                    if (myRAM->outputSink == nullptr) {
                        myRAM->outputSink = dataSink->createNewOutputContainer();
                    }
                    // then try again
                    try {
                        curChunk = q->process(curChunk);
                    } catch (NotEnoughSpace& n) {
                        std::cout << "Pipeline Error: Batch processing memory exceeds page size "
                                     "for executor type: "
                                  << q->getType() << ", consider reducing the batch size"
                                  << std::endl;
                        return;
                    }
                }
            }

            try {

                if (myRAM->outputSink == nullptr) {
                    myRAM->outputSink = dataSink->createNewOutputContainer();
                }
                dataSink->writeOut(curChunk, myRAM->outputSink);

            } catch (NotEnoughSpace& n) {

                // again, we ran out of RAM here, so retire the page and then create a new
                // output page
                std::cout << "pipeline runs out of RAM" << std::endl;
                myRAM->setIteration(iteration);
                unwrittenPages.push(myRAM);
                myRAM = std::make_shared<MemoryHolder>(getNewPage());

                // and again, try to write back the output
                myRAM->outputSink = dataSink->createNewOutputContainer();
                dataSink->writeOut(curChunk, myRAM->outputSink);
            }

            // lastly, write back any output pages that are old enough
            iteration++;
            cleanPages(iteration);
        }

        // set the iteration
        myRAM->setIteration(iteration);

        // and remember the page
        unwrittenPages.push(myRAM);
    }
};

typedef std::shared_ptr<Pipeline> PipelinePtr;
}  // namespace pdb

#endif
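
Example usage (sketch)

The following minimal example is not part of Pipeline.h. The malloc-backed pages, the 64 KB page size, and the runExamplePipeline wrapper are illustrative assumptions only; the ComputeSourcePtr, ComputeSinkPtr, and ComputeExecutorPtr arguments are assumed to come from elsewhere in the system.

#include "Pipeline.h"

#include <cstdlib>
#include <utility>

// Sketch only: page management here is plain malloc/free.
void runExamplePipeline(pdb::ComputeSourcePtr mySource,
                        pdb::ComputeSinkPtr mySink,
                        pdb::ComputeExecutorPtr myStage) {

    const size_t pageSize = 64 * 1024;

    pdb::Pipeline myPipeline(
        // getNewPage: hand out a fresh output page and its size
        [pageSize]() { return std::make_pair(malloc(pageSize), pageSize); },
        // discardPage: free a page that holds no useful output
        [](void* page) { free(page); },
        // writeBackPage: persist a full output page, then free it
        [](void* page) { /* write the page out here */ free(page); },
        mySource,
        mySink);

    // add one or more processing stages, then run; any pages that are still
    // unwritten when the pipeline is destroyed are flushed by its destructor
    myPipeline.addStage(myStage);
    myPipeline.run();
}

In a real deployment the three page-management callbacks would typically hand out buffer-manager pages and persist full pages to storage rather than calling malloc and free.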