A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CommunicatorTemplates.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #ifndef PDB_COMMUN_TEMPLATES_C
21 #define PDB_COMMUN_TEMPLATES_C
22 
23 #include "PDBDebug.h"
24 #include "BuiltInObjectTypeIDs.h"
25 #include "Handle.h"
26 #include <iostream>
27 #include <netdb.h>
28 #include <netinet/in.h>
29 #include "Object.h"
30 #include "PDBVector.h"
31 #include "CloseConnection.h"
32 #include "InterfaceFunctions.h"
33 #include "PDBCommunicator.h"
35 #include "Configuration.h"
36 #include <stdio.h>
37 #include <sys/socket.h>
38 #include <sys/types.h>
39 #include <sys/uio.h>
40 #include <sys/un.h>
41 #include <netinet/in.h>
42 #include <unistd.h>
43 
44 namespace pdb {
45 
46 template <class ObjType>
47 bool PDBCommunicator::sendObject(Handle<ObjType>& sendMe, std::string& errMsg) {
48 
49  // first, write the record type
50  int16_t recType = getTypeID<ObjType>();
51  if (recType < 0) {
52  std::cout << "Fatal Error: BAD! Trying to send a handle to a non-Object type.\n";
53  logToMe->error("Fatal Error: BAD! Trying to send a handle to a non-Object type.\n");
54  exit(1);
55  }
56  if (doTheWrite(((char*)&recType), ((char*)&recType) + sizeof(int16_t))) {
57  errMsg = "PDBCommunicator: not able to send the object type";
58  logToMe->error(errMsg);
59  logToMe->error(strerror(errno));
60  std::cout << errMsg << strerror(errno) << std::endl;
61  return false;
62  }
63  // next, write the object
64  auto* record = getRecord(sendMe);
65 
66  void* mem = nullptr;
67  if (record == nullptr) {
68  // JiaNote: below is refactored to make it more flexible.
69  // If sendMe is not in this thread's allocator block, we do a deep copy
70  mem = (void*)calloc(DEFAULT_PAGE_SIZE, sizeof(char));
71  record = getRecord(sendMe, mem, DEFAULT_PAGE_SIZE);
72  if (record == nullptr) {
73  int* a = 0;
74  *a = 12;
75  std::cout << "Fatal Error: BAD! Trying to get a record for an object not created by "
76  "this thread's allocator.\n";
77  logToMe->error(
78  "Fatal Error: BAD! Trying to get a record for an object not created by this "
79  "thread's allocator.\n");
80  if (mem != nullptr) {
81  free(mem);
82  }
83  exit(1);
84  }
85  }
86  if (doTheWrite((char*)record, ((char*)record) + record->numBytes())) {
87  PDB_COUT << "recType=" << recType << std::endl;
88  errMsg = "PDBCommunicator: not able to send the object size";
89  std::cout << errMsg << std::endl;
90  std::cout << strerror(errno) << std::endl;
91  logToMe->error(errMsg);
92  logToMe->error(strerror(errno));
93  if (mem != nullptr) {
94  free(mem);
95  }
96  return false;
97  }
98  if (mem != nullptr) {
99  free(mem);
100  }
101 
102  PDB_COUT << "Sent object with typeName=" << getTypeName<ObjType>() << ", recType=" << recType
103  << " and socketFD=" << socketFD << std::endl;
104  logToMe->info(std::string("Sent object with typeName=") + getTypeName<ObjType>() +
105  std::string(", recType=") + std::to_string(recType) +
106  std::string(" and socketFD=") + std::to_string(socketFD));
107  return true;
108 }
109 
110 inline bool PDBCommunicator::receiveBytes(void* data, std::string& errMsg) {
111 
112  // if we have previously gotten the size, just return it
113  if (!readCurMsgSize) {
115  }
116 
117  // the first few bytes of a record always record the size
118  char* mem = (char*)data;
119 
120  if (doTheRead(mem)) {
121  errMsg = "Could not read the next object coming over the wire";
122  readCurMsgSize = false;
123  return false;
124  }
125 
126  return true;
127 }
128 
129 inline bool PDBCommunicator::sendBytes(void* data, size_t sizeOfBytes, std::string& errMsg) {
130 
131  int16_t recType = NoMsg_TYPEID;
132  if (doTheWrite(((char*)&recType), ((char*)&recType) + sizeof(int16_t))) {
133  errMsg = "PDBCommunicator: not able to send the object type";
134  logToMe->error(errMsg);
135  logToMe->error(strerror(errno));
136  return false;
137  }
138 
139  // now write the size
140  if (doTheWrite(((char*)&sizeOfBytes), ((char*)&sizeOfBytes) + sizeof(size_t))) {
141  errMsg = "PDBCommunicator: not able to send the object size";
142  logToMe->error(errMsg);
143  logToMe->error(strerror(errno));
144  return false;
145  }
146 
147  // now we put the actual bytes
148  if (doTheWrite(((char*)data), ((char*)data) + sizeOfBytes)) {
149  errMsg = "PDBCommunicator: not able to send the bytes";
150  logToMe->error(errMsg);
151  logToMe->error(strerror(errno));
152  return false;
153  }
154 
155  return true;
156 }
157 
158 
159 template <class ObjType>
161  bool& success,
162  std::string& errMsg) {
163 
164  // if we have previously gotten the size, just return it
165  if (!readCurMsgSize) {
167  logToMe->debug(std::string("run getSizeOfNextObject() and get type=") +
168  std::to_string(nextTypeID) + std::string(" and size=") +
169  std::to_string(msgSize));
170  } else {
171  logToMe->debug(std::string("get size info directly with type=") +
172  std::to_string(nextTypeID) + std::string(" and size=") +
173  std::to_string(msgSize));
174  }
175 
176  if (msgSize == 0) {
177  success = false;
178  errMsg = "Could not read the the object size";
179  std::cout << "PDBCommunicator: can not get message size, the connection is possibly closed "
180  "by the other side"
181  << std::endl;
182  logToMe->error(
183  "PDBCommunicator: can not get message size, the connection is possibly closed by the "
184  "other side");
185  return nullptr;
186  }
187 
188 
189  // the first few bytes of a record always record the size
190  char* mem = (char*)readToHere;
191  *((size_t*)mem) = msgSize;
192  // now we read the rest
193  mem += sizeof(size_t);
194  msgSize -= sizeof(size_t);
195 
196  if (doTheRead(mem)) {
197  errMsg = "Could not read the next object coming over the wire";
198  success = false;
199  readCurMsgSize = false;
200  return nullptr;
201  }
202 
203  // create an object and get outta here
204  success = true;
205  logToMe->trace("PDBCommunicator: read the object with no problem.");
206  logToMe->trace("PDBCommunicator: root offset is " +
207  std::to_string(((Record<ObjType>*)readToHere)->rootObjectOffset()));
208  readCurMsgSize = false;
209  Handle<ObjType> request = ((Record<ObjType>*)readToHere)->getRootObject();
210  return request;
211 }
212 
213 template <class ObjType>
214 Handle<ObjType> PDBCommunicator::getNextObject(bool& success, std::string& errMsg) {
215 
216  // if we have previously gotten the size, just return it
217  if (!readCurMsgSize) {
219  logToMe->debug(std::string("run getSizeOfNextObject() and get type=") +
220  std::to_string(nextTypeID) + std::string(" and size=") +
221  std::to_string(msgSize));
222  } else {
223  logToMe->debug(std::string("get size info directly with type=") +
224  std::to_string(nextTypeID) + std::string(" and size=") +
225  std::to_string(msgSize));
226  }
227  if (msgSize == 0) {
228  success = false;
229  errMsg = "Could not read the object size";
230  std::cout << "PDBCommunicator: can not get message size, the connection is possibly closed "
231  "by the other side"
232  << std::endl;
233  logToMe->error(
234  "PDBCommunicator: can not get message size, the connection is possibly closed by the "
235  "other side");
236  return nullptr;
237  }
238  // read in the object
239  void* mem = malloc(msgSize);
240  if (mem == nullptr) {
241  PDB_COUT << "nextTypeId = " << nextTypeID << std::endl;
242  PDB_COUT << "msgSize = " << msgSize << std::endl;
243  PDB_COUT << "memory is failed to allocate for getNextObject()" << std::endl;
244  exit(-1);
245  }
246  Handle<ObjType> temp = getNextObject<ObjType>(mem, success, errMsg);
247  UseTemporaryAllocationBlock myBlock{msgSize + 4 * 1024 * 1024};
248  // if we were successful, then copy it to the current allocation block
249  if (success) {
250  logToMe->trace("PDBCommunicator: about to do the deep copy.");
251  // std :: cout << "to get handle by deep copy to current block" << std :: endl;
253  // std :: cout << "got handle" << std :: endl;
254  logToMe->trace("PDBCommunicator: completed the deep copy.");
255  free(mem);
256  return temp;
257  } else {
258  free(mem);
259  return nullptr;
260  }
261 }
262 }
263 
264 #endif
bool doTheWrite(char *start, char *end)
Handle< TargetType > deepCopyToCurrentAllocationBlock(Handle< TargetType > &copyMe)
bool doTheRead(char *dataIn)
bool sendObject(Handle< ObjType > &sendMe, std::string &errMsg)
bool sendBytes(void *data, size_t size, std::string &errMsg)
Handle< ObjType > getNextObject(void *readToHere, bool &success, std::string &errMsg)
bool receiveBytes(void *data, std::string &errMsg)
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
#define PDB_COUT
Definition: PDBDebug.h:31
#define DEFAULT_PAGE_SIZE
Definition: Configuration.h:36