A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_SERVER_CC
20 #define PDB_SERVER_CC
21 
22 #include "BuiltInObjectTypeIDs.h"
23 #include "Handle.h"
24 #include "PDBAlarm.h"
25 #include <iostream>
26 #include <netinet/in.h>
27 #include "PDBServer.h"
28 #include "PDBWorker.h"
29 #include "ServerWork.h"
30 #include <signal.h>
31 #include <sys/socket.h>
32 #include <stdio.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <sys/un.h>
36 #include <unistd.h>
37 #include <signal.h>
38 #include "PDBCommunicator.h"
39 #include "CloseConnection.h"
40 #include "ShutDown.h"
41 #include "ServerFunctionality.h"
43 #include "SimpleRequestResult.h"
44 #include <memory>
45 
46 namespace pdb {
47 
48 // Constructor for a server passing the port and hostname as args
49 PDBServer::PDBServer(int portNumberIn, int numConnectionsIn, PDBLoggerPtr myLoggerIn) {
50 
51  // remember the communication data
52  portNumber = portNumberIn;
53  numConnections = numConnectionsIn;
54  myLogger = myLoggerIn;
55  isInternet = true;
56  allDone = false;
57  struct sigaction sa;
58  memset(&sa, '\0', sizeof(sa));
59  sa.sa_handler = SIG_IGN;
60  sigaction(SIGPIPE, &sa, 0);
61  // create the workers
62  myWorkers = make_shared<PDBWorkerQueue>(myLogger, numConnections);
63 }
64 
65 PDBServer::PDBServer(string unixFileIn, int numConnectionsIn, PDBLoggerPtr myLoggerIn) {
66 
67  // remember the communication data
68  unixFile = unixFileIn;
69  numConnections = numConnectionsIn;
70  myLogger = myLoggerIn;
71  isInternet = false;
72  allDone = false;
73  struct sigaction sa;
74  memset(&sa, '\0', sizeof(sa));
75  sa.sa_handler = SIG_IGN;
76  sigaction(SIGPIPE, &sa, 0);
77  // create the workers
78  myWorkers = make_shared<PDBWorkerQueue>(myLogger, numConnections);
79 }
80 
81 void PDBServer::registerHandler(int16_t requestID, PDBCommWorkPtr handledBy) {
82  handlers[requestID] = handledBy;
83 }
84 
85 // this is the entry point for the listener to the port
86 
87 void* callListen(void* serverInstance) {
88  PDBServer* temp = static_cast<PDBServer*>(serverInstance);
89  temp->listen();
90  return nullptr;
91 }
92 
94 
95  string errMsg;
96 
97  // two cases: first, we are connecting to the internet
98  if (isInternet) {
99 
100  // wait for an internet socket
101  sockFD = socket(AF_INET, SOCK_STREAM, 0);
102 
103  // added by Jia to avoid TimeWait state for old sockets
104  int optval = 1;
105  if (setsockopt(sockFD, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
106  myLogger->error("PDBServer: couldn't setsockopt");
107  myLogger->error(strerror(errno));
108  std::cout << "PDBServer: couldn't setsockopt:" << strerror(errno) << std::endl;
109  close(sockFD);
110  exit(0);
111  }
112 
113 
114  if (sockFD < 0) {
115  myLogger->error("PDBServer: could not get FD to internet socket");
116  myLogger->error(strerror(errno));
117  close(sockFD);
118  exit(0);
119  }
120 
121  // bind the socket FD
122  struct sockaddr_in serv_addr;
123  bzero((char*)&serv_addr, sizeof(serv_addr));
124  serv_addr.sin_family = AF_INET;
125  serv_addr.sin_addr.s_addr = INADDR_ANY;
126  serv_addr.sin_port = htons(portNumber);
127  int retVal = ::bind(sockFD, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
128  if (retVal < 0) {
129  myLogger->error("PDBServer: could not bind to internet socket");
130  myLogger->error(strerror(errno));
131  close(sockFD);
132  exit(0);
133  }
134 
135  myLogger->trace("PDBServer: about to listen to the Internet for a connection");
136 
137  // set the backlog on the socket
138  if (::listen(sockFD, 100) != 0) {
139  myLogger->error("PDBServer: listen error");
140  myLogger->error(strerror(errno));
141  close(sockFD);
142  exit(0);
143  }
144 
145  myLogger->trace("PDBServer: ready to go!");
146 
147  // wait for someone to try to connect
148  while (!allDone) {
149  PDBCommunicatorPtr myCommunicator = make_shared<PDBCommunicator>();
150  if (myCommunicator->pointToInternet(myLogger, sockFD, errMsg)) {
151  myLogger->error("PDBServer: could not point to an internet socket: " + errMsg);
152  continue;
153  }
154  myLogger->info(std::string("accepted the connection with sockFD=") +
155  std::to_string(myCommunicator->getSocketFD()));
156  PDB_COUT << "||||||||||||||||||||||||||||||||||" << std::endl;
157  PDB_COUT << "accepted the connection with sockFD=" << myCommunicator->getSocketFD()
158  << std::endl;
159  handleRequest(myCommunicator);
160  }
161 
162  // second, we are connecting to a local UNIX socket
163  } else {
164 
165  myLogger->trace("PDBServer: getting socket to file");
166  sockFD = socket(PF_UNIX, SOCK_STREAM, 0);
167 
168  if (sockFD < 0) {
169  myLogger->error("PDBServer: could not get FD to local socket");
170  myLogger->error(strerror(errno));
171  exit(0);
172  }
173 
174  // bind the socket FD
175  struct sockaddr_un serv_addr;
176  bzero((char*)&serv_addr, sizeof(serv_addr));
177  serv_addr.sun_family = AF_UNIX;
178  snprintf(serv_addr.sun_path, sizeof(serv_addr.sun_path), "%s", unixFile.c_str());
179 
180  if (::bind(sockFD, (struct sockaddr*)&serv_addr, sizeof(struct sockaddr_un))) {
181  myLogger->error("PDBServer: could not bind to local socket");
182  myLogger->error(strerror(errno));
183  // if pathToBackEndServer exists, delete it.
184  if (unlink(unixFile.c_str()) == 0) {
185  PDB_COUT << "Removed outdated " << unixFile.c_str() << ".\n";
186  }
187  if (::bind(sockFD, (struct sockaddr*)&serv_addr, sizeof(struct sockaddr_un))) {
188  myLogger->error(
189  "PDBServer: still could not bind to local socket after removing unixFile");
190  myLogger->error(strerror(errno));
191  exit(0);
192  }
193  }
194 
195  myLogger->debug("PDBServer: socket has name");
196  myLogger->debug(serv_addr.sun_path);
197 
198  myLogger->trace("PDBServer: about to listen to the file for a connection");
199 
200  // set the backlog on the socket
201  if (::listen(sockFD, 100) != 0) {
202  myLogger->error("PDBServer: listen error");
203  myLogger->error(strerror(errno));
204  exit(0);
205  }
206 
207  myLogger->trace("PDBServer: ready to go!");
208 
209  // wait for someone to try to connect
210  while (!allDone) {
211  PDBCommunicatorPtr myCommunicator;
212  myCommunicator = make_shared<PDBCommunicator>();
213  if (myCommunicator->pointToFile(myLogger, sockFD, errMsg)) {
214  myLogger->error("PDBServer: could not point to an local UNIX socket: " + errMsg);
215  continue;
216  }
217  PDB_COUT << "||||||||||||||||||||||||||||||||||" << std::endl;
218  PDB_COUT << "accepted the connection with sockFD=" << myCommunicator->getSocketFD()
219  << std::endl;
220  handleRequest(myCommunicator);
221  }
222  }
223  // let the main thread know we are done
224  allDone = true;
225 }
226 
227 // gets access to worker queue
229  return this->myWorkers;
230 }
231 
232 // gets access to logger
234  return this->myLogger;
235 }
236 
238 
239  ServerWorkPtr tempWork{make_shared<ServerWork>(*this)};
240  tempWork->setGuts(myCommunicator);
241  PDBWorkerPtr tempWorker = myWorkers->getWorker();
242  tempWorker->execute(tempWork, tempWork->getLinkedBuzzer());
243 }
244 
245 // returns true while we need to keep going... false when this connection is done
246 bool PDBServer::handleOneRequest(PDBBuzzerPtr callerBuzzer, PDBCommunicatorPtr myCommunicator) {
247 
248  // figure out what type of message the client is sending us
249  int16_t requestID = myCommunicator->getObjectTypeID();
250  string info;
251  bool success;
252 
253  // if there was a request to close the connection, just get outta here
254  if (requestID == CloseConnection_TYPEID) {
255  UseTemporaryAllocationBlock tempBlock{2048};
256  Handle<CloseConnection> closeMsg =
257  myCommunicator->getNextObject<CloseConnection>(success, info);
258  if (!success) {
259  myLogger->error("PDBServer: close connection request, but was an error: " + info);
260  } else {
261  myLogger->trace("PDBServer: close connection request");
262  }
263  return false;
264  }
265 
266  if (requestID == NoMsg_TYPEID) {
267  string err, info;
268  myLogger->trace("PDBServer: the other side closed the connection");
269  return false;
270  }
271 
272  // if we are asked to shut down...
273  if (requestID == ShutDown_TYPEID) {
274  UseTemporaryAllocationBlock tempBlock{2048};
275  Handle<ShutDown> closeMsg = myCommunicator->getNextObject<ShutDown>(success, info);
276  if (!success) {
277  myLogger->error("PDBServer: close connection request, but was an error: " + info);
278  } else {
279  myLogger->trace("PDBServer: close connection request");
280  }
281  PDB_COUT << "Cleanup server functionalities" << std::endl;
282  // for each functionality, invoke its clean() method
283  for (int i = 0; i < allFunctionalities.size(); i++) {
284  allFunctionalities.at(i)->cleanup();
285  }
286 
287 
288  // ack the result
289  std::string errMsg;
291  makeObject<SimpleRequestResult>(true, "successful shutdown of server");
292  if (!myCommunicator->sendObject(result, errMsg)) {
293  myLogger->error("PDBServer: close connection request, but count not send response: " +
294  errMsg);
295  }
296 
297  // kill the FD and let everyone know we are done
298  allDone = true;
299  // close(sockFD); //we can't simply close socket like this, because there are still incoming
300  // messages in accepted connections
301  // use reuse address option instead
302  return false;
303  }
304 
305  // and get a worker plus the appropriate work to service it
306  if (handlers.count(requestID) == 0) {
307 
308  // there is not one, so send back an appropriate message
309  myLogger->error("PDBServer: could not find an appropriate handler");
310  return false;
311 
312  // in this case, got a handler
313  } else {
314 
315  /*// get the handler
316  myLogger->writeLn("PDBServer: found an appropriate handler");
317  myLogger->writeLn("PDBServer: getting a worker...");
318 
319  //should comment out following lines to recover Chris' old code;
320  PDBCommWorkPtr tempWork = handlers[requestID]->clone();
321  myLogger->writeLn("PDBServer: setting guts");
322  tempWork->setGuts (myCommunicator);
323  tempWork->execute(callerBuzzer);*/
324 
325  // End code replacement for testing
326 
327  // Chris' old code: (Observed problem: sometimes, buzzer never get buzzed.)
328  // get a worker to run the handler (this blocks if no workers available)
329  PDBWorkerPtr tempWorker = myWorkers->getWorker();
330  myLogger->trace("PDBServer: got a worker, start to do something...");
331  myLogger->trace("PDBServer: requestID " + std::to_string(requestID));
332 
333  PDBCommWorkPtr tempWork = handlers[requestID]->clone();
334 
335  myLogger->trace("PDBServer: setting guts");
336  tempWork->setGuts(myCommunicator);
337  tempWorker->execute(tempWork, callerBuzzer);
338  callerBuzzer->wait();
339  myLogger->trace("PDBServer: handler has completed its work");
340  return true;
341  }
342 }
343 
344 void PDBServer::signal(PDBAlarm signalWithMe) {
345  myWorkers->notifyAllWorkers(signalWithMe);
346 }
347 
348 void PDBServer::startServer(PDBWorkPtr runMeAtStart) {
349 
350  // ignore broken pipe signals
351  ::signal(SIGPIPE, SIG_IGN);
352 
353  // if there was some work to execute to start things up, then do it
354  if (runMeAtStart != nullptr) {
355  PDBBuzzerPtr buzzMeWhenDone = runMeAtStart->getLinkedBuzzer();
356  PDBWorkerPtr tempWorker = myWorkers->getWorker();
357  tempWorker->execute(runMeAtStart, buzzMeWhenDone);
358  buzzMeWhenDone->wait();
359  }
360 
361  // listen to the socket
362  int return_code = pthread_create(&listenerThread, nullptr, callListen, this);
363  if (return_code) {
364  myLogger->error("ERROR; return code from pthread_create () is " + to_string(return_code));
365  exit(-1);
366  }
367 
368  // and now just sleep
369  while (!allDone) {
370  sleep(1);
371  }
372 }
373 
375  allFunctionalities[allFunctionalities.size() - 1]->recordServer(*this);
376  allFunctionalities[allFunctionalities.size() - 1]->registerHandlers(*this);
377 }
378 
380  allDone = true;
381 }
382 }
383 
384 #endif
map< int16_t, PDBCommWorkPtr > handlers
Definition: PDBServer.h:130
void startServer(PDBWorkPtr runMeAtStart)
Definition: PDBServer.cc:348
shared_ptr< PDBWork > PDBWorkPtr
Definition: PDBWork.h:47
pthread_t listenerThread
Definition: PDBServer.h:157
shared_ptr< ServerWork > ServerWorkPtr
Definition: ServerWork.h:36
PDBLoggerPtr getLogger()
Definition: PDBServer.cc:233
PDBWorkerQueuePtr getWorkerQueue()
Definition: PDBServer.cc:228
void signal(PDBAlarm signalWithMe)
Definition: PDBServer.cc:344
PDBLoggerPtr myLogger
Definition: PDBServer.h:151
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void * callListen(void *serverInstance)
Definition: PDBServer.cc:87
bool handleOneRequest(PDBBuzzerPtr buzzMeWhenDone, PDBCommunicatorPtr myCommunicator)
Definition: PDBServer.cc:246
void listen()
Definition: PDBServer.cc:93
int numConnections
Definition: PDBServer.h:145
PDBServer(int portNumberIn, int numConnections, PDBLoggerPtr myLogger)
Definition: PDBServer.cc:49
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
void handleRequest(PDBCommunicatorPtr myCommunicator)
Definition: PDBServer.cc:237
std::vector< shared_ptr< ServerFunctionality > > allFunctionalities
Definition: PDBServer.h:166
PDBAlarm
Definition: PDBAlarm.h:28
std::shared_ptr< PDBCommWork > PDBCommWorkPtr
Definition: PDBCommWork.h:37
string unixFile
Definition: PDBServer.h:148
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
PDBWorkerQueuePtr myWorkers
Definition: PDBServer.h:133
void registerHandlersFromLastFunctionality()
Definition: PDBServer.cc:374