A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DispatcherServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef DISPATCHER_SERVER_CC
20 #define DISPATCHER_SERVER_CC
21 
22 #include "DispatcherServer.h"
23 #include "CatalogServer.h"
24 #include "PDBDebug.h"
25 #include "SimpleRequestHandler.h"
26 #include "SimpleRequestResult.h"
27 #include "SimpleSendBytesRequest.h"
28 #include "DispatcherAddData.h"
29 #include "BuiltInObjectTypeIDs.h"
30 #include "QuerySchedulerServer.h"
31 #include "Statistics.h"
32 #include "PartitionPolicyFactory.h"
34 #include <snappy.h>
35 #define MAX_CONCURRENT_REQUESTS 10
36 
37 namespace pdb {
38 
// Constructs the dispatcher: keeps the logger and statistics database that were passed in,
// starts with an empty storage-node vector and an empty partition-policy map, and
// initializes the pthread mutex.
// NOTE(review): this listing's own numbering jumps from 44 to 46 inside the body, so one
// source line is not visible here — check the real file before editing this constructor.
39 DispatcherServer::DispatcherServer(PDBLoggerPtr logger, std::shared_ptr<StatisticsDB> statisticsDB) {
40  this->logger = logger;
41  this->statisticsDB = statisticsDB;
42  this->storageNodes = pdb::makeObject<Vector<Handle<NodeDispatcherData>>>();
43  this->partitionPolicies = std::map<std::pair<std::string, std::string>, PartitionPolicyPtr>();
 // A nullptr attribute argument asks for the default mutex attributes.
44  pthread_mutex_init(&mutex, nullptr);
46 }
47 
49 
 // Destroys the mutex that the constructor set up with pthread_mutex_init.
 // NOTE(review): the line that opens this definition is elided from the listing (its
 // numbering skips 48 and 50); presumably this is the destructor body — confirm.
51  pthread_mutex_destroy(&mutex);
52 }
53 
 // Registers the handler for DispatcherAddData requests. Visible flow: receive the payload,
 // reject empty vectors and failed type validation, acknowledge the client, forward the
 // data to the storage nodes, then add the received byte count to the set's statistics.
 // NOTE(review): this listing's own numbering skips lines 54, 57, 60, 65 and 158, so the
 // enclosing function header, the handler's opening line, a loop header and two further
 // statements are not visible. The comments below describe the visible lines only.
55  forMe.registerHandler(
56  DispatcherAddData_TYPEID,
58  PDBCommunicatorPtr sendUsingMe) {
59  pthread_mutex_lock(&mutex);
 // Body of a block whose header is elided (line 60): the mutex is released, the thread
 // sleeps one second, and the mutex is taken again.
61  pthread_mutex_unlock(&mutex);
62  sleep(1);
63  pthread_mutex_lock(&mutex);
64  }
66  pthread_mutex_unlock(&mutex);
67  std::string errMsg;
68  bool res = true;
69  PDB_COUT << "DispatcherAddData handler running" << std::endl;
70  // Receive the data to send
71  size_t numBytes = sendUsingMe->getSizeOfNextObject();
72  std::cout << "Dispacher received numBytes = " << numBytes << std::endl;
73  Handle<Vector<Handle<Object>>> dataToSend;
74  char* tempPage = nullptr;
75  char* readToHere = nullptr;
 // Not a shallow copy: receive the object vector while a temporary allocation block of
 // numBytes + 65535 bytes is in scope.
 // NOTE(review): tempBlock is destroyed at the closing brace of this branch while
 // dataToSend is still used afterwards — confirm the handle remains valid.
76  if (request->isShallowCopy() == false) {
77  const UseTemporaryAllocationBlock tempBlock{numBytes + 65535};
78  dataToSend = sendUsingMe->getNextObject<Vector<Handle<Object>>>(res, errMsg);
79  } else {
 // Shallow copy: receive the raw bytes into a buffer instead.
80 #ifdef ENABLE_COMPRESSION
 // NOTE(review): tempPage comes from new[] but is released with free() further down;
 // memory obtained from new[] has to be released with delete[].
81  tempPage = new char[numBytes];
 // NOTE(review): whatever receiveBytes reports back is not checked (both variants).
82  sendUsingMe->receiveBytes(tempPage, errMsg);
83 #else
 // NOTE(review): malloc returns void*, which C++ does not convert implicitly to char*,
 // so this line needs a cast to compile when ENABLE_COMPRESSION is not defined.
84  readToHere = malloc(numBytes);
85  sendUsingMe->receiveBytes(readToHere, errMsg);
86 #endif
87 
88 #ifdef ENABLE_COMPRESSION
 // Decompress the received bytes with snappy into a malloc'ed buffer and read the
 // object vector out of it as the root object of a Record.
 // NOTE(review): the bool results of GetUncompressedLength and RawUncompress are
 // ignored, so a payload that fails to decompress is not detected here.
89  size_t uncompressedSize = 0;
90  snappy::GetUncompressedLength(tempPage, numBytes, &uncompressedSize);
91  readToHere = (char*)malloc(uncompressedSize);
92  snappy::RawUncompress(tempPage, numBytes, (char*)(readToHere));
93  Record<Vector<Handle<Object>>>* myRecord =
94  (Record<Vector<Handle<Object>>>*)readToHere;
95  dataToSend = myRecord->getRootObject();
96 #endif
97  }
 // NOTE(review): without ENABLE_COMPRESSION the shallow-copy branch never assigns
 // dataToSend, yet it is dereferenced on the next line.
 // NOTE(review): both early returns below leave tempPage / readToHere unreleased.
98  if (dataToSend->size() == 0) {
99  errMsg = "Warning: client attemps to store zero object vector";
100  Handle<SimpleRequestResult> response =
101  makeObject<SimpleRequestResult>(false, errMsg);
102  res = sendUsingMe->sendObject(response, errMsg);
103  std::cout << errMsg << std::endl;
104  return make_pair(false, errMsg);
105 
106  } else {
107  std::cout << "Dispatch to send vector size = " << dataToSend->size() << std::endl;
108  }
109  // Check that the type of the data being stored matches what is known to the catalog
110  if (!validateTypes(request->getDatabaseName(),
111  request->getSetName(),
112  request->getTypeName(),
113  errMsg)) {
114  Handle<SimpleRequestResult> response =
115  makeObject<SimpleRequestResult>(false, errMsg);
116  res = sendUsingMe->sendObject(response, errMsg);
117  std::cout << errMsg << std::endl;
118  return make_pair(false, errMsg);
119  }
 // The client is acknowledged before anything is forwarded; the bool results of
 // dispatchData / dispatchBytes below are discarded.
120  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
121  res = sendUsingMe->sendObject(response, errMsg);
122 
 // The key handed to the dispatch functions is the pair (set name, database name).
123  if (request->isShallowCopy() == false) {
124  dispatchData(std::pair<std::string, std::string>(request->getSetName(),
125  request->getDatabaseName()),
126  request->getTypeName(),
127  dataToSend);
128  } else {
129 
130 #ifdef ENABLE_COMPRESSION
 // The still-compressed buffer (tempPage) is what gets forwarded.
131  dispatchBytes(std::pair<std::string, std::string>(request->getSetName(),
132  request->getDatabaseName()),
133  request->getTypeName(),
134  tempPage,
135  numBytes);
136  free(tempPage);
137 #else
138  dispatchBytes(std::pair<std::string, std::string>(request->getSetName(),
139  request->getDatabaseName()),
140  request->getTypeName(),
141  readToHere,
142  numBytes);
143 #endif
 // Sits after the #endif, so readToHere is released in both build configurations.
144  free(readToHere);
145  }
146 
147  // update stats
 // Done while holding the mutex: numBytes (the size reported by getSizeOfNextObject)
 // is added to the byte count recorded for this database/set.
 // NOTE(review): stats is not checked for nullptr again after collectStats().
148  pthread_mutex_lock(&mutex);
149  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
150  if (stats == nullptr) {
151  getFunctionality<QuerySchedulerServer>().collectStats();
152  stats = getFunctionality<QuerySchedulerServer>().getStats();
153  }
154  size_t oldNumBytes =
155  stats->getNumBytes(request->getDatabaseName(), request->getSetName());
156  size_t newNumBytes = oldNumBytes + numBytes;
157  stats->setNumBytes(request->getDatabaseName(), request->getSetName(), newNumBytes);
159  pthread_mutex_unlock(&mutex);
160  return make_pair(res, errMsg);
161  }));
162 
 // Registers the handler for DispatcherRegisterPartitionPolicy requests: build a policy
 // from the request, register it for the set, then answer the client.
 // NOTE(review): the handler's opening lines are elided from this listing (its numbering
 // skips 165 and 166).
163  forMe.registerHandler(
164  DispatcherRegisterPartitionPolicy_TYPEID,
167 
168  PDB_COUT << "Registering partition policy for set " << request->getSetName() << ":"
169  << request->getDatabaseName() << std::endl;
170 
171  std::string errMsg;
172  bool res = true;
173 
 // The key is the pair (set name, database name), the same order the data handler uses.
174  registerSet(std::pair<std::string, std::string>(request->getSetName(),
175  request->getDatabaseName()),
176  PartitionPolicyFactory::buildPartitionPolicy(request->getPolicy()));
177 
 // res is still true and errMsg still empty here, so the response always reports success;
 // the returned pair carries the outcome of sending that response.
178  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
179  res = sendUsingMe->sendObject(response, errMsg);
180 
181  return make_pair(res, errMsg);
182  }));
183 }
184 
186  Handle<Vector<Handle<NodeDispatcherData>>> storageNodes) {
 // Replaces the dispatcher's storage-node list with the given one, logs every node, and
 // hands the new list to every partition policy registered so far.
 // NOTE(review): the first line of this signature is elided from the listing (its
 // numbering skips 185).
187  this->storageNodes = storageNodes;
188  for (int i = 0; i < storageNodes->size(); i++) {
189  auto node = (*storageNodes)[i];
190  PDB_COUT << "Dispatcher register node: " << node->getAddress() << " : " << node->getPort()
191  << std::endl;
192  }
193 
 // `auto const` (no reference) copies each map entry — key strings and policy pointer — per iteration.
194  for (auto const partitionPolicy : partitionPolicies) {
195  partitionPolicy.second->updateStorageNodes(storageNodes);
196  }
197 }
198 
199 void DispatcherServer::registerSet(std::pair<std::string, std::string> setAndDatabase,
200  PartitionPolicyPtr partitionPolicy) {
201  if (partitionPolicies.find(setAndDatabase) != partitionPolicies.end()) {
202  PDB_COUT << "Updating old set" << setAndDatabase.first << ":" << setAndDatabase.second
203  << std::endl;
204  } else {
205  PDB_COUT << "Found new set: " << setAndDatabase.first << ":" << setAndDatabase.second
206  << std::endl;
207  }
208  partitionPolicies.insert(std::pair<std::pair<std::string, std::string>, PartitionPolicyPtr>(
209  setAndDatabase, partitionPolicy));
210  partitionPolicies[setAndDatabase]->updateStorageNodes(storageNodes);
211 }
212 
213 
214 bool DispatcherServer::dispatchData(std::pair<std::string, std::string> setAndDatabase,
215  std::string type,
216  Handle<Vector<Handle<Object>>> toDispatch) {
217  // TODO: Implement this
218 
219  if (partitionPolicies.find(setAndDatabase) == partitionPolicies.end()) {
220  PDB_COUT << "No partition policy was found for set: " << setAndDatabase.first << ":"
221  << setAndDatabase.second << std::endl;
222  PDB_COUT << "Defaulting to random policy" << std::endl;
224  return dispatchData(setAndDatabase, type, toDispatch);
225  } else {
226  auto mappedPartitions = partitionPolicies[setAndDatabase]->partition(toDispatch);
227  PDB_COUT << "mappedPartitions size = " << mappedPartitions->size() << std::endl;
228  for (auto const& pair : (*mappedPartitions)) {
229  if (!sendData(setAndDatabase, type, findNode(pair.first), pair.second)) {
230  return false;
231  }
232  }
233  return true;
234  }
235 }
236 
237 
238 bool DispatcherServer::dispatchBytes(std::pair<std::string, std::string> setAndDatabase,
239  std::string type,
240  char* bytes,
241  size_t numBytes) {
242  // TODO: Implement this
243 
244  if (partitionPolicies.find(setAndDatabase) == partitionPolicies.end()) {
245  PDB_COUT << "No partition policy was found for set: " << setAndDatabase.first << ":"
246  << setAndDatabase.second << std::endl;
247  PDB_COUT << "Defaulting to random policy" << std::endl;
249  return dispatchBytes(setAndDatabase, type, bytes, numBytes);
250  } else {
251  auto mappedPartitions = partitionPolicies[setAndDatabase]->partition(nullptr);
252  PDB_COUT << "mappedPartitions size = " << mappedPartitions->size() << std::endl;
253  for (auto const& pair : (*mappedPartitions)) {
254  if (!sendBytes(setAndDatabase, type, findNode(pair.first), bytes, numBytes)) {
255  return false;
256  }
257  }
258  return true;
259  }
260 }
261 
262 
263 bool DispatcherServer::validateTypes(const std::string& databaseName,
264  const std::string& setName,
265  const std::string& typeName,
266  std::string& errMsg) {
267  PDB_COUT << "running validateTypes with typeName" << typeName << std::endl;
268  /* std::string fullSetName = databaseName + "." + setName;
269  Handle<pdb::Vector<CatalogSetMetadata>> returnValues =
270  makeObject<pdb::Vector<CatalogSetMetadata>>();
271 
272  getFunctionality<CatalogServer>().getCatalog()->getListOfSets(returnValues, fullSetName) ;
273 
274  if (returnValues->size() == 0) {
275  errMsg = "Set " + fullSetName + " cannot be found in the catalog";
276  std :: cout << errMsg << std :: endl;
277  return false;
278  } else {
279  if ((* returnValues)[0].getObjectTypeName() == typeName) {
280  PDB_COUT << "validateTypes succeed" << std :: endl;
281  return true;
282  } else {
283  errMsg = "Dispatched type " + typeName + " does not match stored type " +
284  (* returnValues)[0].getObjectTypeName().c_str();
285  std :: cout << errMsg << std :: endl;
286  return false;
287  }
288  }
289  PDB_COUT << fullSetName << std :: endl;
290  std :: cout << errMsg << std :: endl;
291  return false;
292  */
293 
294  return true;
295 }
296 
297 bool DispatcherServer::sendData(std::pair<std::string, std::string> setAndDatabase,
298  std::string type,
299  Handle<NodeDispatcherData> destination,
300  Handle<Vector<Handle<Object>>> toSend) {
301 
302  PDB_COUT << "Sending data to " << destination->getPort() << " : " << destination->getAddress()
303  << std::endl;
304  std::string err;
305  StorageClient storageClient =
306  StorageClient(destination->getPort(), destination->getAddress(), logger);
307  if (!storageClient.storeData(toSend, setAndDatabase.second, setAndDatabase.first, type, err)) {
308  PDB_COUT << "Not able to store data: " << err << std::endl;
309  return 0;
310  }
311  return 1;
312 }
313 
314 bool DispatcherServer::sendBytes(std::pair<std::string, std::string> setAndDatabase,
315  std::string type,
316  Handle<NodeDispatcherData> destination,
317  char* bytes,
318  size_t numBytes) {
319 #ifndef ENABLE_COMPRESSION
320  std::cout << "Now only objects or compressed bytes can be dispatched!!" << std::endl;
321 #endif
322  int port = destination->getPort();
323  std::string address = destination->getAddress();
324  std::string databaseName = setAndDatabase.second;
325  std::string setName = setAndDatabase.first;
326  std::string errMsg;
327  std::cout << "store compressed bytes to address=" << address << " and port=" << port
328  << ", with compressed byte size = " << numBytes << " to database=" << databaseName
329  << " and set=" << setName << " and type = IntermediateData" << std::endl;
330  return simpleSendBytesRequest<StorageAddData, SimpleRequestResult, bool>(
331  logger,
332  port,
333  address,
334  false,
335  1024,
336  [&](Handle<SimpleRequestResult> result) {
337  if (result != nullptr)
338  if (!result->getRes().first) {
339  logger->error("Error sending data: " + result->getRes().second);
340  errMsg = "Error sending data: " + result->getRes().second;
341  }
342  return true;
343  },
344  bytes,
345  numBytes,
346  databaseName,
347  setName,
348  "IntermediateData",
349  false,
350  true,
351  true,
352  true);
353 }
354 
355 
 // Linear search through storageNodes for the node whose id equals nodeId; yields the
 // first match, or nullptr when no node has that id.
 // NOTE(review): the signature line is elided from this listing (its numbering skips 356).
 // NOTE(review): dispatchData / dispatchBytes pass the result straight to sendData /
 // sendBytes, which dereference it without checking for nullptr.
357  for (int i = 0; i < storageNodes->size(); i++) {
358  auto storageNode = (*storageNodes)[i];
359  if (storageNode->getNodeId() == nodeId) {
360  return storageNode;
361  }
362  }
363  return nullptr;
364 }
365 }
366 
367 #endif
Handle< Vector< Handle< NodeDispatcherData > > > storageNodes
bool sendBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, char *bytes, size_t numBytes)
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
Handle< ObjType > getRootObject()
Definition: Record.cc:46
unsigned int NodeID
Definition: DataTypes.h:27
void registerSet(std::pair< std::string, std::string > setAndDatabase, PartitionPolicyPtr partitionPolicy)
bool dispatchData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< Vector< Handle< Object >>> toDispatch)
static PartitionPolicyPtr buildPartitionPolicy(PartitionPolicy::Policy policy)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void registerStorageNodes(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
std::shared_ptr< PartitionPolicy > PartitionPolicyPtr
#define PDB_COUT
Definition: PDBDebug.h:31
void registerHandlers(PDBServer &forMe) override
DispatcherServer(PDBLoggerPtr logger, std::shared_ptr< StatisticsDB > statisticsDB)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
std::shared_ptr< StatisticsDB > statisticsDB
bool validateTypes(const std::string &databaseName, const std::string &setName, const std::string &typeName, std::string &errMsg)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
static PartitionPolicyPtr buildDefaultPartitionPolicy()
std::map< std::pair< std::string, std::string >, PartitionPolicyPtr > partitionPolicies
bool sendData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, Handle< Vector< Handle< Object >>> toSend)
bool storeData(Handle< Vector< Handle< DataType >>> data, std::string databaseName, std::string setName, std::string &errMsg, bool typeCheck=true)
#define MAX_CONCURRENT_REQUESTS
Handle< NodeDispatcherData > findNode(NodeID nodeId)
bool dispatchBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, char *bytes, size_t numBytes)