A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimpleSendBytesRequest.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef SIMPLE_SEND_BYTES_REQUEST_CC
20 #define SIMPLE_SEND_BYTES_REQUEST_CC
21 
22 #include "InterfaceFunctions.h"
24 
// Default for the retry limit used by simpleSendBytesRequest: a failed send or
// receive is retried while the retry counter is still below this value.
// The #ifndef guard lets a definition made before this point take precedence.
25 #ifndef MAX_RETRIES
26 #define MAX_RETRIES 5
27 #endif
// Default lower bound for the bytesForRequest argument: simpleSendBytesRequest
// rejects any value smaller than this with "block size is too small".
// The #ifndef guard lets a definition made before this point take precedence.
28 #ifndef HEADER_SIZE
29 #define HEADER_SIZE 20
30 #endif
31 
32 namespace pdb {
33 
34 template <class RequestType, class ResponseType, class ReturnType, class... RequestTypeParams>
36  int port,
37  std::string address,
38  ReturnType onErr,
39  size_t bytesForRequest,
40  function<ReturnType(Handle<ResponseType>)> processResponse,
41  char* bytes,
42  size_t numBytes,
43  RequestTypeParams&&... args) {
44 
45  int retries = 0;
46 
47  while (retries <= MAX_RETRIES) {
48 
49  PDBCommunicator temp;
50  string errMsg;
51  bool success;
52 
53  if (temp.connectToInternetServer(myLogger, port, address, errMsg)) {
54  myLogger->error(errMsg);
55  myLogger->error("simpleSendDataRequest: not able to connect to server.\n");
56  std::cout << "ERROR: can't connect to remote server with port =" << port
57  << " and address =" << address << std::endl;
58  return onErr;
59  }
60 
61  // build the request
62  if (bytesForRequest < HEADER_SIZE) {
63  std::cout << "ERROR: block size is too small" << std::endl;
64  return onErr;
65  }
66  const UseTemporaryAllocationBlock tempBlock{bytesForRequest};
67  Handle<RequestType> request = makeObject<RequestType>(args...);
68  if (!temp.sendObject(request, errMsg)) {
69  myLogger->error(errMsg);
70  myLogger->error("simpleSendDataRequest: not able to send request to server.\n");
71  if (retries < MAX_RETRIES) {
72  retries++;
73  continue;
74  } else {
75  return onErr;
76  }
77  }
78  // now, send the bytes
79  if (!temp.sendBytes(bytes, numBytes, errMsg)) {
80  myLogger->error(errMsg);
81  myLogger->error("simpleSendDataRequest: not able to send data to server.\n");
82  if (retries < MAX_RETRIES) {
83  retries++;
84  continue;
85  } else {
86  return onErr;
87  }
88  }
89 
90  // get the response and process it
91  size_t objectSize = temp.getSizeOfNextObject();
92  if (objectSize == 0) {
93  myLogger->error("simpleRequest: not able to get next object size");
94  std::cout << "simpleRequest: not able to get next object size" << std::endl;
95  if (retries < MAX_RETRIES) {
96  retries++;
97  continue;
98  } else {
99  return onErr;
100  }
101  }
102  void* memory = malloc(objectSize);
103  if (memory == nullptr) {
104  myLogger->error(std::string("FATAL ERROR: not able to allocate memory with size=") +
105  std::to_string(objectSize));
106  std::cout << "FATAL ERROR: not able to allocate memory" << std::endl;
107  exit(-1);
108  }
109  ReturnType finalResult;
110  {
111  Handle<ResponseType> result = temp.getNextObject<ResponseType>(memory, success, errMsg);
112  if (!success) {
113  myLogger->error(errMsg);
114  myLogger->error("simpleRequest: not able to get next object over the wire.\n");
115  // JiaNote: we need free memory here!!!
116  free(memory);
117  if (retries < MAX_RETRIES) {
118  retries++;
119  continue;
120  } else {
121  return onErr;
122  }
123  }
124 
125  finalResult = processResponse(result);
126  }
127 
128  free(memory);
129  return finalResult;
130  }
131  return onErr;
132 }
133 }
134 #endif
#define HEADER_SIZE
#define MAX_RETRIES
bool sendObject(Handle< ObjType > &sendMe, std::string &errMsg)
bool connectToInternetServer(PDBLoggerPtr logToMeIn, int portNumber, std::string serverAddress, std::string &errMsg)
bool sendBytes(void *data, size_t size, std::string &errMsg)
Handle< ObjType > getNextObject(void *readToHere, bool &success, std::string &errMsg)
ReturnType simpleSendBytesRequest(PDBLoggerPtr myLogger, int port, std::string address, ReturnType onErr, size_t bytesForRequest, function< ReturnType(Handle< ResponseType >)> processResponse, char *bytes, size_t numBytes, RequestTypeParams &&...args)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40