28 #ifndef MIN_BATCH_SIZE
29 #define MIN_BATCH_SIZE 10
105 dataSource(dataSource),
121 std::vector<MemoryHolderPtr> reverser;
127 while (reverser.size() > 0) {
137 <<
"This is bad: in destructor for pipeline, still some pages with objects!!\n";
146 PDB_COUT <<
"to clean page for iteration-" << iteration << std::endl;
161 std::cout <<
"This is Strange... how did I find a page with objects??\n";
170 PDB_COUT <<
"to empty out containing block" << std::endl;
176 if (iteration == 999999999)
194 if (myRAM->location ==
nullptr) {
195 std::cout <<
"ERROR: insufficient memory in heap" << std::endl;
198 if (myRAM->outputSink ==
nullptr) {
199 myRAM->outputSink =
dataSink->createNewOutputContainer();
216 myRAM->setIteration(iteration);
218 myRAM = std::make_shared<MemoryHolder>(
getNewPage());
219 if (myRAM->location ==
nullptr) {
220 std::cout <<
"ERROR: insufficient memory in heap" << std::endl;
223 if (myRAM->outputSink ==
nullptr) {
224 myRAM->outputSink =
dataSink->createNewOutputContainer();
231 std::cout <<
"batch size tuned to be " <<
MIN_BATCH_SIZE << std::endl;
236 std::cout <<
"batch size tuned to be 1" << std::endl;
241 std::cout <<
"batch size tuned to be " <<
MIN_BATCH_SIZE << std::endl;
247 if (curChunk ==
nullptr) {
254 curChunk = q->process(curChunk);
258 myRAM->setIteration(iteration);
260 myRAM = std::make_shared<MemoryHolder>(
getNewPage());
261 if (myRAM->location ==
nullptr) {
262 std::cout <<
"ERROR: insufficient memory in heap" << std::endl;
265 if (myRAM->outputSink ==
nullptr) {
266 myRAM->outputSink =
dataSink->createNewOutputContainer();
270 curChunk = q->process(curChunk);
272 std::cout <<
"Pipeline Error: Batch processing memory exceeds page size "
273 "for executor type: "
274 << q->getType() <<
", consider to reduce batch size" << std::endl;
282 if (myRAM->outputSink ==
nullptr) {
283 myRAM->outputSink =
dataSink->createNewOutputContainer();
285 dataSink->writeOut(curChunk, myRAM->outputSink);
291 std::cout <<
"pipeline runs out of RAM" << std::endl;
292 myRAM->setIteration(iteration);
294 myRAM = std::make_shared<MemoryHolder>(
getNewPage());
297 myRAM->outputSink =
dataSink->createNewOutputContainer();
298 dataSink->writeOut(curChunk, myRAM->outputSink);
307 myRAM->setIteration(iteration);
std::function< void(void *)> writeBackPage
std::function< void(void *)> discardPage
std::shared_ptr< ComputeSource > ComputeSourcePtr
std::shared_ptr< MemoryHolder > MemoryHolderPtr
std::function< std::pair< void *, size_t >)> getNewPage
std::queue< MemoryHolderPtr > unwrittenPages
std::shared_ptr< Pipeline > PipelinePtr
ComputeSourcePtr dataSource
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
Pipeline(std::function< std::pair< void *, size_t >()> getNewPage, std::function< void(void *)> discardPage, std::function< void(void *)> writeBackPage, ComputeSourcePtr dataSource, ComputeSinkPtr tupleSink)
std::shared_ptr< ComputeSink > ComputeSinkPtr
void cleanPages(int iteration)
std::vector< ComputeExecutorPtr > pipeline
std::shared_ptr< TupleSet > TupleSetPtr
unsigned getNumObjectsInAllocatorBlock(void *forMe)
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
Handle< Object > outputSink
void addStage(ComputeExecutorPtr addMe)
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
void emptyOutContainingBlock(void *forMe)
void setIteration(int iterationIn)
MemoryHolder(std::pair< void *, size_t > buildMe)