A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBCommunicator.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #ifndef PDB_COMMUN_C
21 #define PDB_COMMUN_C
22 
23 #include "PDBDebug.h"
24 #include "BuiltInObjectTypeIDs.h"
25 #include "Handle.h"
26 #include <iostream>
27 #include <netdb.h>
28 #include <netinet/in.h>
29 #include "Object.h"
30 #include "PDBVector.h"
31 #include "CloseConnection.h"
33 #include "InterfaceFunctions.h"
34 #include "PDBCommunicator.h"
35 #include <stdio.h>
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <arpa/inet.h>
39 #include <sys/types.h>
40 #include <sys/uio.h>
41 #include <sys/un.h>
42 #include <netinet/in.h>
43 #include <unistd.h>
44 
45 
46 #define MAX_RETRIES 5
47 
48 
49 namespace pdb {
50 
52  readCurMsgSize = false;
53  socketFD = -1;
54  nextTypeID = NoMsg_TYPEID;
55  socketClosed = true;
56  // Jia: moved this logic from Chris' message-based communication framework to here
58  longConnection = false;
59 }
60 
61 bool PDBCommunicator::pointToInternet(PDBLoggerPtr logToMeIn, int socketFDIn, std::string& errMsg) {
62 
63  // first, connect to the backend
64  logToMe = logToMeIn;
65 
66  struct sockaddr_in cli_addr;
67  socklen_t clilen = sizeof(cli_addr);
68  bzero((char*)&cli_addr, sizeof(cli_addr));
69  logToMe->info("PDBCommunicator: about to wait for request from Internet");
70  socketFD = accept(socketFDIn, (struct sockaddr*)&cli_addr, &clilen);
71  if (socketFD < 0) {
72  logToMe->error("PDBCommunicator: could not get FD to internet socket");
73  logToMe->error(strerror(errno));
74  errMsg = "Could not get socket ";
75  errMsg += strerror(errno);
76  close(socketFD);
77  socketFD = -1;
78  return true;
79  }
80  socketClosed = false;
81  logToMe->info("PDBCommunicator: got request from Internet");
82  return false;
83 }
84 
86  int portNumber,
87  std::string serverAddress,
88  std::string& errMsg) {
89 
90  logToMe = logToMeIn;
91  // std :: cout << "################################" << std :: endl;
92  // std :: cout << "To connect to Internet server..." << std :: endl;
93  // std :: cout << "portNumber=" << portNumber << std :: endl;
94  // std :: cout << "serverAddress=" << serverAddress << std :: endl;
95  // set up the socket
96  // struct sockaddr_in serv_addr;
97  // struct hostent *server;
98  /*
99  socketFD = socket(AF_INET, SOCK_STREAM, 0);
100  std :: cout << "socketFD=" << socketFD << std :: endl;
101  if (socketFD < 0) {
102  logToMe->error("PDBCommunicator: could not get FD to internet socket");
103  logToMe->error(strerror(errno));
104  errMsg = "Could not get socket to backend ";
105  errMsg += strerror(errno);
106  return true;
107  }
108 
109  logToMe->trace("PDBCommunicator: Got internet socket");
110  logToMe->trace("PDBCommunicator: About to check the database for the host name");
111  */
112  /* CHRIS NOTE: turns out that gethostbyname() is deprecated, and should be replaced */
113  // std :: cout << "Address cstring=" << serverAddress.c_str() << std :: endl;
114  // server = gethostbyname(serverAddress.c_str());
115  // std :: cout << "h_name=" << server->h_name << std :: endl;
116  // std :: cout << "h_addr_list[0]=" << server->h_addr_list[0] << std :: endl;
117  // std :: cout << "h_addr=" << server->h_addr << std :: endl;
118  /*if (server == nullptr) {
119  logToMe->error("PDBCommunicator: could not get host by name");
120  logToMe->error(strerror(errno));
121  errMsg = "Could not get host by name ";
122  errMsg += strerror(errno);
123  return true;
124  }*/
125 
126  logToMe->trace("PDBCommunicator: About to connect to the remote host");
127 
128  /*
129  bzero((char *) &serv_addr, sizeof (serv_addr));
130  serv_addr.sin_family = AF_INET;
131  bcopy((char *) server->h_addr, (char *) &serv_addr.sin_addr.s_addr, server->h_length);
132  std :: cout << "copied to address=" << (char *) inet_ntoa(serv_addr.sin_addr) << std :: endl;
133  serv_addr.sin_port = htons(portNumber);
134  if (::connect(socketFD, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
135  logToMe->error("PDBCommunicator: could not get host by name");
136  logToMe->error(strerror(errno));
137  errMsg = "Could not connect to server ";
138  errMsg += strerror(errno);
139  return true;
140  }
141  */
142 
143  // Jia: gethostbyname() has multi-threading issues, so it is replaced with getaddrinfo()
144 
145  struct addrinfo hints;
146  struct addrinfo *result, *rp;
147  char port[10];
148  sprintf(port, "%d", portNumber);
149 
150  memset(&hints, 0, sizeof(struct addrinfo));
151  hints.ai_family = AF_INET;
152  hints.ai_socktype = SOCK_STREAM;
153  hints.ai_flags = 0;
154  hints.ai_protocol = 0;
155 
156  int s = getaddrinfo(serverAddress.c_str(), port, &hints, &result);
157  if (s != 0) {
158  logToMe->error("PDBCommunicator: could not get addr info");
159  logToMe->error(strerror(errno));
160  errMsg = "Could not get addr info ";
161  errMsg += strerror(errno);
162  std::cout << errMsg << std::endl;
163  socketClosed = true;
164  return true;
165  }
166 
167  bool connected = false;
168  for (rp = result; rp != NULL; rp = rp->ai_next) {
169  int count = 0;
170  while (count <= MAX_RETRIES) {
171  logToMe->trace("PDBCommunicator: creating socket....");
172  socketFD = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
173  if (socketFD == -1) {
174  continue;
175  }
176  if (::connect(socketFD, rp->ai_addr, rp->ai_addrlen) != -1) {
177  connected = true;
178  break;
179  }
180  count++;
181  std::cout << "Connection error, to retry..." << std::endl;
182  sleep(1);
183  close(socketFD);
184  socketFD = -1;
185  }
186  if (connected == true) {
187  break;
188  }
189  }
190 
191  if (rp == NULL) {
192  logToMe->error("PDBCommunicator: could not connect to server: address info is null");
193  logToMe->error(strerror(errno));
194  errMsg = "Could not connect to server: address info is null with ip=" + serverAddress +
195  ", and port=" + port;
196  errMsg += strerror(errno);
197  std::cout << errMsg << std::endl;
198  socketClosed = true;
199  return true;
200  }
201 
202  freeaddrinfo(result);
203  // Jia: moved automatic tear-down logic from Chris' message-based communication to here
204  // note that we need to close this up when we are done
206  isInternet = true;
207  this->portNumber = portNumber;
208  this->serverAddress = serverAddress;
209  socketClosed = false;
210  logToMe->trace("PDBCommunicator: Successfully connected to the remote host");
211  logToMe->trace("PDBCommunicator: Socket FD is " + std::to_string(socketFD));
212  /* std :: cout << "##########################" << std :: endl;
213  std :: cout << "Connected to server with port =" << portNumber <<", address =" <<
214  serverAddress << ", socket=" << socketFD << std :: endl;
215  std :: cout << "==========================" << std :: endl;
216  */
217  return false;
218 }
219 
221  needToSendDisconnectMsg = option;
222 }
223 
225  std::string fName,
226  std::string& errMsg) {
227 
228  logToMe = logToMeIn;
229  struct sockaddr_un server;
230  // TODO: add retry logic here
231  // TODO: add retry logic here
232  socketFD = socket(AF_UNIX, SOCK_STREAM, 0);
233  if (socketFD < 0) {
234  logToMe->error("PDBCommunicator: could not get FD to local server socket");
235  logToMe->error(strerror(errno));
236  errMsg = "Could not get FD to local server socket ";
237  errMsg += strerror(errno);
238  close(socketFD);
239  socketFD = -1;
240  socketClosed = true;
241  return true;
242  }
243 
244  // std :: cout << "In here!!\n";
245 
246  server.sun_family = AF_UNIX;
247  strcpy(server.sun_path, fName.c_str());
248  if (::connect(socketFD, (struct sockaddr*)&server, sizeof(struct sockaddr_un)) < 0) {
249  logToMe->error("PDBCommunicator: could not connect to local server socket");
250  logToMe->error(strerror(errno));
251  errMsg = "Could not connect to local server socket ";
252  errMsg += strerror(errno);
253  close(socketFD);
254  socketFD = -1;
255  socketClosed = true;
256  return true;
257  }
258 
259  // Jia: moved automatic tear-down logic from Chris' message-based communication to here
260  // note that we need to close this up when we are done
262  isInternet = false;
263  fileName = fName;
264  // std :: cout << "Connected!!\n";
265  socketClosed = false;
266  return false;
267 }
268 
269 bool PDBCommunicator::pointToFile(PDBLoggerPtr logToMeIn, int socketFDIn, std::string& errMsg) {
270 
271  // connect to the backend
272  logToMe = logToMeIn;
273 
274  logToMe->trace("PDBCommunicator: about to wait for request from same machine");
275  socketFD = accept(socketFDIn, 0, 0);
276  if (socketFD < 0) {
277  logToMe->error("PDBCommunicator: could not get FD to local socket");
278  logToMe->error(strerror(errno));
279  errMsg = "Could not get socket ";
280  errMsg += strerror(errno);
281  close(socketFD);
282  socketFD = -1;
283  socketClosed = true;
284  return true;
285  }
286 
287  logToMe->trace("PDBCommunicator: got request from same machine");
288  socketClosed = false;
289  return false;
290 }
291 
293 
294 // Jia: moved the logic below from Chris' message-based communication to here.
295 // Tell the server that we are disconnecting (note that needToSendDisconnectMsg is
296 // set to true only if we are a client and we want to close a connection to the server).
297 #ifdef __APPLE__
298  if (needToSendDisconnectMsg && socketFD > 0) {
299  const UseTemporaryAllocationBlock tempBlock{1024};
300  Handle<CloseConnection> temp = makeObject<CloseConnection>();
301  logToMe->trace("PDBCommunicator: closing connection to the server");
302  std::string errMsg;
303  if (!sendObject(temp, errMsg)) {
304  logToMe->trace("PDBCommunicator: could not send close connection message");
305  }
306  }
307 
308  if (socketFD >= 0) {
309  close(socketFD);
310  socketClosed = true;
311  socketFD = -1;
312  }
313 #else
314 
315 
316  if (needToSendDisconnectMsg && socketFD >= 0) {
317  close(socketFD);
318  socketFD = -1;
319  } else if (!needToSendDisconnectMsg && socketFD >= 0) {
320  shutdown(socketFD, SHUT_WR);
321  // below logic doesn't work!
322  /*
323  char c;
324  ssize_t res = recv(socketFD, &c, 1, MSG_PEEK);
325  if (res == 0) {
326  std :: cout << "server socket closed" << std :: endl;
327  } else {
328  std :: cout << "there is some error in the socket" << std :: endl;
329  }
330  */
331  close(socketFD);
332  socketFD = -1;
333  }
334  socketClosed = true;
335 #endif
336 }
337 
339  return socketFD;
340 }
341 
343 
344  if (!readCurMsgSize) {
346  }
347  return nextTypeID;
348 }
349 
351 
352  // if we have previously gotten the size, just return it
353  if (readCurMsgSize) {
354  logToMe->debug("getSizeOfNextObject: we've done this before");
355  return msgSize;
356  }
357 
358  // make sure we got enough bytes... if we did not, then error out
359  // JIANOTE: we may not receive all the bytes at once, so we need a loop
360  int receivedBytes = 0;
361  int receivedTotal = 0;
362  int bytesToReceive = (int)(sizeof(int16_t));
363  int retries = 0;
364  while (receivedTotal < (int)(sizeof(int16_t))) {
365  if ((receivedBytes = read(socketFD,
366  (char*)((char*)(&nextTypeID) + receivedTotal * sizeof(char)),
367  bytesToReceive)) < 0) {
368  std::string errMsg =
369  std::string("PDBCommunicator: could not read next message type") + strerror(errno);
370  logToMe->error(errMsg);
371  PDB_COUT << errMsg << std::endl;
372  nextTypeID = NoMsg_TYPEID;
373  msgSize = 0;
374  close(socketFD);
375  socketFD = -1;
376  if (longConnection) {
377  // std :: cout << "############################################" << std :: endl;
378  // std :: cout << "WARNING: LONG CONNECTION CLOSED DUE TO READ ERROR" << std ::
379  // endl;
380  // std :: cout << "############################################" << std :: endl;
381  } else {
382  // std :: cout << "############################################" << std :: endl;
383  // std :: cout << "WARNING: CONNECTION CLOSED DUE TO READ ERROR" << std :: endl;
384  // std :: cout << "############################################" << std :: endl;
385  }
386  socketClosed = true;
387  return 0;
388  } else if (receivedBytes == 0) {
389  logToMe->info(
390  "PDBCommunicator: the other side closed the socket when we try to read the type");
391  nextTypeID = NoMsg_TYPEID;
392  PDB_COUT
393  << "PDBCommunicator: the other side closed the socket when we try to get next type"
394  << std::endl;
395 
396  // if (retries < MAX_RETRIES) {
397  if (retries < 0) {
398  retries++;
399  logToMe->info("PDBCommunicator: Retry to see whether network can recover");
400  PDB_COUT << "PDBCommunicator: Retry to see whether network can recover"
401  << std::endl;
402  continue;
403  } else {
404  if (longConnection) {
405  // std :: cout << "############################################" << std :: endl;
406  // std :: cout << "WARNING: LONG CONNECTION CLOSED DUE TO READ ERROR AFTER
407  // RETRY" << std :: endl;
408  // std :: cout << "############################################" << std :: endl;
409  }
410  close(socketFD);
411  socketFD = -1;
412  socketClosed = true;
413  msgSize = 0;
414  return 0;
415  }
416 
417  } else {
418  logToMe->info(std::string("PDBCommunicator: receivedBytes for reading type is ") +
419  std::to_string(receivedBytes));
420  receivedTotal = receivedTotal + receivedBytes;
421  bytesToReceive = sizeof(int16_t) - receivedTotal;
422  }
423  }
424  // now we get enough bytes
425  logToMe->trace("PDBCommunicator: typeID of next object is " + std::to_string(nextTypeID));
426  logToMe->trace("PDBCommunicator: getting the size of the next object:");
427 
428  // make sure we got enough bytes... if we did not, then error out
429  receivedBytes = 0;
430  receivedTotal = 0;
431  bytesToReceive = (int)(sizeof(size_t));
432  retries = 0;
433  while (receivedTotal < (int)(sizeof(size_t))) {
434  if ((receivedBytes = read(socketFD,
435  (char*)((char*)(&msgSize) + receivedTotal * sizeof(char)),
436  bytesToReceive)) < 0) {
437  std::string errMsg = "PDBCommunicator: could not read next message size:" +
438  std::to_string(receivedTotal) + strerror(errno);
439  logToMe->error(errMsg);
440  PDB_COUT << errMsg << std::endl;
441  close(socketFD);
442  socketFD = -1;
443  if (longConnection) {
444  // std :: cout << "############################################" << std :: endl;
445  // std :: cout << "WARNING: LONG CONNECTION CLOSED DUE TO READ ERROR" << std ::
446  // endl;
447  // std :: cout << "############################################" << std :: endl;
448  } else {
449  // std :: cout << "############################################"<< std :: endl;
450  // std :: cout << "WARNING: CONNECTION CLOSED DUE TO READ ERROR"<< std :: endl;
451  // std :: cout << "############################################" << std :: endl;
452  }
453  socketClosed = true;
454  msgSize = 0;
455  return 0;
456  } else if (receivedBytes == 0) {
457  logToMe->info(
458  "PDBCommunicator: the other side closed the socket when we try to get next size");
459  nextTypeID = NoMsg_TYPEID;
460  PDB_COUT
461  << "PDBCommunicator: the other side closed the socket when we try to get next size"
462  << std::endl;
463  // if (retries < MAX_RETRIES) {
464  if (retries < 0) {
465  retries++;
466  PDB_COUT << "PDBCommunicator: Retry to see whether network can recover"
467  << std::endl;
468  logToMe->info("PDBCommunicator: Retry to see whether network can recover");
469  continue;
470  } else {
471  if (longConnection) {
472  // std :: cout << "############################################" << std :: endl;
473  // std :: cout << "WARNING: LONG CONNECTION CLOSED DUE TO READ ERROR AFTER
474  // RETRIE" << std :: endl;
475  // std :: cout << "############################################" << std :: endl;
476  }
477  close(socketFD);
478  socketFD = -1;
479  socketClosed = true;
480  msgSize = 0;
481  return 0;
482  }
483 
484  } else {
485  logToMe->info(std::string("PDBCommunicator: receivedBytes for reading size is ") +
486  std::to_string(receivedBytes));
487  receivedTotal = receivedTotal + receivedBytes;
488  bytesToReceive = sizeof(size_t) - receivedTotal;
489  }
490  }
491  // OK, we did get enough bytes
492  logToMe->trace("PDBCommunicator: size of next object is " + std::to_string(msgSize));
493  readCurMsgSize = true;
494  return msgSize;
495 }
496 
497 bool PDBCommunicator::doTheWrite(char* start, char* end) {
498 
499  int retries = 0;
500  // and do the write
501  while (end != start) {
502 
503  // write some bytes
504  ssize_t numBytes = write(socketFD, start, end - start);
505  // make sure they went through
506  if (numBytes < 0) {
507  logToMe->error("PDBCommunicator: error in socket write");
508  logToMe->trace("PDBCommunicator: tried to write " + std::to_string(end - start) +
509  " bytes.\n");
510  logToMe->trace("PDBCommunicator: Socket FD is " + std::to_string(socketFD));
511  logToMe->error(strerror(errno));
512  // if (retries < MAX_RETRIES) {
513  if (retries < 0) {
514  retries++;
515  PDB_COUT << "PDBCommunicator: Retry to see whether network can recover"
516  << std::endl;
517  logToMe->info("PDBCommunicator: Retry to see whether network can recover");
518  continue;
519  continue;
520  } else {
521  // std :: cout << "############################################" << std :: endl;
522  // std :: cout << "WARNING: CONNECTION CLOSED DUE TO WRITE ERROR AFTER RETRY" << std
523  // :: endl;
524  // std :: cout << "############################################" << std :: endl;
525  close(socketFD);
526  socketFD = -1;
527  socketClosed = true;
528  return true;
529  }
530  } else {
531  logToMe->trace("PDBCommunicator: wrote " + std::to_string(numBytes) + " and are " +
532  std::to_string(end - start - numBytes) + " to go!");
533  start += numBytes;
534  }
535  }
536  return false;
537 }
538 
539 bool PDBCommunicator::doTheRead(char* dataIn) {
540 
541  if (!readCurMsgSize) {
543  }
544  readCurMsgSize = false;
545 
546  // now, read the rest of the bytes
547  char* start = dataIn;
548  char* cur = start;
549 
550  int retries = 0;
551  while (cur - start < (long)msgSize) {
552 
553  ssize_t numBytes = read(socketFD, cur, msgSize - (cur - start));
554  this->logToMe->trace("PDBCommunicator: received bytes: " + std::to_string(numBytes));
555 
556  if (numBytes < 0) {
557  logToMe->error(
558  "PDBCommunicator: error reading socket when trying to accept text message");
559  logToMe->error(strerror(errno));
560  close(socketFD);
561  socketFD = -1;
562  if (longConnection) {
563  // std :: cout << "############################################" << std :: endl;
564  // std :: cout << "WARNING: LONG CONNECTION CLOSED DUE TO READ ERROR" << std ::
565  // endl;
566  // std :: cout << "############################################" << std :: endl;
567  } else {
568  // std :: cout << "############################################" << std :: endl;
569  // std :: cout << "WARNING: CONNECTION CLOSED DUE TO READ ERROR" << std :: endl;
570  // std :: cout << "############################################" << std :: endl;
571  }
572  socketClosed = true;
573  return true;
574  } else if (numBytes == 0) {
575  logToMe->info("PDBCommunicator: the other side closed the socket when we do the read");
576  PDB_COUT << "PDBCommunicator: the other side closed the socket when we doTheRead"
577  << std::endl;
578  // if (retries < MAX_RETRIES) {
579  if (retries < 0) {
580  retries++;
581  logToMe->info("PDBCommunicator: Retry to see whether network can recover");
582  PDB_COUT << "PDBCommunicator: Retry to see whether network can recover"
583  << std::endl;
584  continue;
585  } else {
586  if (longConnection) {
587  // std :: cout << "############################################" << std :: endl;
588  // std :: cout << "WARNING: LONG CONNECTION IS CLOSED DUE TO READ ERROR AFTER
589  // RETRY" << std :: endl;
590  // std :: cout << "############################################" << std :: endl;
591  }
592  close(socketFD);
593  socketFD = -1;
594  socketClosed = true;
595  return true;
596  }
597  } else {
598  cur += numBytes;
599  }
600  this->logToMe->trace("PDBCommunicator: " + std::to_string(msgSize - (cur - start)) +
601  " bytes to go!");
602  }
603  return false;
604 }
605 
606 // JiaNote: add following functions to enable a stable long connection:
607 
609  return socketClosed;
610 }
611 
613  return longConnection;
614 }
615 
616 void PDBCommunicator::setLongConnection(bool longConnection) {
617  this->longConnection = longConnection;
618 }
619 
620 bool PDBCommunicator::reconnect(std::string& errMsg) {
621 
622  if (needToSendDisconnectMsg == true) {
623  // I can reconnect because I'm a client
624  PDB_COUT << "To reconnect..." << std::endl;
625 
626  if (socketFD >= 0) {
627  close(socketFD);
628  socketFD = -1;
629  socketClosed = true;
630  }
631 
632  if (isInternet == true) {
633 
635 
636  } else {
637 
638  return connectToLocalServer(logToMe, fileName, errMsg);
639  }
640 
641  } else {
642  errMsg = "Can't reconnect because I'm a server";
643  logToMe->error(errMsg);
644  return true;
645  }
646 }
647 }
648 
649 #endif
bool doTheWrite(char *start, char *end)
bool reconnect(std::string &errMsg)
bool connectToLocalServer(PDBLoggerPtr logToMeIn, std::string fName, std::string &errMsg)
bool doTheRead(char *dataIn)
void setNeedsToDisconnect(bool option)
bool sendObject(Handle< ObjType > &sendMe, std::string &errMsg)
bool connectToInternetServer(PDBLoggerPtr logToMeIn, int portNumber, std::string serverAddress, std::string &errMsg)
bool pointToFile(PDBLoggerPtr logToMeIn, int socketFDIn, std::string &errMsg)
bool pointToInternet(PDBLoggerPtr logToMeIn, int socketFDIn, std::string &errMs)
#define PDB_COUT
Definition: PDBDebug.h:31
void setLongConnection(bool longConnection)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
#define MAX_RETRIES