A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBWorkerQueue.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_WORKER_Q_C
20 #define PDB_WORKER_Q_C
21 
22 #include <iostream>
23 #include <climits>
24 #include "LockGuard.h"
25 #include "NothingWork.h"
26 #include <limits.h>
27 #include "PDBWorkerQueue.h"
28 
29 namespace pdb {
30 
31 extern void* stackBase;
32 extern void* stackEnd;
33 
// Builds the worker queue: initializes the two mutexes and the condition variable, carves one
// 4MB region per worker out of a single malloc'ed buffer, and starts numWorkers threads through
// addAnotherWorker (). Calls exit (1) if the global stackBase is already non-null, which is how a
// second queue in the same process is detected.
//   myLoggerIn: logger that is stored in myLogger
//   numWorkers: number of worker threads (and 4MB regions) to create
34 PDBWorkerQueue::PDBWorkerQueue(PDBLoggerPtr myLoggerIn, int numWorkers) {
35 
36  // first, make sure that another worker queue does not exist
// (this first check only prints a warning; the identical check further down is the one that exits)
37  if (stackBase != nullptr) {
38  std::cout << "I have detected that you have started two PDBWorkerQueue objects in this "
39  "process.\n";
40  std::cout << "This is a really bad idea. It probably will result in a crash at some "
41  "point, and\n";
42  std::cout << "regardless of that, the idea is to have one queue that everyone draws "
43  "workers from.\n";
44  }
45 
46  pthread_mutex_init(&waitingMutex, nullptr);
47  pthread_mutex_init(&workingMutex, nullptr);
48  pthread_cond_init(&waitingSignal, nullptr);
49  shuttingDown = false;
50  numOut = 0;
51 
// a non-null stackBase means a queue has already set up its stack area: give up
52  if (stackBase != nullptr) {
53  std::cout
54  << "This is bad... it appears that you are creating more than one worker queue!\n";
55  exit(1);
56  }
57 
58  // this is the location where each worker is going to have their stack
// one 4MB chunk per worker, plus one spare chunk that absorbs the alignment adjustment below
// NOTE(review): the size is computed in int arithmetic, so with a 32-bit int it overflows once
// numWorkers + 1 reaches 512; the malloc result is also used without a null check
59  origStackBase = malloc(1024 * 1024 * 4 * (numWorkers + 1));
60 
61  // align it to 2^22... chop off the last 22 bits, and then add 2^22 to the address
62  stackBase = (void*)(((((size_t)origStackBase) >> 22) << 22) + (1024 * 1024 * 4));
63  stackEnd = ((char*)stackBase) + 1024 * 1024 * 4 * numWorkers;
64 
65  // create each worker...
66  for (int i = 0; i < numWorkers; i++) {
67  // put an allocator at the base of the stack for this worker... give him 64MB of RAM to work
68  // with
// NOTE(review): the statement that completes this placement-new expression is not present in
// this listing (numbered line 70 is missing); presumably it constructs an Allocator, given the
// sizeof(Allocator) offset used below - confirm against the real source file
69  new (i * 1024 * 1024 * 4 + ((char*)stackBase))
71 
72  // now create the worker
// the thread is given the rest of this worker's 4MB region: from just past the Allocator-sized
// prefix up to the start of the next region
73  addAnotherWorker(i * 1024 * 1024 * 4 + sizeof(Allocator) + ((char*)stackBase),
74  (i + 1) * 1024 * 1024 * 4 + ((char*)stackBase));
75  }
76  myLogger = myLoggerIn;
77 }
78 
// NOTE(review): the signature line of this function is missing from this listing (presumably
// PDBWorkerQueue::getLogger () - confirm). It returns the myLogger member.
80  return myLogger;
81 }
82 
84 
// NOTE(review): the signature line of this body is missing from this listing; judging by the
// teardown it performs this is presumably PDBWorkerQueue::~PDBWorkerQueue () - confirm.
// It flags shutdown, hands every remaining worker a NothingWork item, joins all threads, then
// destroys the mutexes/condition variable and frees the buffer that holds the worker stacks.
85  // let everyone know we are shutting down
86  shuttingDown = true;
87 
88  // keep getting threads and giving them nothing to do... once they complete
89  // the nothing work, they will die since shuttingDown = true. This goes
90  // until we can't get any threads because there are no more of them outstanding
91  while (true) {
92 
93  // basically, we repeatedly ask for workers; this call will return
94  // when either (a) there is a worker available, or (b) there are
95  // no workers available but there are no workers in existence
96  PDBWorkerPtr myWorker = getWorker();
97  if (myWorker == nullptr)
98  break;
99 
100  PDBWorkPtr nothing{make_shared<NothingWork>()};
101  myWorker->execute(nothing, nullptr);
102  }
103 
104  // join with all of the threads
105  for (pthread_t thread : threads) {
106  if (pthread_join(thread, nullptr) != 0) {
107  cout << "Error joining with thread as the worker queue is shutting down!!\n";
108  exit(-1);
109  }
110  }
111 
112  // kill all of the mutexes/signal vars
113  pthread_mutex_destroy(&waitingMutex);
114  pthread_mutex_destroy(&workingMutex);
115  pthread_cond_destroy(&waitingSignal);
116 
117  // kill the allocators and destroy the stack space
// one explicit destructor call per 4MB region, mirroring the placement-new in the constructor
// NOTE(review): i is a signed int compared against the unsigned threads.size ()
118  for (int i = 0; i < threads.size(); i++) {
119  ((Allocator*)(i * 1024 * 1024 * 4 + ((char*)stackBase)))->~Allocator();
120  }
// NOTE(review): stackBase is not reset to nullptr in the lines visible here, so the constructor's
// stackBase check would still fire for a queue created afterwards - confirm this is intended
121  free(origStackBase);
122 }
123 
125 
// NOTE(review): the signature line of this body is missing from this listing (presumably
// PDBWorkerPtr PDBWorkerQueue::getWorker () - confirm).
// Blocks until a worker is waiting or until numOut has dropped to zero. Returns nullptr when
// numOut is zero; otherwise removes the most recently queued waiting worker, records it in
// the working set, and returns it.
126  PDBWorkerPtr myWorker;
127 
128  {
129  // make sure there is a worker
130  const LockGuard guard{waitingMutex};
// the loop re-checks its condition after every wakeup of waitingSignal
131  while (waiting.size() == 0 && numOut != 0) {
132  pthread_cond_wait(&waitingSignal, &waitingMutex);
133  }
134 
135  // special case: there are no workers... just return a null
136  if (numOut == 0) {
137  return nullptr;
138  }
139 
140  // get the worker
141  myWorker = waiting.back();
142  waiting.pop_back();
143  }
144 
145  {
146  // remember that he is working
// (done under workingMutex, after the waitingMutex scope above has ended)
147  const LockGuard guard{workingMutex};
148  working.insert(myWorker);
149  }
150 
151  return myWorker;
152 }
153 
154 // this is the entry point for all of the worker threads
155 
156 void* enterTheQueue(void* pdbWorkerQueueInstance) {
157  PDBWorkerQueue* temp = static_cast<PDBWorkerQueue*>(pdbWorkerQueueInstance);
158  temp->enter();
159  return nullptr;
160 }
161 
162 void PDBWorkerQueue::addAnotherWorker(void* stackBaseIn, void* stackEndIn) {
163 
164  pthread_t thread;
165  threads.push_back(thread);
166 
167  // adjust the stack base to align it correctly
168  stackBaseIn = (void*)((((long)stackBaseIn + (PTHREAD_STACK_MIN - 1)) / PTHREAD_STACK_MIN) *
169  PTHREAD_STACK_MIN);
170 
171  // create a thread with an allocator located at the base of its stack
172  pthread_attr_t tattr;
173  pthread_attr_init(&tattr);
174  // pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
175  pthread_attr_setstack(&tattr, stackBaseIn, ((char*)stackEndIn) - (char*)stackBaseIn);
176 
177  int return_code = pthread_create(&(threads[threads.size() - 1]), &tattr, enterTheQueue, this);
178  if (return_code) {
179  cout << "ERROR; return code from pthread_create () is " << return_code << '\n';
180  exit(-1);
181  } else {
182  numOut++;
183  }
184 }
185 
// NOTE(review): the signature line of this body is missing from this listing (presumably
// void PDBWorkerQueue::notifyAllWorkers (PDBAlarm withMe) - confirm).
// Calls soundBuzzer (withMe) on every worker currently in the working set; workingMutex is held
// for the whole loop.
187  const LockGuard guard{workingMutex};
188 
189  // loop through all of the current workers
190  for (PDBWorkerPtr p : working) {
191 
192  // sound the buzzer...
193  p->soundBuzzer(withMe);
194  }
195 }
196 
198 
// NOTE(review): the signature line of this body is missing from this listing (presumably
// void PDBWorkerQueue::enter () - confirm). enterTheQueue () calls enter () on the queue from
// each worker thread.
// Creates one PDBWorker for the calling thread and, until shuttingDown is set, repeatedly
// queues it as waiting, runs it, and removes it from the working set. On exit it decrements
// numOut and wakes all waiters when that count reaches zero.
199  // create the worker
200  PDBWorkerPtr temp{make_shared<PDBWorker>(this)};
201 
202  // then work until (a) someone told us that we need to die, or
203  // (b) we are trying to shut down the worker queue
204  while (!shuttingDown) {
205 
206  // put the worker on the end of the queue
207  {
208  const LockGuard guard{waitingMutex};
209  waiting.push_back(temp);
210  pthread_cond_signal(&waitingSignal);
211  }
212 
213  // and enter the work
214  temp->enter();
215 
216  // when we have exited the work, it means that we are done, so we are no longer working
217  {
218  const LockGuard guard{workingMutex};
// getWorker () inserted this worker into the working set, so exactly one entry is expected
219  if (working.erase(temp) != 1) {
220  cout << "Why did I find != 1 copies of the worker?";
221  exit(-1);
222  }
223  }
224 
225  // make sure to kill all references to stuff that we want to be able to deallocate
226  // or else we will block and that stuff will sit around forever
227  temp->reset();
228  }
229 
230  // if we exited the loop, then this particular worker
231  // needs to die... so decrement the count of outstanding
232  // workers and let everyone know if the number is down to zero
233  {
234  // decrement the count of outstanding workers
235  const LockGuard guard{waitingMutex};
236  numOut--;
237 
238  // if there are no outstanding workers, everyone is woken up and gets a null
239  if (numOut == 0)
240  pthread_cond_broadcast(&waitingSignal);
241  }
242 }
243 }
244 
245 #endif
void * enterTheQueue(void *pdbWorkerQueueInstance)
PDBWorkerPtr getWorker()
void notifyAllWorkers(PDBAlarm withMe)
PDBLoggerPtr myLogger
void * stackBase
MultiPolicyAllocator< DefaultPolicy, NoReusePolicy, NoReferenceCountPolicy > Allocator
Definition: Allocator.h:522
static const size_t defaultAllocatorBlockSize
void * stackEnd
vector< PDBWorkerPtr > waiting
set< PDBWorkerPtr > working
shared_ptr< PDBWork > PDBWorkPtr
Definition: PDBWork.h:47
pthread_mutex_t waitingMutex
void addAnotherWorker(void *stackStart, void *stackEnd)
pthread_mutex_t workingMutex
PDBWorkerQueue(PDBLoggerPtr myLogger, int numWorkers)
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
PDBAlarm
Definition: PDBAlarm.h:28
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
pthread_cond_t waitingSignal
vector< pthread_t > threads
PDBLoggerPtr getLogger()