A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PageCircularBuffer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #ifndef PAGE_CIRCULAR_BUFFER_CC
21 #define PAGE_CIRCULAR_BUFFER_CC
22 
23 #include "PDBDebug.h"
24 #include "PageCircularBuffer.h"
25 #include <string>
26 #include <iostream>
27 #include <pthread.h>
28 #include <sched.h>
29 
31  this->maxArraySize = bufferSize + 1;
32  this->logger = logger;
33  this->closed = false;
34  this->initArray();
35  pthread_mutex_init(&(this->mutex), NULL);
36  pthread_mutex_init(&(this->addPageMutex), NULL);
37  pthread_cond_init(&(this->cond), NULL);
38 }
39 
41  // the buffer is not responsible for freeing the elements in the buffer
42  delete[] this->pageArray;
43  pthread_mutex_destroy(&(this->mutex));
44  pthread_cond_destroy(&(this->cond));
45 }
46 
48  this->pageArray = new PDBPagePtr[this->maxArraySize];
49  if (this->pageArray == nullptr) {
50  cout << "PageCircularBuffer: Out of Memory in Heap.\n";
51  this->logger->writeLn("PageCircularBuffer: Out of Memory in Heap.");
52  return -1;
53  }
54  unsigned int i;
55  for (i = 0; i < this->maxArraySize; i++) {
56 
57  this->pageArray[i] = nullptr;
58  }
59  this->pageArrayHead = 0;
60  this->pageArrayTail = 0;
61  return 0;
62 }
63 
64 // in our case, more than one producer will add pages to the tail of the blocking queue
66  pthread_mutex_lock(&(this->addPageMutex));
67  int i = 0;
68  while (this->isFull()) {
69  i++;
70  if (i % 10000000 == 0) {
71  this->logger->info(std::to_string(i) +
72  std::string(":PageCircularBuffer: array is full."));
73  }
74  pthread_cond_signal(&(this->cond));
75  sched_yield(); // TODO: consider to use another conditional variable
76  }
77 
78  this->logger->writeLn("PageCircularBuffer:got a place.");
79  this->pageArrayTail = (this->pageArrayTail + 1) % this->maxArraySize;
80  this->pageArray[this->pageArrayTail] = page;
81  pthread_mutex_unlock(&(this->addPageMutex));
82  pthread_mutex_lock(&(this->mutex));
83  if (this->getSize() <= 2) { // TODO <= numThreads? or not necessary
84  pthread_cond_broadcast(&(this->cond));
85  } else {
86  pthread_cond_signal(&(this->cond));
87  }
88  pthread_mutex_unlock(&(this->mutex));
89  return 0;
90 }
91 
92 // there will be multiple consumers, so we need to guard the blocking queue
93 
95  pthread_mutex_lock(&(this->mutex));
96  if (this->isEmpty() && (this->closed == false)) {
97  this->logger->writeLn("PageCircularBuffer: array is empty.");
98  pthread_cond_wait(&(this->cond), &(this->mutex));
99  }
100  if (!this->isEmpty()) {
101  this->logger->debug("PageCircularBuffer: not empty, return the head element");
102  this->pageArrayHead = (this->pageArrayHead + 1) % this->maxArraySize;
103  PDBPagePtr ret = this->pageArray[this->pageArrayHead];
104  this->pageArray[this->pageArrayHead] = nullptr;
105  pthread_mutex_unlock(&(this->mutex));
106  return ret;
107  } else {
108  pthread_mutex_unlock(&(this->mutex));
109  return nullptr;
110  }
111 }
112 // not thread-safe
113 
115  return (this->pageArrayHead == (this->pageArrayTail + 1) % this->maxArraySize);
116 }
117 // not thread-safe
118 
120  PDB_COUT << "this->pageArrayHead=" << this->pageArrayHead << std::endl;
121  this->logger->debug(std::string("this->pageArrayHead=") + std::to_string(this->pageArrayHead));
122  this->logger->debug(std::string("this->pageArrayTail=") + std::to_string(this->pageArrayTail));
123  PDB_COUT << "this->pageArrayTail=" << this->pageArrayTail << std::endl;
124  return (this->pageArrayHead == this->pageArrayTail);
125 }
126 // not thread-safe
127 
129  return (this->pageArrayTail - this->pageArrayHead + this->maxArraySize) % this->maxArraySize;
130 }
131 
133  pthread_mutex_lock(&(this->mutex));
134  this->closed = true;
135  pthread_cond_broadcast(&(this->cond));
136  pthread_mutex_unlock(&(this->mutex));
137 }
138 
139 
141  this->closed = false;
142 }
143 #endif
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
pthread_mutex_t addPageMutex
unsigned int pageArrayTail
unsigned int pageArrayHead
pthread_mutex_t mutex
int addPageToTail(PDBPagePtr page)
#define PDB_COUT
Definition: PDBDebug.h:31
PDBPagePtr popPageFromHead()
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
pdb::PDBLoggerPtr logger
PageCircularBuffer(unsigned int bufferSize, pdb::PDBLoggerPtr logger)