A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
BroadcastServerTemplate.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef OBJECTQUERYMODEL_BROADCASTSERVERTEMPLATE_CC
19 #define OBJECTQUERYMODEL_BROADCASTSERVERTEMPLATE_CC
20 
21 #include "PDBDebug.h"
22 #include "BroadcastServer.h"
23 #include "ResourceManagerServer.h"
24 #include "PDBWorker.h"
25 #include "PDBWork.h"
26 #include "GenericWork.h"
28 
29 
30 #ifndef MAX_RETRIES
31 #define MAX_RETRIES 5
32 #endif
33 #ifndef HEADER_SIZE
34 #define HEADER_SIZE 20
35 #endif
36 
37 namespace pdb {
38 
39 
40 template <class MsgType, class PayloadType, class ResponseType>
42  Handle<Vector<Handle<PayloadType>>> broadcastData,
43  std::vector<std::string> receivers,
44  std::function<void(Handle<ResponseType>, std::string)> callBack,
45  std::function<void(std::string, std::string)> errorCallBack) {
46  int errors = 0;
47  int success = 0;
48  auto failures = make_shared<std::vector<std::pair<std::string, std::string>>>();
49 
50  PDBBuzzerPtr buzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, std::string serverName) {
51  lock.lock();
52  // Handle the error cases here
53  if (myAlarm == PDBAlarm::GenericError) {
54  errors++;
55  std::cout << "Error broadcast " << errors << " to " << serverName << std::endl;
56  lock.unlock();
57  } else {
58  success++;
59  PDB_COUT << "Successful broadcast " << success << " to " << serverName << std::endl;
60  lock.unlock();
61  }
62  });
63 
64  for (int i = 0; i < receivers.size(); i++) {
65  PDBWorkerPtr myWorker = getWorker();
66 
67  std::string serverName = receivers[i];
68  int port;
69  std::string address;
70 
71  PDB_COUT << "Broadcasting to " << serverName << std::endl;
72 
73  size_t pos = serverName.find(":");
74  if (pos != string::npos) {
75  port = stoi(serverName.substr(pos + 1, serverName.size()));
76  address = serverName.substr(0, pos);
77  } else {
78  if (conf != nullptr) {
79  port = conf->getPort();
80  } else {
81  port = 8108;
82  }
83  address = serverName;
84  }
85 
86  PDBWorkPtr myWork = make_shared<GenericWork>([i,
87  serverName,
88  port,
89  address,
90  this,
91  &errorCallBack,
92  &broadcastMsg,
93  &broadcastData,
94  &callBack](PDBBuzzerPtr callerBuzzer) {
95  PDB_COUT << "the " << i << "-th thread is started" << std::endl;
96 
97  int retries = 0;
98 
99  while (retries <= MAX_RETRIES) {
100 
101  std::string errMsg;
102  int portNumber = port;
103  std::string serverAddress = address;
104 
105  // socket() is not thread-safe, so we need synchronize here
106  pthread_mutex_lock(&connection_mutex);
107  PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();
108  PDB_COUT << i << ":port = " << portNumber << std::endl;
109  PDB_COUT << i << ":address = " << serverAddress << std::endl;
110  if (communicator->connectToInternetServer(
111  this->logger, portNumber, serverAddress, errMsg)) {
112  PDB_COUT << i << ":connectToInternetServer: " << errMsg << std::endl;
113  if (retries < MAX_RETRIES) {
114  retries++;
115  PDB_COUT << "to retry to resend message" << std::endl;
116  pthread_mutex_unlock(&connection_mutex);
117  continue;
118  } else {
119  pthread_mutex_unlock(&connection_mutex);
120  errorCallBack(errMsg, serverName);
121  callerBuzzer->buzz(PDBAlarm::GenericError, serverName);
122  return;
123  }
124  }
125  pthread_mutex_unlock(&connection_mutex);
126  PDB_COUT << i << ":connected to server: " << serverAddress << std::endl;
127  Handle<MsgType> broadcastMsgCopy =
128  deepCopyToCurrentAllocationBlock<MsgType>(broadcastMsg);
129  if (!communicator->sendObject<MsgType>(broadcastMsgCopy, errMsg)) {
130  PDB_COUT << i << ":sendObject: " << errMsg << std::endl;
131  if (retries < MAX_RETRIES) {
132  retries++;
133  PDB_COUT << "to retry to resend message" << std::endl;
134  continue;
135  } else {
136  errorCallBack(errMsg, serverName);
137  callerBuzzer->buzz(PDBAlarm::GenericError, serverName);
138  return;
139  }
140  }
141  PDB_COUT << i << ":send object to server: " << serverAddress << std::endl;
142  if (broadcastData != nullptr) {
143  Handle<Vector<Handle<PayloadType>>> payloadCopy =
144  deepCopyToCurrentAllocationBlock<Vector<Handle<PayloadType>>>(
145  broadcastData);
146  if (!communicator->sendObject(payloadCopy, errMsg)) {
147  PDB_COUT << i << ":sendBytes: " << errMsg << std::endl;
148  if (retries < MAX_RETRIES) {
149  retries++;
150  PDB_COUT << "to retry to resend message" << std::endl;
151  continue;
152  } else {
153  errorCallBack(errMsg, serverName);
154  callerBuzzer->buzz(PDBAlarm::GenericError, serverName);
155  return;
156  }
157  }
158  }
159 
160  size_t objectSize = communicator->getSizeOfNextObject();
161  if (objectSize < HEADER_SIZE) {
162  std::cout << "received size is too small for an object" << std::endl;
163  if (retries < MAX_RETRIES) {
164  retries++;
165  PDB_COUT << "to retry to resend message" << std::endl;
166  continue;
167  } else {
168  errorCallBack(errMsg, serverName);
169  callerBuzzer->buzz(PDBAlarm::GenericError, serverName);
170  return;
171  }
172  }
173  const UseTemporaryAllocationBlock myBlock{objectSize};
174  bool err;
175  Handle<ResponseType> result =
176  communicator->getNextObject<ResponseType>(err, errMsg);
177  if (result == nullptr) {
178  PDB_COUT << "the " << i << "-th thread connection closed unexpectedly"
179  << std::endl;
180  if (retries < MAX_RETRIES) {
181  retries++;
182  PDB_COUT << "to retry to resend message" << std::endl;
183  continue;
184  } else {
185  errorCallBack(errMsg, serverName);
186  callerBuzzer->buzz(PDBAlarm::GenericError, serverName);
187  return;
188  }
189  }
190  callBack(result, serverName);
191  PDB_COUT << "the " << i << "-th thread finished" << std::endl;
192  callerBuzzer->buzz(PDBAlarm::WorkAllDone, serverName);
193  return;
194  }
195  });
196  myWorker->execute(myWork, buzzer);
197  }
198  while (errors + success < receivers.size()) {
199  buzzer->wait();
200  }
201  PDB_COUT << "all broadcasting thread returns" << std::endl;
202 }
203 
204 
205 template <class DataType>
207  Handle<DataType> newObject = makeObject<DataType>();
208  (*newObject) = (*original);
209  return newObject;
210 }
211 }
212 
213 
214 #endif
#define HEADER_SIZE
shared_ptr< PDBWork > PDBWorkPtr
Definition: PDBWork.h:47
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
#define MAX_RETRIES
void broadcast(Handle< MsgType > broadcastMsg, Handle< Vector< Handle< PayloadType >>> broadCastData, std::vector< std::string > receivers, std::function< void(Handle< ResponseType >, std::string)> successCallBack, std::function< void(std::string, std::string)> errorCallBack=[](std::string errMsg, std::string serverName){})
Handle< DataType > deepCopy(const Handle< DataType > &original)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
PDBAlarm
Definition: PDBAlarm.h:28
pthread_mutex_t connection_mutex
ConfigurationPtr conf