18 #ifndef OBJECTQUERYMODEL_BROADCASTSERVERTEMPLATE_CC
19 #define OBJECTQUERYMODEL_BROADCASTSERVERTEMPLATE_CC
34 #define HEADER_SIZE 20
40 template <
class MsgType,
class PayloadType,
class ResponseType>
43 std::vector<std::string> receivers,
45 std::function<
void(std::string, std::string)> errorCallBack) {
48 auto failures = make_shared<std::vector<std::pair<std::string, std::string>>>();
55 std::cout <<
"Error broadcast " << errors <<
" to " << serverName << std::endl;
59 PDB_COUT <<
"Successful broadcast " << success <<
" to " << serverName << std::endl;
64 for (
int i = 0; i < receivers.size(); i++) {
67 std::string serverName = receivers[i];
71 PDB_COUT <<
"Broadcasting to " << serverName << std::endl;
73 size_t pos = serverName.find(
":");
74 if (pos != string::npos) {
75 port = stoi(serverName.substr(pos + 1, serverName.size()));
76 address = serverName.substr(0, pos);
78 if (
conf !=
nullptr) {
79 port =
conf->getPort();
86 PDBWorkPtr myWork = make_shared<GenericWork>([i,
95 PDB_COUT <<
"the " << i <<
"-th thread is started" << std::endl;
102 int portNumber = port;
103 std::string serverAddress = address;
108 PDB_COUT << i <<
":port = " << portNumber << std::endl;
109 PDB_COUT << i <<
":address = " << serverAddress << std::endl;
110 if (communicator->connectToInternetServer(
111 this->logger, portNumber, serverAddress, errMsg)) {
112 PDB_COUT << i <<
":connectToInternetServer: " << errMsg << std::endl;
115 PDB_COUT <<
"to retry to resend message" << std::endl;
120 errorCallBack(errMsg, serverName);
126 PDB_COUT << i <<
":connected to server: " << serverAddress << std::endl;
128 deepCopyToCurrentAllocationBlock<MsgType>(broadcastMsg);
129 if (!communicator->sendObject<MsgType>(broadcastMsgCopy, errMsg)) {
130 PDB_COUT << i <<
":sendObject: " << errMsg << std::endl;
133 PDB_COUT <<
"to retry to resend message" << std::endl;
136 errorCallBack(errMsg, serverName);
141 PDB_COUT << i <<
":send object to server: " << serverAddress << std::endl;
142 if (broadcastData !=
nullptr) {
144 deepCopyToCurrentAllocationBlock<Vector<Handle<PayloadType>>>(
146 if (!communicator->sendObject(payloadCopy, errMsg)) {
147 PDB_COUT << i <<
":sendBytes: " << errMsg << std::endl;
150 PDB_COUT <<
"to retry to resend message" << std::endl;
153 errorCallBack(errMsg, serverName);
160 size_t objectSize = communicator->getSizeOfNextObject();
162 std::cout <<
"received size is too small for an object" << std::endl;
165 PDB_COUT <<
"to retry to resend message" << std::endl;
168 errorCallBack(errMsg, serverName);
176 communicator->getNextObject<ResponseType>(err, errMsg);
177 if (result ==
nullptr) {
178 PDB_COUT <<
"the " << i <<
"-th thread connection closed unexpectedly"
182 PDB_COUT <<
"to retry to resend message" << std::endl;
185 errorCallBack(errMsg, serverName);
190 callBack(result, serverName);
191 PDB_COUT <<
"the " << i <<
"-th thread finished" << std::endl;
196 myWorker->execute(myWork, buzzer);
198 while (errors + success < receivers.size()) {
201 PDB_COUT <<
"all broadcasting thread returns" << std::endl;
205 template <
class DataType>
208 (*newObject) = (*original);
shared_ptr< PDBWork > PDBWorkPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void broadcast(Handle< MsgType > broadcastMsg, Handle< Vector< Handle< PayloadType >>> broadCastData, std::vector< std::string > receivers, std::function< void(Handle< ResponseType >, std::string)> successCallBack, std::function< void(std::string, std::string)> errorCallBack=[](std::string errMsg, std::string serverName){})
Handle< DataType > deepCopy(const Handle< DataType > &original)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
shared_ptr< PDBWorker > PDBWorkerPtr
pthread_mutex_t connection_mutex