A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DistributionManagerClient.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 /*
19  * DistributionManagerClient.cc
20  *
21  * Created on: Sep 12, 2016
22  * Author: kia
23  */
24 
25 #ifndef DISTRIBUTION_MANAGER_CLIENT_CC
26 #define DISTRIBUTION_MANAGER_CLIENT_CC
27 
28 #include <chrono>
29 #include <thread>
30 #include <iostream>
31 
33 #include "NodeInfo.h"
34 #include "InterfaceFunctions.h"
35 
37 #include "GetListOfNodes.h"
38 
39 namespace pdb {
40 
42  logger = loggerIn;
43 }
44 
46  int portIn,
47  PDBLoggerPtr loggerIn) {
48  logger = loggerIn;
49  port = portIn;
50  hostname = hostnameIn;
51 };
52 
54 
56  PDBServer& forMe) { /* no handlers for a DistributionManager client!! */
57 }
58 
59 void DistributionManagerClient::sendHeartBeat(string& managerHostName,
60  int managerNodePort,
61  bool& wasError,
62  string& errMsg) {
63 
64  std::chrono::seconds interval(2); // 2 seconds
65 
66 
67  try {
68  makeObjectAllocatorBlock(1024 * 24, true);
69  Handle<NodeInfo> m_nodeInfo = makeObject<NodeInfo>();
70 
71  m_nodeInfo->setHostName(hostname);
72  m_nodeInfo->setPort(port);
73 
74  // TODO: this is temporary to check the heart beat functionality and implement the timer
75  // inside PDBServer.
76  while (true) {
77 
78  // First build a new connection to the Server
79  PDBCommunicator myCommunicator;
80  if (myCommunicator.connectToInternetServer(
81  logger, managerNodePort, managerHostName, errMsg)) {
82  logger->error("[DistributionManagerClient] - Error when connecting to server: " +
83  errMsg);
84 
85  // try to connect again after awhile.
86  } else {
87 
88  // send the object ovber
89  if (!myCommunicator.sendObject(m_nodeInfo, errMsg)) {
90  logger->error(
91  "[DistributionManagerClient] - HeartBeat Client: Sending nodeInfo "
92  "object: " +
93  errMsg);
94  // try to connect and send the object again.
95  }
96  }
97  // sleep for the time interval and send it again.
98  std::this_thread::sleep_for(interval);
99  }
100 
101  } catch (NotEnoughSpace& e) {
102  logger->error("[DistributionManagerClient] - Not enough memory");
103  }
104 }
105 
107  int managerNodePort,
108  bool& wasError,
109  string& errMsg) {
110  // First build a new connection to the Server
111  PDBCommunicator myCommunicator;
112  if (myCommunicator.connectToInternetServer(logger, managerNodePort, managerHostName, errMsg)) {
113  logger->error("[DistributionManagerClient] - Error when connecting to server: " + errMsg);
114  wasError = true;
115  return nullptr;
116  }
117  makeObjectAllocatorBlock(1024, true);
118 
119  try {
120  Handle<GetListOfNodes> requestToGetListOfNodes = makeObject<GetListOfNodes>();
121 
122  // send the object over
123  if (!myCommunicator.sendObject(requestToGetListOfNodes, errMsg)) {
124  logger->error("[DistributionManagerClient] - HeartBeat Client: Sending nodeInfo object: " + errMsg);
125  // try to connect and send the object again.
126  wasError = true;
127  return nullptr;
128  }
129 
130  } catch (NotEnoughSpace& e) {
131  logger->error("[DistributionManagerClient] - Not enough memory");
132  }
133 
134  // Get the response from the server.
135  bool success;
136  Handle<ListOfNodes> response = myCommunicator.getNextObject<ListOfNodes>(success, errMsg);
137  if (!success) {
138  logger->error(
139  "[DistributionManagerClient] - getCurrentNodes Error when connecting to server: " +
140  errMsg);
141  return nullptr;
142  }
143 
144  return response;
145 }
146 
147 // bool DistributionManagerClient::shutDownServer(std::string &errMsg) {
148 //
149 // return simpleRequest<ShutDown, SimpleRequestResult, bool>(logger, port, address, false, 1024,
150 //[&] (Handle <SimpleRequestResult> result) {
151 // if (result != nullptr) {
152 // if (!result->getRes ().first) {
153 // errMsg = "Error shutting down server: " + result->getRes ().second;
154 // myLogger->error ("Error shutting down server: " + result->getRes ().second);
155 // return false;
156 // }
157 // return true;
158 // }
159 // errMsg = "Error getting type name: got nothing back from catalog";
160 // return false;});
161 //}
162 
164  string& hostName,
165  int managerNodePort,
166  pdb::Handle<QueryPermit> m_queryPermit,
167  bool& wasError,
168  string& errMsg) {
169 
170  // First build a new connection to the Server
171  PDBCommunicator myCommunicator;
172 
173  if (myCommunicator.connectToInternetServer(logger, managerNodePort, hostName, errMsg)) {
174  logger->error("Error when connecting to server: " + errMsg);
175  wasError = true;
176  return nullptr;
177  }
178 
179  // send QueryPermit object over the socket
180  if (!myCommunicator.sendObject(m_queryPermit, errMsg)) {
181  logger->error("sendQueryPermitt Client: Sending QueryPermit object: " + errMsg);
182  wasError = true;
183  return nullptr;
184  }
185 
186  bool success;
187 
189  myCommunicator.getNextObject<QueryPermitResponse>(success, errMsg);
190 
191  if (!success) {
192  logger->error("sendQueryPermitt Error when connecting to server: " + errMsg);
193  return nullptr;
194  }
195 
196  logger->trace("Got back From Server Query ID : " + string(response->getQueryId()));
197 
198  return response;
199 }
200 
202  int managerNodePort,
203  Handle<QueryDone> m_queryDone,
204  bool& wasError,
205  string& errMsg) {
206 
207  // First build a new connection to the Server
208  PDBCommunicator myCommunicator;
209 
210  if (myCommunicator.connectToInternetServer(logger, managerNodePort, hostName, errMsg)) {
211  logger->error("Error when connecting to server: " + errMsg);
212  wasError = true;
213  return nullptr;
214  }
215 
216  // send QueryDone object over the socket
217  if (!myCommunicator.sendObject(m_queryDone, errMsg)) {
218  logger->error("sendQueryDone Client: Sending QueryPermit object: " + errMsg);
219  wasError = true;
220  return nullptr;
221  }
222 
223  bool success;
224  pdb::Handle<Ack> response = myCommunicator.getNextObject<Ack>(success, errMsg);
225 
226  if (!success) {
227  logger->error("PDBDistributionManagerClient -sendQueryDone no ack received: " + errMsg);
228  return nullptr;
229  }
230 
231  logger->trace("Got back From Server Query ID : " + string(response->getInfo()));
232  return response;
233 }
234 
236  string& manangerNodeHostName,
237  int managerNodePort,
238  Handle<PlaceOfQueryPlanner> m_PlaceOfQueryPlanner,
239  bool& wasError,
240  string& errMsg) {
241 
242  // First build a new connection to the Server
243  PDBCommunicator myCommunicator;
244 
245  if (myCommunicator.connectToInternetServer(
246  logger, managerNodePort, manangerNodeHostName, errMsg)) {
247  logger->error("Error when connecting to server: " + errMsg);
248  wasError = true;
249  return nullptr;
250  }
251 
252  // send PlaceOfQueryPlanner object over the socket
253  if (!myCommunicator.sendObject(m_PlaceOfQueryPlanner, errMsg)) {
254  logger->error("sendQueryDone Client: Sending QueryPermit object: " + errMsg);
255  wasError = true;
256  return nullptr;
257  }
258 
259  bool success;
260  // get the next object
261  pdb::Handle<Ack> response = myCommunicator.getNextObject<Ack>(success, errMsg);
262 
263  if (!success) {
264  logger->error("ERROR sendQueryPermitt: Uh oh. The type is not what I expected!!\n");
265  }
266 
267  logger->trace("sendGetPlaceOfQueryPlanner: Got back From Server Query ID : " +
268  string(response->getInfo()));
269 
270  return response;
271 }
272 }
273 
274 #endif
virtual void registerHandlers(PDBServer &forMe) override
bool sendObject(Handle< ObjType > &sendMe, std::string &errMsg)
bool connectToInternetServer(PDBLoggerPtr logToMeIn, int portNumber, std::string serverAddress, std::string &errMsg)
Handle< ObjType > getNextObject(void *readToHere, bool &success, std::string &errMsg)
void sendHeartBeat(string &managerHostName, int managerNodePort, bool &wasError, string &errMsg)
Handle< Ack > sendGetPlaceOfQueryPlanner(string &hostName, int managerNodePort, Handle< PlaceOfQueryPlanner > m_PlaceOfQueryPlanner, bool &wasError, string &errMsg)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
Handle< QueryPermitResponse > sendQueryPermitt(string &hostName, int managerNodePort, pdb::Handle< QueryPermit > m_queryPermit, bool &wasError, string &errMsg)
Handle< Ack > sendQueryDone(string &hostName, int managerNodePort, Handle< QueryDone > m_queryDone, bool &wasError, string &errMsg)
Handle< ListOfNodes > getCurrentNodes(string &managerHostName, int managerNodePort, bool &wasError, string &errMsg)
Definition: Ack.h:32