A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DistributedStorageManagerServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef OBJECTQUERYMODEL_DISTRIBUTEDSTORAGEMANAGERSERVER_CC
19 #define OBJECTQUERYMODEL_DISTRIBUTEDSTORAGEMANAGERSERVER_CC
20 
21 #include "PDBDebug.h"
23 #include "CatalogClient.h"
24 #include "CatalogServer.h"
25 #include "ResourceManagerServer.h"
26 #include "DispatcherServer.h"
27 #include "PDBCatalogMsgType.h"
29 #include "CatalogSetMetadata.h"
30 #include "SimpleRequestHandler.h"
31 
41 #include "QuerySchedulerServer.h"
42 #include "Statistics.h"
43 #include "StorageAddDatabase.h"
44 #include "StorageAddSet.h"
45 #include "StorageAddTempSet.h"
46 #include "StorageRemoveDatabase.h"
47 #include "StorageRemoveUserSet.h"
48 #include "StorageExportSet.h"
49 #include "StorageClearSet.h"
50 #include "StorageCleanup.h"
51 #include "Configuration.h"
52 
53 #include "SetScan.h"
54 #include "KeepGoing.h"
55 #include "DoneWithResult.h"
56 
57 #include <chrono>
58 #include <ctime>
59 #include <unistd.h>
60 #include <snappy.h>
61 
62 #define USING_ALL_NODES
63 
64 namespace pdb {
65 
67  ConfigurationPtr conf,
68  std::shared_ptr<StatisticsDB> statisticsDB)
69  : BroadcastServer(logger, conf) {
70  this->statisticsDB = statisticsDB;
71 }
72 
73 
75  std::shared_ptr<StatisticsDB> statisticsDB)
76  : BroadcastServer(logger) {
77  this->statisticsDB = statisticsDB;
78 }
79 
80 
82  // no-op
83 }
84 
86 
90  forMe.registerHandler(
91  DistributedStorageAddDatabase_TYPEID,
94  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
95  std::string errMsg;
96  std::string database = request->getDatabase();
97  std::string value;
98  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
99 
100  if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
101  catalogType, database, value)) {
102  PDB_COUT << "Database " << database << " already exists " << std::endl;
103  } else {
104  PDB_COUT << "Database " << database << " does not exist" << std::endl;
105  if (!getFunctionality<CatalogClient>().createDatabase(database, errMsg)) {
106  std::cout << "Could not register db, because: " << errMsg << std::endl;
107  Handle<SimpleRequestResult> response =
108  makeObject<SimpleRequestResult>(false, errMsg);
109  bool res = sendUsingMe->sendObject(response, errMsg);
110  return make_pair(res, errMsg);
111  }
112  }
113 
114  mutex lock;
115  auto successfulNodes = std::vector<std::string>();
116  auto failureNodes = std::vector<std::string>();
117  auto nodesToBroadcastTo = std::vector<std::string>();
118 
119 #ifndef USING_ALL_NODES
120  if (!getFunctionality<DistributedStorageManagerServer>().findNodesForDatabase(
121  database, nodesToBroadcastTo, errMsg)) {
122  PDB_COUT << "Could not find nodes to broadcast database to: " << errMsg
123  << std::endl;
124  Handle<SimpleRequestResult> response =
125  makeObject<SimpleRequestResult>(false, errMsg);
126  bool res = sendUsingMe->sendObject(response, errMsg);
127  return make_pair(res, errMsg);
128  }
129 #else
130  std::vector<std::string> allNodes;
131  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
132  for (int i = 0; i < nodes->size(); i++) {
133  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
134  std::string port = std::to_string((*nodes)[i]->getPort());
135  allNodes.push_back(address + ":" + port);
136  }
137  nodesToBroadcastTo = allNodes;
138 #endif
139 
140  Handle<StorageAddDatabase> storageCmd =
141  makeObject<StorageAddDatabase>(request->getDatabase());
142  getFunctionality<DistributedStorageManagerServer>()
143  .broadcast<StorageAddDatabase, Object, SimpleRequestResult>(
144  storageCmd,
145  nullptr,
146  nodesToBroadcastTo,
147  generateAckHandler(successfulNodes, failureNodes, lock),
148  [&](std::string errMsg, std::string serverName) {
149  lock.lock();
150  std::cout << "Server " << serverName << " received an error: " << errMsg
151  << std::endl;
152  failureNodes.push_back(serverName);
153  lock.unlock();
154  });
155 
156  bool res = true;
157  for (auto node : successfulNodes) {
158  if (!getFunctionality<CatalogClient>().addNodeToDB(
159  node, request->getDatabase(), errMsg)) {
160  // TODO: Handle error
161  std::cout << "Failed to register node " << node << " for database "
162  << request->getDatabase() << " in Catalog" << std::endl;
163  }
164  }
165 
166  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
167  res = sendUsingMe->sendObject(response, errMsg);
168  return make_pair(res, errMsg);
169  }));
170  forMe.registerHandler(
171  DistributedStorageClearSet_TYPEID,
173  [&](Handle<DistributedStorageClearSet> request, PDBCommunicatorPtr sendUsingMe) {
174  const UseTemporaryAllocationBlock tempBlock{8 * 1024 * 1024};
175  std::cout << "received DistributedStorageClearSet message" << std::endl;
176  std::string errMsg;
177  bool res = true;
178  mutex lock;
179 
180  auto successfulNodes = std::vector<std::string>();
181  auto failureNodes = std::vector<std::string>();
182  auto nodesToBroadcast = std::vector<std::string>();
183 
184  std::string database = request->getDatabase();
185  std::string set = request->getSetName();
186  std::string fullSetName = database + "." + set;
187  PDB_COUT << "set to clear is " << fullSetName << std::endl;
188  std::string value;
189  int catalogType = PDBCatalogMsgType::CatalogPDBSet;
190 
191  if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
192  catalogType, fullSetName, value)) {
193  std::cout << "Set " << fullSetName << " already exists " << std::endl;
194 // determine which nodes to broadcast the clear-set command to
195 #ifndef USING_ALL_NODES
196  if (!getFunctionality<DistributedStorageManagerServer>().findNodesForSet(
197  database, set, nodesToBroadcast, errMsg)) {
198  std::cout << "Could not find nodes to broadcast set to: " << errMsg
199  << std::endl;
200  Handle<SimpleRequestResult> response =
201  makeObject<SimpleRequestResult>(false, errMsg);
202  bool res = sendUsingMe->sendObject(response, errMsg);
203  return make_pair(res, errMsg);
204  }
205 #else
206  std::vector<std::string> allNodes;
207  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
208  for (int i = 0; i < nodes->size(); i++) {
209  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
210  std::string port = std::to_string((*nodes)[i]->getPort());
211  allNodes.push_back(address + ":" + port);
212  }
213  nodesToBroadcast = allNodes;
214 #endif
215  Handle<StorageClearSet> storageCmd = makeObject<StorageClearSet>(
216  request->getDatabase(), request->getSetName(), request->getTypeName());
217 
218 
219  getFunctionality<DistributedStorageManagerServer>()
220  .broadcast<StorageClearSet, Object, SimpleRequestResult>(
221  storageCmd,
222  nullptr,
223  nodesToBroadcast,
224  generateAckHandler(successfulNodes, failureNodes, lock));
225  res = true;
226  } else {
227  PDB_COUT << "Set " << fullSetName << " does not exist" << std::endl;
228  res = false;
229  errMsg = std::string("Set to clear with name=") + fullSetName +
230  std::string(" doesn't exist");
231  }
232 
233  // update stats
234  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
235  if (stats == nullptr) {
236  getFunctionality<QuerySchedulerServer>().collectStats();
237  stats = getFunctionality<QuerySchedulerServer>().getStats();
238  }
239  stats->setNumPages(request->getDatabase(), request->getSetName(), 0);
240  stats->setNumBytes(request->getDatabase(), request->getSetName(), 0);
241 
242  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
243  res = sendUsingMe->sendObject(response, errMsg);
244  return make_pair(res, errMsg);
245  }));
246 
247 
248  forMe.registerHandler(
249  DistributedStorageAddTempSet_TYPEID,
252  const UseTemporaryAllocationBlock tempBlock{8 * 1024 * 1024};
253  auto begin = std::chrono::high_resolution_clock::now();
254 
255  PDB_COUT << "received DistributedStorageAddTempSet message" << std::endl;
256  std::string errMsg;
257  mutex lock;
258 
259  auto successfulNodes = std::vector<std::string>();
260  auto failureNodes = std::vector<std::string>();
261  auto nodesToBroadcast = std::vector<std::string>();
262 
263  std::string set = request->getSetName();
264  std::string value;
265 
266  std::vector<std::string> allNodes;
267  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
268  for (int i = 0; i < nodes->size(); i++) {
269  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
270  std::string port = std::to_string((*nodes)[i]->getPort());
271  allNodes.push_back(address + ":" + port);
272  }
273  nodesToBroadcast = allNodes;
274 
275  Handle<StorageAddSet> storageCmd = makeObject<StorageAddSet>(request->getDatabaseName(),
276  request->getSetName(),
277  request->getTypeName(),
278  request->getPageSize());
279 
280  getFunctionality<DistributedStorageManagerServer>()
281  .broadcast<StorageAddSet, Object, SimpleRequestResult>(
282  storageCmd,
283  nullptr,
284  nodesToBroadcast,
285  generateAckHandler(successfulNodes, failureNodes, lock));
286 
287  auto storageAddSetEnd = std::chrono::high_resolution_clock::now();
288  PDB_COUT << "Time Duration for adding temp set:\t "
289  << std::chrono::duration_cast<std::chrono::duration<float>>(storageAddSetEnd -
290  begin)
291  .count()
292  << " secs." << std::endl;
293 
294  bool res = true;
295  if (failureNodes.size() > 0) {
296  res = false;
297  errMsg = "";
298  for (int i = 0; i < failureNodes.size(); i++) {
299  errMsg += failureNodes[i] + std::string(";");
300  }
301  }
302  // update stats
303  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
304  if (stats == nullptr) {
305  getFunctionality<QuerySchedulerServer>().collectStats();
306  stats = getFunctionality<QuerySchedulerServer>().getStats();
307  }
308  stats->setNumPages("temp", request->getSetName(), 0);
309  stats->setNumBytes("temp", request->getSetName(), 0);
310 
311  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
312  res = sendUsingMe->sendObject(response, errMsg);
313  return make_pair(res, errMsg);
314 
315  }));
316 
317  forMe.registerHandler(
318  DistributedStorageAddSet_TYPEID,
321  const UseTemporaryAllocationBlock tempBlock{8 * 1024 * 1024};
322  auto begin = std::chrono::high_resolution_clock::now();
323  auto beforeCreateSet = begin;
324  auto afterCreateSet = begin;
325 
326  PDB_COUT << "received DistributedStorageAddSet message" << std::endl;
327  std::string errMsg;
328  mutex lock;
329 
330  auto successfulNodes = std::vector<std::string>();
331  auto failureNodes = std::vector<std::string>();
332  auto nodesToBroadcast = std::vector<std::string>();
333 
334  std::string database = request->getDatabase();
335  std::string set = request->getSetName();
336  std::string fullSetName = database + "." + set;
337  PDB_COUT << "set to create is " << fullSetName << std::endl;
338  std::string value;
339  int catalogType = PDBCatalogMsgType::CatalogPDBSet;
340 
341  if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
342  catalogType, fullSetName, value)) {
343  std::cout << "Set " << fullSetName << " already exists " << std::endl;
344  } else {
345  PDB_COUT << "Set " << fullSetName << " does not exist" << std::endl;
346 
347  // JiaNote: the lookup below is commented out because searchForObjectTypeName doesn't
348  // work for complex types like Vector<Handle<Foo>>
349  // int16_t typeId =
350  // getFunctionality<CatalogClient>().searchForObjectTypeName(request->getTypeName());
351  int16_t typeId = VTableMap::getIDByName(request->getTypeName(), false);
352  if (typeId == 0) {
353  return make_pair(false, "Could not identify type=" + request->getTypeName());
354  }
355 
356  beforeCreateSet = std::chrono::high_resolution_clock::now();
357 
358  if (!getFunctionality<CatalogClient>().createSet(typeId, database, set, errMsg)) {
359  std::cout << "Could not register set, because: " << errMsg << std::endl;
360  Handle<SimpleRequestResult> response =
361  makeObject<SimpleRequestResult>(false, errMsg);
362  bool res = sendUsingMe->sendObject(response, errMsg);
363  return make_pair(res, errMsg);
364  }
365  afterCreateSet = std::chrono::high_resolution_clock::now();
366  }
367 #ifndef USING_ALL_NODES
368  if (!getFunctionality<DistributedStorageManagerServer>().findNodesForSet(
369  database, set, nodesToBroadcast, errMsg)) {
370  std::cout << "Could not find nodes to broadcast set to: " << errMsg << std::endl;
371  Handle<SimpleRequestResult> response =
372  makeObject<SimpleRequestResult>(false, errMsg);
373  bool res = sendUsingMe->sendObject(response, errMsg);
374  return make_pair(res, errMsg);
375  }
376 #else
377  std::vector<std::string> allNodes;
378  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
379  for (int i = 0; i < nodes->size(); i++) {
380  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
381  std::string port = std::to_string((*nodes)[i]->getPort());
382  allNodes.push_back(address + ":" + port);
383  }
384  nodesToBroadcast = allNodes;
385 #endif
386  auto catalogGetNodesEnd = std::chrono::high_resolution_clock::now();
387 
388  Handle<StorageAddSet> storageCmd = makeObject<StorageAddSet>(request->getDatabase(),
389  request->getSetName(),
390  request->getTypeName(),
391  request->getPageSize());
392 
393 
394  getFunctionality<DistributedStorageManagerServer>()
395  .broadcast<StorageAddSet, Object, SimpleRequestResult>(
396  storageCmd,
397  nullptr,
398  nodesToBroadcast,
399  generateAckHandler(successfulNodes, failureNodes, lock));
400 
401  auto storageAddSetEnd = std::chrono::high_resolution_clock::now();
402 
403 
404  for (auto node : successfulNodes) {
405  if (!getFunctionality<CatalogClient>().addNodeToSet(node, database, set, errMsg)) {
406  std::cout << "Failed to register node " << node << " for set " << fullSetName
407  << " in Catalog" << std::endl;
408  }
409  }
410  bool res = true;
411  if (failureNodes.size() > 0) {
412  res = false;
413  } else {
414  // update stats
415  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
416  if (stats == nullptr) {
417  getFunctionality<QuerySchedulerServer>().collectStats();
418  stats = getFunctionality<QuerySchedulerServer>().getStats();
419  }
420  stats->setNumPages(request->getDatabase(), request->getSetName(), 0);
421  stats->setNumBytes(request->getDatabase(), request->getSetName(), 0);
422  }
423  long id = -1;
424  int typeId = VTableMap::getIDByName(request->getTypeName());
425  this->statisticsDB->createData(request->getDatabase(),
426  request->getSetName(),
427  "UNKNOWN",
428  "UserSet",
429  request->getTypeName(),
430  typeId,
431  request->getPageSize(),
432  id);
433  std::cout << "created data in statistics database with id = " << id << std::endl;
434 
435  auto catalogAddSetEnd = std::chrono::high_resolution_clock::now();
436 
437  PDB_COUT << "Time Duration for catalog create set Metadata:\t "
438  << std::chrono::duration_cast<std::chrono::duration<float>>(afterCreateSet -
439  beforeCreateSet)
440  .count()
441  << " secs." << std::endl;
442  PDB_COUT << "Time Duration for catalog getting nodes:\t "
443  << std::chrono::duration_cast<std::chrono::duration<float>>(
444  catalogGetNodesEnd - afterCreateSet)
445  .count()
446  << " secs." << std::endl;
447  PDB_COUT << "Time Duration for storage adding set:\t "
448  << std::chrono::duration_cast<std::chrono::duration<float>>(storageAddSetEnd -
449  catalogGetNodesEnd)
450  .count()
451  << " secs." << std::endl;
452  PDB_COUT << "Time Duration for catalog adding addNodeToSet metadata:\t "
453  << std::chrono::duration_cast<std::chrono::duration<float>>(catalogAddSetEnd -
454  storageAddSetEnd)
455  .count()
456  << " secs." << std::endl;
457  PDB_COUT << std::endl;
458 
459  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
460  res = sendUsingMe->sendObject(response, errMsg);
461  return make_pair(res, errMsg);
462  }));
463 
464  forMe.registerHandler(
465  DistributedStorageRemoveDatabase_TYPEID,
468  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
469  std::string errMsg;
470  mutex lock;
471  std::vector<std::string> successfulNodes = std::vector<std::string>();
472  std::vector<std::string> failureNodes = std::vector<std::string>();
473 
474  std::string database = request->getDatabase();
475  std::string value;
476  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
477 
478  if (!getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
479  catalogType, database, value)) {
480  errMsg = "Cannot delete database, database " + database + " does not exist\n";
481  Handle<SimpleRequestResult> response =
482  makeObject<SimpleRequestResult>(false, errMsg);
483  bool res = sendUsingMe->sendObject(response, errMsg);
484  return make_pair(res, errMsg);
485  }
486 
487  auto nodesToBroadcastTo = std::vector<std::string>();
488 
489 #ifndef USING_ALL_NODES
490  if (!getFunctionality<DistributedStorageManagerServer>().findNodesContainingDatabase(
491  database, nodesToBroadcastTo, errMsg)) {
492  std::cout << "Could not find nodes to broadcast database delete to " << errMsg
493  << std::endl;
494  Handle<SimpleRequestResult> response =
495  makeObject<SimpleRequestResult>(false, errMsg);
496  bool res = sendUsingMe->sendObject(response, errMsg);
497  return make_pair(res, errMsg);
498  }
499 #else
500  std::vector<std::string> allNodes;
501  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
502  for (int i = 0; i < nodes->size(); i++) {
503  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
504  std::string port = std::to_string((*nodes)[i]->getPort());
505  allNodes.push_back(address + ":" + port);
506  }
507  nodesToBroadcastTo = allNodes;
508 #endif
509  Handle<StorageRemoveDatabase> storageCmd = makeObject<StorageRemoveDatabase>(database);
510  getFunctionality<DistributedStorageManagerServer>()
511  .broadcast<StorageRemoveDatabase, Object, SimpleRequestResult>(
512  storageCmd,
513  nullptr,
514  nodesToBroadcastTo,
515  generateAckHandler(successfulNodes, failureNodes, lock));
516 
517  if (failureNodes.size() == 0) {
518  PDB_COUT << "Successfully deleted database on " << successfulNodes.size()
519  << " nodes" << std::endl;
520  } else {
521  errMsg = "Failed to delete database on " + std::to_string(failureNodes.size()) +
522  " nodes. Skipping registering with catalog";
523  std::cout << errMsg;
524  Handle<SimpleRequestResult> response =
525  makeObject<SimpleRequestResult>(false, errMsg);
526  bool res = sendUsingMe->sendObject(response, errMsg);
527  return make_pair(res, errMsg);
528  }
529 
530  if (!getFunctionality<CatalogClient>().deleteDatabase(database, errMsg)) {
531  std::cout << "Could not delete database, because: " << errMsg << std::endl;
532  Handle<SimpleRequestResult> response =
533  makeObject<SimpleRequestResult>(false, errMsg);
534  bool res = sendUsingMe->sendObject(response, errMsg);
535  return make_pair(res, errMsg);
536  }
537 
538  bool res = true;
539 
540  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
541  res = sendUsingMe->sendObject(response, errMsg);
542  return make_pair(res, errMsg);
543  }));
544 
545  forMe.registerHandler(
546  DistributedStorageRemoveTempSet_TYPEID,
549  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
550  auto begin = std::chrono::high_resolution_clock::now();
551 
552  std::string errMsg;
553  mutex lock;
554  auto successfulNodes = std::vector<std::string>();
555  auto failureNodes = std::vector<std::string>();
556  auto nodesToBroadcast = std::vector<std::string>();
557 
558  std::string database = request->getDatabase();
559  std::string set = request->getSetName();
560  std::string fullSetName = database + "." + set;
561  std::vector<std::string> allNodes;
562  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
563  for (int i = 0; i < nodes->size(); i++) {
564  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
565  std::string port = std::to_string((*nodes)[i]->getPort());
566  allNodes.push_back(address + ":" + port);
567  }
568  nodesToBroadcast = allNodes;
569 
570  PDB_COUT << "to broadcast StorageRemoveTempSet" << std::endl;
571  Handle<StorageRemoveUserSet> storageCmd = makeObject<StorageRemoveUserSet>(
572  request->getDatabase(), request->getSetName(), request->getTypeName());
573  getFunctionality<DistributedStorageManagerServer>()
574  .broadcast<StorageRemoveUserSet, Object, SimpleRequestResult>(
575  storageCmd,
576  nullptr,
577  nodesToBroadcast,
578  generateAckHandler(successfulNodes, failureNodes, lock));
579 
580  auto storageRemoveSetEnd = std::chrono::high_resolution_clock::now();
581 
582  PDB_COUT << "Time Duration for storage removing set:\t "
583  << std::chrono::duration_cast<std::chrono::duration<float>>(
584  storageRemoveSetEnd - begin)
585  .count()
586  << " secs." << std::endl;
587 
588  bool res = true;
589  if (failureNodes.size() > 0) {
590  res = false;
591  errMsg = "";
592  for (int i = 0; i < failureNodes.size(); i++) {
593  errMsg += failureNodes[i] + std::string(";");
594  }
595  }
596  // update stats
597  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
598  if (stats == nullptr) {
599  getFunctionality<QuerySchedulerServer>().collectStats();
600  stats = getFunctionality<QuerySchedulerServer>().getStats();
601  }
602  stats->removeSet(request->getDatabase(), request->getSetName());
603 
604  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
605  res = sendUsingMe->sendObject(response, errMsg);
606  return make_pair(res, errMsg);
607  }));
608 
609 
610  forMe.registerHandler(
611  DistributedStorageRemoveSet_TYPEID,
614 
615  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
616  auto begin = std::chrono::high_resolution_clock::now();
617 
618  std::string errMsg;
619  mutex lock;
620  auto successfulNodes = std::vector<std::string>();
621  auto failureNodes = std::vector<std::string>();
622  auto nodesToBroadcast = std::vector<std::string>();
623 
624  std::string database = request->getDatabase();
625  std::string set = request->getSetName();
626  std::string fullSetName = database + "." + set;
627 
629  makeObject<pdb::Vector<CatalogSetMetadata>>();
630 
631  std::string typeName;
632 
633  getFunctionality<CatalogServer>().getCatalog()->getListOfSets(returnValues,
634  fullSetName);
635 
636  if (returnValues->size() == 0) {
637  std::cout << "Cannot remove set, Set " << fullSetName << " does not exist "
638  << std::endl;
639  Handle<SimpleRequestResult> response =
640  makeObject<SimpleRequestResult>(false, errMsg);
641  bool res = sendUsingMe->sendObject(response, errMsg);
642  return make_pair(res, errMsg);
643  } else {
644  typeName = (*returnValues)[0].getObjectTypeName();
645  // std :: cout << "typeName=" << typeName << std :: endl;
646  }
647 #ifndef USING_ALL_NODES
648  if (!getFunctionality<DistributedStorageManagerServer>().findNodesContainingSet(
649  database, set, nodesToBroadcast, errMsg)) {
650  std::cout << "Could not find nodes to broadcast set to: " << errMsg << std::endl;
651  Handle<SimpleRequestResult> response =
652  makeObject<SimpleRequestResult>(false, errMsg);
653  bool res = sendUsingMe->sendObject(response, errMsg);
654  return make_pair(res, errMsg);
655  }
656 #else
657  std::vector<std::string> allNodes;
658  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
659  for (int i = 0; i < nodes->size(); i++) {
660  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
661  std::string port = std::to_string((*nodes)[i]->getPort());
662  allNodes.push_back(address + ":" + port);
663  }
664  nodesToBroadcast = allNodes;
665 #endif
666 
667  auto catalogGetNodesEnd = std::chrono::high_resolution_clock::now();
668  PDB_COUT << "to broadcast StorageRemoveUserSet" << std::endl;
669  Handle<StorageRemoveUserSet> storageCmd = makeObject<StorageRemoveUserSet>(
670  request->getDatabase(), request->getSetName(), typeName);
671  getFunctionality<DistributedStorageManagerServer>()
672  .broadcast<StorageRemoveUserSet, Object, SimpleRequestResult>(
673  storageCmd,
674  nullptr,
675  nodesToBroadcast,
676  generateAckHandler(successfulNodes, failureNodes, lock));
677 
678  if (failureNodes.size() == 0) {
679  PDB_COUT << "Successfully deleted set on " << successfulNodes.size() << " nodes"
680  << std::endl;
681  } else {
682  errMsg = "Failed to delete set on " + std::to_string(failureNodes.size()) +
683  " nodes. Skipping registering with catalog";
684  std::cout << errMsg << std::endl;
685  Handle<SimpleRequestResult> response =
686  makeObject<SimpleRequestResult>(false, errMsg);
687  bool res = sendUsingMe->sendObject(response, errMsg);
688  return make_pair(res, errMsg);
689  }
690 
691  auto storageRemoveSetEnd = std::chrono::high_resolution_clock::now();
692 
693  // update stats
694  StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
695  if (stats == nullptr) {
696  getFunctionality<QuerySchedulerServer>().collectStats();
697  stats = getFunctionality<QuerySchedulerServer>().getStats();
698  }
699  stats->removeSet(request->getDatabase(), request->getSetName());
700 
701  if (failureNodes.size() == 0) {
702  // If all the nodes succeeded in removing the set, then we can simply delete the set
703  // from the catalog
705  PDB_COUT << "Succeeded in deleting set " << fullSetName << " on all nodes"
706  << std::endl;
707  if (!getFunctionality<CatalogClient>().deleteSet(database, set, errMsg)) {
708  std::cout << "Could not delete set, because: " << errMsg << std::endl;
709  Handle<SimpleRequestResult> response =
710  makeObject<SimpleRequestResult>(false, errMsg);
711  bool res = sendUsingMe->sendObject(response, errMsg);
712  return make_pair(res, errMsg);
713  }
714  } else {
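715  // If some nodes failed to remove the set, remove the nodes that succeeded from the set
716  // maps in the catalog so that future calls to this function will only attempt to affect
717  // the unmodified storage nodes. NOTE(review): this branch looks unreachable, because the
718  // handler already returns above whenever failureNodes is non-empty -- confirm.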
719  errMsg = "Nodes failed to remove set " + fullSetName + ": ";
720  for (auto node : successfulNodes) {
721  if (!getFunctionality<CatalogClient>().removeNodeFromSet(
722  node, database, set, errMsg)) {
723  errMsg += node + ", ";
724  std::cout << errMsg << std::endl;
725  }
726  }
727 
728  bool res = false;
729  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
730  sendUsingMe->sendObject(response, errMsg);
731  return make_pair(false, errMsg);
732  }
733 
734  auto catalogRemoveSetEnd = std::chrono::high_resolution_clock::now();
735  bool res = true;
736  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
737  res = sendUsingMe->sendObject(response, errMsg);
738 
739 
740  PDB_COUT << "Time Duration for catalog get nodes info:\t "
741  << std::chrono::duration_cast<std::chrono::duration<float>>(
742  catalogGetNodesEnd - begin)
743  .count()
744  << " secs." << std::endl;
745  PDB_COUT << "Time Duration for storage removing set:\t "
746  << std::chrono::duration_cast<std::chrono::duration<float>>(
747  storageRemoveSetEnd - catalogGetNodesEnd)
748  .count()
749  << " secs." << std::endl;
750  PDB_COUT << "Time Duration for catalog removing set:\t "
751  << std::chrono::duration_cast<std::chrono::duration<float>>(
752  catalogRemoveSetEnd - storageRemoveSetEnd)
753  .count()
754  << " secs." << std::endl;
755  PDB_COUT << std::endl;
756  return make_pair(res, errMsg);
757 
758  }));
759 
760  // JiaNote: the handler below processes the DistributedStorageCleanup message; it broadcasts
761  // StorageCleanup to all nodes so that records are written back on all slaves
762  forMe.registerHandler(
763  DistributedStorageCleanup_TYPEID,
765 
766  [&](Handle<DistributedStorageCleanup> request, PDBCommunicatorPtr sendUsingMe) {
767  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
768  PDB_COUT << "received DistributedStorageCleanup" << std::endl;
769  std::string errMsg;
770  mutex lock;
771  auto successfulNodes = std::vector<std::string>();
772  auto failureNodes = std::vector<std::string>();
773 
774  std::vector<std::string> allNodes;
775  getFunctionality<DispatcherServer>().waitAllRequestsProcessed();
776  std::cout << "All data requests have been served" << std::endl;
777  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
778  for (int i = 0; i < nodes->size(); i++) {
779  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
780  std::string port = std::to_string((*nodes)[i]->getPort());
781  allNodes.push_back(address + ":" + port);
782  }
783 
784  Handle<StorageCleanup> storageCmd = makeObject<StorageCleanup>();
785 
786  getFunctionality<DistributedStorageManagerServer>()
787  .broadcast<StorageCleanup, Object, SimpleRequestResult>(
788  storageCmd,
789  nullptr,
790  allNodes,
791  generateAckHandler(successfulNodes, failureNodes, lock));
792 
793  bool res = true;
794  if (failureNodes.size() > 0) {
795  res = false;
796  errMsg = "";
797  for (int i = 0; i < failureNodes.size(); i++) {
798  errMsg += failureNodes[i] + std::string(";");
799  }
800  }
801  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
802  res = sendUsingMe->sendObject(response, errMsg);
803  return make_pair(res, errMsg);
804 
805 
806  }
807 
808  ));
809 
810  // JiaNote: the handler below processes the DistributedStorageExportSet message; it broadcasts
811  // a StorageExportSet command (database, set, output file path, format) to all nodes
812  forMe.registerHandler(
813  DistributedStorageExportSet_TYPEID,
815 
816  [&](Handle<DistributedStorageExportSet> request, PDBCommunicatorPtr sendUsingMe) {
817  const UseTemporaryAllocationBlock tempBlock{1 * 1024 * 1024};
818  PDB_COUT << "received DistributedStorageExportSet" << std::endl;
819  std::string errMsg;
820  mutex lock;
821  auto successfulNodes = std::vector<std::string>();
822  auto failureNodes = std::vector<std::string>();
823 
824  std::vector<std::string> allNodes;
825  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
826  for (int i = 0; i < nodes->size(); i++) {
827  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
828  std::string port = std::to_string((*nodes)[i]->getPort());
829  allNodes.push_back(address + ":" + port);
830  }
831 
832  Handle<StorageExportSet> storageCmd =
833  makeObject<StorageExportSet>(request->getDbName(),
834  request->getSetName(),
835  request->getOutputFilePath(),
836  request->getFormat());
837 
838  getFunctionality<DistributedStorageManagerServer>()
839  .broadcast<StorageExportSet, Object, SimpleRequestResult>(
840  storageCmd,
841  nullptr,
842  allNodes,
843  generateAckHandler(successfulNodes, failureNodes, lock));
844 
845  bool res = true;
846  if (failureNodes.size() > 0) {
847  res = false;
848  errMsg = "";
849  for (int i = 0; i < failureNodes.size(); i++) {
850  errMsg += failureNodes[i] + std::string(";");
851  }
852  }
853  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
854  res = sendUsingMe->sendObject(response, errMsg);
855  return make_pair(res, errMsg);
856 
857 
858  }
859 
860  ));
861 
862 
863  // JiaNote: Below handler is to process SetScan message
864  forMe.registerHandler(
865  SetScan_TYPEID,
866  make_shared<SimpleRequestHandler<SetScan>>([&](Handle<SetScan> request,
867  PDBCommunicatorPtr sendUsingMe) {
868  const UseTemporaryAllocationBlock tempBlock{8 * 1024 * 1024};
869  std::string errMsg;
870  bool success;
871  std::string dbName = request->getDatabase();
872  std::string setName = request->getSetName();
873  PDB_COUT << "DistributedStorageManager received SetScan message: dbName =" << dbName
874  << ", setName =" << setName << std::endl;
875 
876  // to check whether set exists
877  /*
878  std :: string fullSetName = dbName + "." + setName;
879  std :: string value;
880  int catalogType = PDBCatalogMsgType::CatalogPDBSet;
881  if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(catalogType, fullSetName,
882  value)) {
883  PDB_COUT << "Set " << fullSetName << " exists" << std :: endl;
884  } else {
885  errMsg = "Error in handling SetScan message: Set does not exist";
886  std :: cout << errMsg << std :: endl;
887  return make_pair(false, errMsg);
888  }
889  */
890  // to get all nodes having data for this set
891  std::vector<std::string> nodesToBroadcast;
892 #ifndef USING_ALL_NODES
893  if (!getFunctionality<DistributedStorageManagerServer>().findNodesContainingSet(
894  dbName, setName, nodesToBroadcast, errMsg)) {
895  errMsg = "Error in handling SetScan message: Could not find nodes for this set";
896  std::cout << errMsg << std::endl;
897  return make_pair(false, errMsg);
898  }
899 #else
900  std::vector<std::string> allNodes;
901  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
902  for (int i = 0; i < nodes->size(); i++) {
903  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
904  std::string port = std::to_string((*nodes)[i]->getPort());
905  allNodes.push_back(address + ":" + port);
906  }
907  nodesToBroadcast = allNodes;
908 
909 #endif
910  PDB_COUT << "num nodes for this set" << nodesToBroadcast.size() << std::endl;
911 
912  // to send SetScan message to slave servers iteratively
913 
914  char* curPage = nullptr;
915  Handle<KeepGoing> temp;
916  bool keepGoingSent = false;
917  for (int i = 0; i < nodesToBroadcast.size(); i++) {
918  std::string serverName = nodesToBroadcast[i];
919  int port;
920  std::string address;
921  size_t pos = serverName.find(":");
922  if (pos != string::npos) {
923  port = stoi(serverName.substr(pos + 1, serverName.size()));
924  address = serverName.substr(0, pos);
925  } else {
926  if (conf != nullptr) {
927  port = conf->getPort();
928  } else {
929  port = 8108;
930  }
931  address = serverName;
932  }
933 
934 
935  PDB_COUT << "to collect data from the " << i
936  << "-th server with address=" << address << " and port=" << port
937  << std::endl;
938  Handle<SetScan> newRequest =
939  makeObject<SetScan>(request->getDatabase(), request->getSetName());
940 
941  PDB_COUT << "to connect to the remote node" << std::endl;
942  PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();
943 
944  PDB_COUT << "port:" << port << std::endl;
945  PDB_COUT << "ip address:" << address << std::endl;
946 
947  if (communicator->connectToInternetServer(logger, port, address, errMsg)) {
948  success = false;
949  std::cout << errMsg << std::endl;
950  break;
951  }
952 
953  if (!communicator->sendObject(newRequest, errMsg)) {
954  success = false;
955  std::cout << errMsg << std::endl;
956  break;
957  }
958  std::cout << "sent SetScan object to " << address << std::endl;
959  while (true) {
960  if (curPage != nullptr) {
961  free(curPage);
962  curPage = nullptr;
963  if (keepGoingSent == false) {
964  if (sendUsingMe->getObjectTypeID() != DoneWithResult_TYPEID) {
965  Handle<KeepGoing> temp =
966  sendUsingMe->getNextObject<KeepGoing>(success, errMsg);
967  if (!success) {
968  // std :: cout << "DistributedStorageMangerServer: Problem
969  // getting keep going from client: "<< errMsg << std :: endl;
970  communicator = nullptr;
971  break;
972  }
973  // std :: cout << "got keep going" << std :: endl;
974  if (!communicator->sendObject(temp, errMsg)) {
975  std::cout << "Problem forwarding keep going: " << errMsg
976  << std::endl;
977  communicator = nullptr;
978  break;
979  }
980  // std :: cout << "sent keep going" << std :: endl;
981  keepGoingSent = true;
982  } else {
983  Handle<DoneWithResult> doneMsg =
984  sendUsingMe->getNextObject<DoneWithResult>(success, errMsg);
985  if (!success) {
986  std::cout
987  << "Problem getting done message from client: " << errMsg
988  << std::endl;
989  communicator = nullptr;
990  return std::make_pair(false, errMsg);
991  }
992  // std :: cout << "got done from this client!" << std :: endl;
993  if (!communicator->sendObject(doneMsg, errMsg)) {
994  std::cout << "Problem forwarding done message: " << errMsg
995  << std::endl;
996  communicator = nullptr;
997  return std::make_pair(false, errMsg);
998  }
999  // std :: cout << "sent done message!" << std :: endl;
1000  return std::make_pair(true, errMsg);
1001  }
1002  }
1003  }
1004  size_t objSize = communicator->getSizeOfNextObject();
1005  // std :: cout << "Distributed storage to receive size " << objSize << std ::
1006  // endl;
1007  if (communicator->getObjectTypeID() == DoneWithResult_TYPEID) {
1008  PDB_COUT << "got done from this slave!" << std::endl;
1009  communicator = nullptr;
1010  break;
1011  }
1012  curPage = (char*)malloc(objSize);
1013  if (!communicator->receiveBytes(curPage, errMsg)) {
1014  std::cout << "Problem getting data from slave: " << errMsg << std::endl;
1015  communicator = nullptr;
1016  break;
1017  }
1018  if (!sendUsingMe->sendBytes(curPage, objSize, errMsg)) {
1019  std::cout << "Problem forwarding data to client: " << errMsg << std::endl;
1020  communicator = nullptr;
1021  break;
1022  }
1023  // std :: cout << "sent data to client!" << std :: endl;
1024  keepGoingSent = false;
1025  }
1026  }
1027  Handle<DoneWithResult> doneWithResult = makeObject<DoneWithResult>();
1028  if (!sendUsingMe->sendObject(doneWithResult, errMsg)) {
1029  std::cout << "Problem sending done message to client: " << errMsg << std::endl;
1030  return std::make_pair(false, "could not send done message: " + errMsg);
1031  }
1032  PDB_COUT << "sent done message to client!" << std::endl;
1033  return std::make_pair(true, errMsg);
1034 
1035  }));
1036 }
1037 
1038 std::function<void(Handle<SimpleRequestResult>, std::string)>
1040  std::vector<std::string>& failures,
1041  mutex& lock) {
1042  return [&](Handle<SimpleRequestResult> response, std::string server) {
1043  lock.lock();
1044 
1045  // TODO: Better error handling
1046 
1047  if (!response->getRes().first) {
1048  PDB_COUT << "BROADCAST CALLBACK FAIL: " << server << ": " << response->getRes().first
1049  << " : " << response->getRes().second << std::endl;
1050  failures.push_back(server);
1051  } else {
1052  PDB_COUT << "BROADCAST CALLBACK SUCCESS: " << server << ": " << response->getRes().first
1053  << " : " << response->getRes().second << std::endl;
1054  success.push_back(server);
1055  }
1056  lock.unlock();
1057  };
1058 }
1059 
1061  const std::string& databaseName,
1062  std::vector<std::string>& nodesForDatabase,
1063  std::string& errMsg) {
1064 
1065  auto takenNodes = std::vector<std::string>();
1066  if (!findNodesContainingDatabase(databaseName, takenNodes, errMsg)) {
1067  return false;
1068  }
1069 
1070  std::vector<std::string> allNodes = std::vector<std::string>();
1071  const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
1072 
1073  PDB_COUT << "findNodesForDatabase considering " << nodes->size() << " nodes" << std::endl;
1074 
1075  for (int i = 0; i < nodes->size(); i++) {
1076  std::string address = static_cast<std::string>((*nodes)[i]->getAddress());
1077  std::string port = std::to_string((*nodes)[i]->getPort());
1078  allNodes.push_back(address + ":" + port);
1079  }
1080 
1081  for (auto node : allNodes) {
1082  if (std::find(takenNodes.begin(), takenNodes.end(), node) == takenNodes.end()) {
1083  nodesForDatabase.push_back(node);
1084  }
1085  }
1086  return true;
1087 }
1088 
1090  const std::string& databaseName,
1091  std::vector<std::string>& nodesForDatabase,
1092  std::string& errMsg) {
1093  const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};
1094  PDB_COUT << "findNodesContainingDatabase:" << std::endl;
1096  makeObject<Vector<CatalogDatabaseMetadata>>();
1097 
1098  getFunctionality<CatalogServer>().getCatalog()->getListOfDatabases(returnValues, databaseName);
1099 
1100  if (returnValues->size() != 1) {
1101  errMsg = "Could not find metadata for database: " + databaseName;
1102  std::cout << errMsg;
1103  return false;
1104  } else {
1105  auto nodesInDB = (*returnValues)[0].getNodesInDB();
1106  for (auto const& node : (*(*returnValues)[0].getNodesInDB())) {
1107  PDB_COUT << "node: " << node.key << std::endl;
1108  nodesForDatabase.push_back(node.key);
1109  }
1110  return true;
1111  }
1112  return false;
1113 }
1114 
1115 bool DistributedStorageManagerServer::findNodesForSet(const std::string& databaseName,
1116  const std::string& setName,
1117  std::vector<std::string>& nodesForSet,
1118  std::string& errMsg) {
1119  PDB_COUT << "findNodesForSet:" << std::endl;
1120  auto nodesInDatabase = std::vector<std::string>();
1121  if (!findNodesContainingDatabase(databaseName, nodesInDatabase, errMsg)) {
1122  return false;
1123  }
1124 
1125  auto nodesContainingSet = std::vector<std::string>();
1126  if (!findNodesContainingSet(databaseName, setName, nodesContainingSet, errMsg)) {
1127  return false;
1128  }
1129 
1130  for (auto node : nodesInDatabase) {
1131  if (std::find(nodesContainingSet.begin(), nodesContainingSet.end(), node) ==
1132  nodesContainingSet.end()) {
1133  PDB_COUT << "node: " << node << std::endl;
1134  nodesForSet.push_back(node);
1135  }
1136  }
1137  PDB_COUT << "findNodesForSet return nodes size:" << nodesForSet.size() << std::endl;
1138  return true;
1139 }
1140 
1142  const std::string& databaseName,
1143  const std::string& setName,
1144  std::vector<std::string>& nodesContainingSet,
1145  std::string& errMsg) {
1146  const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};
1147  std::string fullSetName = databaseName + "." + setName;
1149  makeObject<Vector<CatalogDatabaseMetadata>>();
1150 
1151  getFunctionality<CatalogServer>().getCatalog()->getListOfDatabases(returnValues, databaseName);
1152 
1153  if (returnValues->size() != 1) {
1154  errMsg = "Could not find metadata for database: " + databaseName;
1155  return false;
1156  } else {
1157  bool setFound = false;
1158  auto listOfSets = (*returnValues)[0].getListOfSets();
1159  for (int i = 0; i < listOfSets->size(); i++) {
1160  if ((*listOfSets)[i] == setName) {
1161  setFound = true;
1162  break;
1163  }
1164  }
1165  if (!setFound) {
1166  errMsg = "Set " + fullSetName + " does not exist in database " + databaseName;
1167  return false;
1168  }
1169  auto setsInDB = (*returnValues)[0].getSetsInDB();
1170  for (auto& kv : (*setsInDB)) {
1171  std::cout << kv.key << std::endl;
1172  }
1173  String pdbSetName = String(setName);
1174  PDB_COUT << "pdbSetName=" << pdbSetName << std::endl;
1175  if (setsInDB->count(pdbSetName) == 0) {
1176  // The set is currently contained in no nodes
1177  PDB_COUT << "set is not in map" << std::endl;
1178  return true;
1179  }
1180  PDB_COUT << "set is in map" << std::endl;
1181  auto nodes = (*setsInDB)[pdbSetName];
1182  for (int i = 0; i < nodes.size(); i++) {
1183  PDB_COUT << i << ":" << nodes[i] << std::endl;
1184  nodesContainingSet.push_back(nodes[i]);
1185  }
1186  PDB_COUT << "findNodesContainingSet return nodes size:" << nodesContainingSet.size()
1187  << std::endl;
1188  return true;
1189  }
1190  errMsg = "Database not found " + databaseName;
1191  return false;
1192 }
1193 }
1194 
1195 #endif
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
std::function< void(Handle< SimpleRequestResult >, std::string)> generateAckHandler(std::vector< std::string > &success, std::vector< std::string > &failures, mutex &lock)
bool findNodesContainingSet(const std::string &databaseName, const std::string &setName, std::vector< std::string > &nodesContainingSet, std::string &errMsg)
bool findNodesForDatabase(const std::string &databaseName, std::vector< std::string > &nodesForDatabase, std::string &errMsg)
bool findNodesContainingDatabase(const std::string &databaseName, std::vector< std::string > &nodesContainingDatabase, std::string &errMsg)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
DistributedStorageManagerServer(PDBLoggerPtr logger, ConfigurationPtr conf, std::shared_ptr< StatisticsDB > statisticsDB)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
static int16_t getIDByName(std::string objectName, bool withLock=true)
Definition: VTableMap.cc:90
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
ConfigurationPtr conf
bool findNodesForSet(const std::string &databaseName, const std::string &setName, std::vector< std::string > &nodesContainingSet, std::string &errMsg)