18 #ifndef OBJECTQUERYMODEL_DISTRIBUTEDSTORAGEMANAGERSERVER_CC
19 #define OBJECTQUERYMODEL_DISTRIBUTEDSTORAGEMANAGERSERVER_CC
62 #define USING_ALL_NODES
68 std::shared_ptr<StatisticsDB> statisticsDB)
75 std::shared_ptr<StatisticsDB> statisticsDB)
91 DistributedStorageAddDatabase_TYPEID,
96 std::string database = request->getDatabase();
100 if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
101 catalogType, database, value)) {
102 PDB_COUT <<
"Database " << database <<
" already exists " << std::endl;
104 PDB_COUT <<
"Database " << database <<
" does not exist" << std::endl;
105 if (!getFunctionality<CatalogClient>().createDatabase(database, errMsg)) {
106 std::cout <<
"Could not register db, because: " << errMsg << std::endl;
108 makeObject<SimpleRequestResult>(
false, errMsg);
109 bool res = sendUsingMe->sendObject(response, errMsg);
110 return make_pair(res, errMsg);
115 auto successfulNodes = std::vector<std::string>();
116 auto failureNodes = std::vector<std::string>();
117 auto nodesToBroadcastTo = std::vector<std::string>();
119 #ifndef USING_ALL_NODES
120 if (!getFunctionality<DistributedStorageManagerServer>().findNodesForDatabase(
121 database, nodesToBroadcastTo, errMsg)) {
122 PDB_COUT <<
"Could not find nodes to broadcast database to: " << errMsg
125 makeObject<SimpleRequestResult>(
false, errMsg);
126 bool res = sendUsingMe->sendObject(response, errMsg);
127 return make_pair(res, errMsg);
130 std::vector<std::string> allNodes;
131 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
132 for (
int i = 0; i < nodes->size(); i++) {
133 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
134 std::string port = std::to_string((*nodes)[i]->getPort());
135 allNodes.push_back(address +
":" + port);
137 nodesToBroadcastTo = allNodes;
141 makeObject<StorageAddDatabase>(request->getDatabase());
142 getFunctionality<DistributedStorageManagerServer>()
143 .broadcast<StorageAddDatabase, Object, SimpleRequestResult>(
148 [&](std::string errMsg, std::string serverName) {
150 std::cout <<
"Server " << serverName <<
" received an error: " << errMsg
152 failureNodes.push_back(serverName);
157 for (
auto node : successfulNodes) {
158 if (!getFunctionality<CatalogClient>().addNodeToDB(
159 node, request->getDatabase(), errMsg)) {
161 std::cout <<
"Failed to register node " << node <<
" for database "
162 << request->getDatabase() <<
" in Catalog" << std::endl;
167 res = sendUsingMe->sendObject(response, errMsg);
168 return make_pair(res, errMsg);
171 DistributedStorageClearSet_TYPEID,
175 std::cout <<
"received DistributedStorageClearSet message" << std::endl;
180 auto successfulNodes = std::vector<std::string>();
181 auto failureNodes = std::vector<std::string>();
182 auto nodesToBroadcast = std::vector<std::string>();
184 std::string database = request->getDatabase();
185 std::string set = request->getSetName();
186 std::string fullSetName = database +
"." + set;
187 PDB_COUT <<
"set to clear is " << fullSetName << std::endl;
191 if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
192 catalogType, fullSetName, value)) {
193 std::cout <<
"Set " << fullSetName <<
" already exists " << std::endl;
195 #ifndef USING_ALL_NODES
196 if (!getFunctionality<DistributedStorageManagerServer>().
findNodesForSet(
197 database, set, nodesToBroadcast, errMsg)) {
198 std::cout <<
"Could not find nodes to broadcast set to: " << errMsg
201 makeObject<SimpleRequestResult>(
false, errMsg);
202 bool res = sendUsingMe->sendObject(response, errMsg);
203 return make_pair(res, errMsg);
206 std::vector<std::string> allNodes;
207 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
208 for (
int i = 0; i < nodes->size(); i++) {
209 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
210 std::string port = std::to_string((*nodes)[i]->getPort());
211 allNodes.push_back(address +
":" + port);
213 nodesToBroadcast = allNodes;
216 request->getDatabase(), request->getSetName(), request->getTypeName());
219 getFunctionality<DistributedStorageManagerServer>()
220 .broadcast<StorageClearSet, Object, SimpleRequestResult>(
227 PDB_COUT <<
"Set " << fullSetName <<
" does not exist" << std::endl;
229 errMsg = std::string(
"Set to clear with name=") + fullSetName +
230 std::string(
" doesn't exist");
234 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
235 if (stats ==
nullptr) {
236 getFunctionality<QuerySchedulerServer>().collectStats();
237 stats = getFunctionality<QuerySchedulerServer>().getStats();
239 stats->setNumPages(request->getDatabase(), request->getSetName(), 0);
240 stats->setNumBytes(request->getDatabase(), request->getSetName(), 0);
243 res = sendUsingMe->sendObject(response, errMsg);
244 return make_pair(res, errMsg);
249 DistributedStorageAddTempSet_TYPEID,
253 auto begin = std::chrono::high_resolution_clock::now();
255 PDB_COUT <<
"received DistributedStorageAddTempSet message" << std::endl;
259 auto successfulNodes = std::vector<std::string>();
260 auto failureNodes = std::vector<std::string>();
261 auto nodesToBroadcast = std::vector<std::string>();
263 std::string set = request->getSetName();
266 std::vector<std::string> allNodes;
267 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
268 for (
int i = 0; i < nodes->size(); i++) {
269 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
270 std::string port = std::to_string((*nodes)[i]->getPort());
271 allNodes.push_back(address +
":" + port);
273 nodesToBroadcast = allNodes;
276 request->getSetName(),
277 request->getTypeName(),
278 request->getPageSize());
280 getFunctionality<DistributedStorageManagerServer>()
281 .broadcast<StorageAddSet, Object, SimpleRequestResult>(
287 auto storageAddSetEnd = std::chrono::high_resolution_clock::now();
288 PDB_COUT <<
"Time Duration for adding temp set:\t "
289 << std::chrono::duration_cast<std::chrono::duration<float>>(storageAddSetEnd -
292 <<
" secs." << std::endl;
295 if (failureNodes.size() > 0) {
298 for (
int i = 0; i < failureNodes.size(); i++) {
299 errMsg += failureNodes[i] + std::string(
";");
303 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
304 if (stats ==
nullptr) {
305 getFunctionality<QuerySchedulerServer>().collectStats();
306 stats = getFunctionality<QuerySchedulerServer>().getStats();
308 stats->setNumPages(
"temp", request->getSetName(), 0);
309 stats->setNumBytes(
"temp", request->getSetName(), 0);
312 res = sendUsingMe->sendObject(response, errMsg);
313 return make_pair(res, errMsg);
318 DistributedStorageAddSet_TYPEID,
322 auto begin = std::chrono::high_resolution_clock::now();
323 auto beforeCreateSet = begin;
324 auto afterCreateSet = begin;
326 PDB_COUT <<
"received DistributedStorageAddSet message" << std::endl;
330 auto successfulNodes = std::vector<std::string>();
331 auto failureNodes = std::vector<std::string>();
332 auto nodesToBroadcast = std::vector<std::string>();
334 std::string database = request->getDatabase();
335 std::string set = request->getSetName();
336 std::string fullSetName = database +
"." + set;
337 PDB_COUT <<
"set to create is " << fullSetName << std::endl;
341 if (getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
342 catalogType, fullSetName, value)) {
343 std::cout <<
"Set " << fullSetName <<
" already exists " << std::endl;
345 PDB_COUT <<
"Set " << fullSetName <<
" does not exist" << std::endl;
353 return make_pair(
false,
"Could not identify type=" + request->getTypeName());
356 beforeCreateSet = std::chrono::high_resolution_clock::now();
358 if (!getFunctionality<CatalogClient>().createSet(typeId, database, set, errMsg)) {
359 std::cout <<
"Could not register set, because: " << errMsg << std::endl;
361 makeObject<SimpleRequestResult>(
false, errMsg);
362 bool res = sendUsingMe->sendObject(response, errMsg);
363 return make_pair(res, errMsg);
365 afterCreateSet = std::chrono::high_resolution_clock::now();
367 #ifndef USING_ALL_NODES
368 if (!getFunctionality<DistributedStorageManagerServer>().
findNodesForSet(
369 database, set, nodesToBroadcast, errMsg)) {
370 std::cout <<
"Could not find nodes to broadcast set to: " << errMsg << std::endl;
372 makeObject<SimpleRequestResult>(
false, errMsg);
373 bool res = sendUsingMe->sendObject(response, errMsg);
374 return make_pair(res, errMsg);
377 std::vector<std::string> allNodes;
378 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
379 for (
int i = 0; i < nodes->size(); i++) {
380 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
381 std::string port = std::to_string((*nodes)[i]->getPort());
382 allNodes.push_back(address +
":" + port);
384 nodesToBroadcast = allNodes;
386 auto catalogGetNodesEnd = std::chrono::high_resolution_clock::now();
389 request->getSetName(),
390 request->getTypeName(),
391 request->getPageSize());
394 getFunctionality<DistributedStorageManagerServer>()
395 .broadcast<StorageAddSet, Object, SimpleRequestResult>(
401 auto storageAddSetEnd = std::chrono::high_resolution_clock::now();
404 for (
auto node : successfulNodes) {
405 if (!getFunctionality<CatalogClient>().addNodeToSet(node, database, set, errMsg)) {
406 std::cout <<
"Failed to register node " << node <<
" for set " << fullSetName
407 <<
" in Catalog" << std::endl;
411 if (failureNodes.size() > 0) {
415 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
416 if (stats ==
nullptr) {
417 getFunctionality<QuerySchedulerServer>().collectStats();
418 stats = getFunctionality<QuerySchedulerServer>().getStats();
420 stats->setNumPages(request->getDatabase(), request->getSetName(), 0);
421 stats->setNumBytes(request->getDatabase(), request->getSetName(), 0);
426 request->getSetName(),
429 request->getTypeName(),
431 request->getPageSize(),
433 std::cout <<
"created data in statistics database with id = " <<
id << std::endl;
435 auto catalogAddSetEnd = std::chrono::high_resolution_clock::now();
437 PDB_COUT <<
"Time Duration for catalog create set Metadata:\t "
438 << std::chrono::duration_cast<std::chrono::duration<float>>(afterCreateSet -
441 <<
" secs." << std::endl;
442 PDB_COUT <<
"Time Duration for catalog getting nodes:\t "
443 << std::chrono::duration_cast<std::chrono::duration<float>>(
444 catalogGetNodesEnd - afterCreateSet)
446 <<
" secs." << std::endl;
447 PDB_COUT <<
"Time Duration for storage adding set:\t "
448 << std::chrono::duration_cast<std::chrono::duration<float>>(storageAddSetEnd -
451 <<
" secs." << std::endl;
452 PDB_COUT <<
"Time Duration for catalog adding addNodeToSet metadata:\t "
453 << std::chrono::duration_cast<std::chrono::duration<float>>(catalogAddSetEnd -
456 <<
" secs." << std::endl;
460 res = sendUsingMe->sendObject(response, errMsg);
461 return make_pair(res, errMsg);
465 DistributedStorageRemoveDatabase_TYPEID,
471 std::vector<std::string> successfulNodes = std::vector<std::string>();
472 std::vector<std::string> failureNodes = std::vector<std::string>();
474 std::string database = request->getDatabase();
478 if (!getFunctionality<CatalogServer>().getCatalog()->keyIsFound(
479 catalogType, database, value)) {
480 errMsg =
"Cannot delete database, database " + database +
" does not exist\n";
482 makeObject<SimpleRequestResult>(
false, errMsg);
483 bool res = sendUsingMe->sendObject(response, errMsg);
484 return make_pair(res, errMsg);
487 auto nodesToBroadcastTo = std::vector<std::string>();
489 #ifndef USING_ALL_NODES
490 if (!getFunctionality<DistributedStorageManagerServer>().findNodesContainingDatabase(
491 database, nodesToBroadcastTo, errMsg)) {
492 std::cout <<
"Could not find nodes to broadcast database delete to " << errMsg
495 makeObject<SimpleRequestResult>(
false, errMsg);
496 bool res = sendUsingMe->sendObject(response, errMsg);
497 return make_pair(res, errMsg);
500 std::vector<std::string> allNodes;
501 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
502 for (
int i = 0; i < nodes->size(); i++) {
503 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
504 std::string port = std::to_string((*nodes)[i]->getPort());
505 allNodes.push_back(address +
":" + port);
507 nodesToBroadcastTo = allNodes;
510 getFunctionality<DistributedStorageManagerServer>()
511 .broadcast<StorageRemoveDatabase, Object, SimpleRequestResult>(
517 if (failureNodes.size() == 0) {
518 PDB_COUT <<
"Successfully deleted database on " << successfulNodes.size()
519 <<
" nodes" << std::endl;
521 errMsg =
"Failed to delete database on " + std::to_string(failureNodes.size()) +
522 " nodes. Skipping registering with catalog";
525 makeObject<SimpleRequestResult>(
false, errMsg);
526 bool res = sendUsingMe->sendObject(response, errMsg);
527 return make_pair(res, errMsg);
530 if (!getFunctionality<CatalogClient>().deleteDatabase(database, errMsg)) {
531 std::cout <<
"Could not delete database, because: " << errMsg << std::endl;
533 makeObject<SimpleRequestResult>(
false, errMsg);
534 bool res = sendUsingMe->sendObject(response, errMsg);
535 return make_pair(res, errMsg);
541 res = sendUsingMe->sendObject(response, errMsg);
542 return make_pair(res, errMsg);
546 DistributedStorageRemoveTempSet_TYPEID,
550 auto begin = std::chrono::high_resolution_clock::now();
554 auto successfulNodes = std::vector<std::string>();
555 auto failureNodes = std::vector<std::string>();
556 auto nodesToBroadcast = std::vector<std::string>();
558 std::string database = request->getDatabase();
559 std::string set = request->getSetName();
560 std::string fullSetName = database +
"." + set;
561 std::vector<std::string> allNodes;
562 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
563 for (
int i = 0; i < nodes->size(); i++) {
564 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
565 std::string port = std::to_string((*nodes)[i]->getPort());
566 allNodes.push_back(address +
":" + port);
568 nodesToBroadcast = allNodes;
570 PDB_COUT <<
"to broadcast StorageRemoveTempSet" << std::endl;
572 request->getDatabase(), request->getSetName(), request->getTypeName());
573 getFunctionality<DistributedStorageManagerServer>()
574 .broadcast<StorageRemoveUserSet, Object, SimpleRequestResult>(
580 auto storageRemoveSetEnd = std::chrono::high_resolution_clock::now();
582 PDB_COUT <<
"Time Duration for storage removing set:\t "
583 << std::chrono::duration_cast<std::chrono::duration<float>>(
584 storageRemoveSetEnd - begin)
586 <<
" secs." << std::endl;
589 if (failureNodes.size() > 0) {
592 for (
int i = 0; i < failureNodes.size(); i++) {
593 errMsg += failureNodes[i] + std::string(
";");
597 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
598 if (stats ==
nullptr) {
599 getFunctionality<QuerySchedulerServer>().collectStats();
600 stats = getFunctionality<QuerySchedulerServer>().getStats();
602 stats->removeSet(request->getDatabase(), request->getSetName());
605 res = sendUsingMe->sendObject(response, errMsg);
606 return make_pair(res, errMsg);
611 DistributedStorageRemoveSet_TYPEID,
616 auto begin = std::chrono::high_resolution_clock::now();
620 auto successfulNodes = std::vector<std::string>();
621 auto failureNodes = std::vector<std::string>();
622 auto nodesToBroadcast = std::vector<std::string>();
624 std::string database = request->getDatabase();
625 std::string set = request->getSetName();
626 std::string fullSetName = database +
"." + set;
629 makeObject<pdb::Vector<CatalogSetMetadata>>();
631 std::string typeName;
633 getFunctionality<CatalogServer>().getCatalog()->getListOfSets(returnValues,
636 if (returnValues->size() == 0) {
637 std::cout <<
"Cannot remove set, Set " << fullSetName <<
" does not exist "
640 makeObject<SimpleRequestResult>(
false, errMsg);
641 bool res = sendUsingMe->sendObject(response, errMsg);
642 return make_pair(res, errMsg);
644 typeName = (*returnValues)[0].getObjectTypeName();
647 #ifndef USING_ALL_NODES
649 database, set, nodesToBroadcast, errMsg)) {
650 std::cout <<
"Could not find nodes to broadcast set to: " << errMsg << std::endl;
652 makeObject<SimpleRequestResult>(
false, errMsg);
653 bool res = sendUsingMe->sendObject(response, errMsg);
654 return make_pair(res, errMsg);
657 std::vector<std::string> allNodes;
658 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
659 for (
int i = 0; i < nodes->size(); i++) {
660 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
661 std::string port = std::to_string((*nodes)[i]->getPort());
662 allNodes.push_back(address +
":" + port);
664 nodesToBroadcast = allNodes;
667 auto catalogGetNodesEnd = std::chrono::high_resolution_clock::now();
668 PDB_COUT <<
"to broadcast StorageRemoveUserSet" << std::endl;
670 request->getDatabase(), request->getSetName(), typeName);
671 getFunctionality<DistributedStorageManagerServer>()
672 .broadcast<StorageRemoveUserSet, Object, SimpleRequestResult>(
678 if (failureNodes.size() == 0) {
679 PDB_COUT <<
"Successfully deleted set on " << successfulNodes.size() <<
" nodes"
682 errMsg =
"Failed to delete set on " + std::to_string(failureNodes.size()) +
683 " nodes. Skipping registering with catalog";
684 std::cout << errMsg << std::endl;
686 makeObject<SimpleRequestResult>(
false, errMsg);
687 bool res = sendUsingMe->sendObject(response, errMsg);
688 return make_pair(res, errMsg);
691 auto storageRemoveSetEnd = std::chrono::high_resolution_clock::now();
694 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
695 if (stats ==
nullptr) {
696 getFunctionality<QuerySchedulerServer>().collectStats();
697 stats = getFunctionality<QuerySchedulerServer>().getStats();
699 stats->removeSet(request->getDatabase(), request->getSetName());
701 if (failureNodes.size() == 0) {
705 PDB_COUT <<
"Succeeded in deleting set " << fullSetName <<
" on all nodes"
707 if (!getFunctionality<CatalogClient>().deleteSet(database, set, errMsg)) {
708 std::cout <<
"Could not delete set, because: " << errMsg << std::endl;
710 makeObject<SimpleRequestResult>(
false, errMsg);
711 bool res = sendUsingMe->sendObject(response, errMsg);
712 return make_pair(res, errMsg);
719 errMsg =
"Nodes failed to remove set " + fullSetName +
": ";
720 for (
auto node : successfulNodes) {
721 if (!getFunctionality<CatalogClient>().removeNodeFromSet(
722 node, database, set, errMsg)) {
723 errMsg += node +
", ";
724 std::cout << errMsg << std::endl;
730 sendUsingMe->sendObject(response, errMsg);
731 return make_pair(
false, errMsg);
734 auto catalogRemoveSetEnd = std::chrono::high_resolution_clock::now();
737 res = sendUsingMe->sendObject(response, errMsg);
740 PDB_COUT <<
"Time Duration for catalog get nodes info:\t "
741 << std::chrono::duration_cast<std::chrono::duration<float>>(
742 catalogGetNodesEnd - begin)
744 <<
" secs." << std::endl;
745 PDB_COUT <<
"Time Duration for storage removing set:\t "
746 << std::chrono::duration_cast<std::chrono::duration<float>>(
747 storageRemoveSetEnd - catalogGetNodesEnd)
749 <<
" secs." << std::endl;
750 PDB_COUT <<
"Time Duration for catalog removing set:\t "
751 << std::chrono::duration_cast<std::chrono::duration<float>>(
752 catalogRemoveSetEnd - storageRemoveSetEnd)
754 <<
" secs." << std::endl;
756 return make_pair(res, errMsg);
763 DistributedStorageCleanup_TYPEID,
768 PDB_COUT <<
"received DistributedStorageCleanup" << std::endl;
771 auto successfulNodes = std::vector<std::string>();
772 auto failureNodes = std::vector<std::string>();
774 std::vector<std::string> allNodes;
775 getFunctionality<DispatcherServer>().waitAllRequestsProcessed();
776 std::cout <<
"All data requests have been served" << std::endl;
777 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
778 for (
int i = 0; i < nodes->size(); i++) {
779 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
780 std::string port = std::to_string((*nodes)[i]->getPort());
781 allNodes.push_back(address +
":" + port);
786 getFunctionality<DistributedStorageManagerServer>()
787 .broadcast<StorageCleanup, Object, SimpleRequestResult>(
794 if (failureNodes.size() > 0) {
797 for (
int i = 0; i < failureNodes.size(); i++) {
798 errMsg += failureNodes[i] + std::string(
";");
802 res = sendUsingMe->sendObject(response, errMsg);
803 return make_pair(res, errMsg);
813 DistributedStorageExportSet_TYPEID,
818 PDB_COUT <<
"received DistributedStorageExportSet" << std::endl;
821 auto successfulNodes = std::vector<std::string>();
822 auto failureNodes = std::vector<std::string>();
824 std::vector<std::string> allNodes;
825 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
826 for (
int i = 0; i < nodes->size(); i++) {
827 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
828 std::string port = std::to_string((*nodes)[i]->getPort());
829 allNodes.push_back(address +
":" + port);
833 makeObject<StorageExportSet>(request->getDbName(),
834 request->getSetName(),
835 request->getOutputFilePath(),
836 request->getFormat());
838 getFunctionality<DistributedStorageManagerServer>()
839 .broadcast<StorageExportSet, Object, SimpleRequestResult>(
846 if (failureNodes.size() > 0) {
849 for (
int i = 0; i < failureNodes.size(); i++) {
850 errMsg += failureNodes[i] + std::string(
";");
854 res = sendUsingMe->sendObject(response, errMsg);
855 return make_pair(res, errMsg);
871 std::string dbName = request->getDatabase();
872 std::string setName = request->getSetName();
873 PDB_COUT <<
"DistributedStorageManager received SetScan message: dbName =" << dbName
874 <<
", setName =" << setName << std::endl;
891 std::vector<std::string> nodesToBroadcast;
892 #ifndef USING_ALL_NODES
894 dbName, setName, nodesToBroadcast, errMsg)) {
895 errMsg =
"Error in handling SetScan message: Could not find nodes for this set";
896 std::cout << errMsg << std::endl;
897 return make_pair(
false, errMsg);
900 std::vector<std::string> allNodes;
901 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
902 for (
int i = 0; i < nodes->size(); i++) {
903 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
904 std::string port = std::to_string((*nodes)[i]->getPort());
905 allNodes.push_back(address +
":" + port);
907 nodesToBroadcast = allNodes;
910 PDB_COUT <<
"num nodes for this set" << nodesToBroadcast.size() << std::endl;
914 char* curPage =
nullptr;
916 bool keepGoingSent =
false;
917 for (
int i = 0; i < nodesToBroadcast.size(); i++) {
918 std::string serverName = nodesToBroadcast[i];
921 size_t pos = serverName.find(
":");
922 if (pos != string::npos) {
923 port = stoi(serverName.substr(pos + 1, serverName.size()));
924 address = serverName.substr(0, pos);
926 if (
conf !=
nullptr) {
927 port =
conf->getPort();
931 address = serverName;
935 PDB_COUT <<
"to collect data from the " << i
936 <<
"-th server with address=" << address <<
" and port=" << port
939 makeObject<SetScan>(request->getDatabase(), request->getSetName());
941 PDB_COUT <<
"to connect to the remote node" << std::endl;
944 PDB_COUT <<
"port:" << port << std::endl;
945 PDB_COUT <<
"ip address:" << address << std::endl;
947 if (communicator->connectToInternetServer(
logger, port, address, errMsg)) {
949 std::cout << errMsg << std::endl;
953 if (!communicator->sendObject(newRequest, errMsg)) {
955 std::cout << errMsg << std::endl;
958 std::cout <<
"sent SetScan object to " << address << std::endl;
960 if (curPage !=
nullptr) {
963 if (keepGoingSent ==
false) {
964 if (sendUsingMe->getObjectTypeID() != DoneWithResult_TYPEID) {
966 sendUsingMe->getNextObject<
KeepGoing>(success, errMsg);
970 communicator =
nullptr;
974 if (!communicator->sendObject(temp, errMsg)) {
975 std::cout <<
"Problem forwarding keep going: " << errMsg
977 communicator =
nullptr;
981 keepGoingSent =
true;
987 <<
"Problem getting done message from client: " << errMsg
989 communicator =
nullptr;
990 return std::make_pair(
false, errMsg);
993 if (!communicator->sendObject(doneMsg, errMsg)) {
994 std::cout <<
"Problem forwarding done message: " << errMsg
996 communicator =
nullptr;
997 return std::make_pair(
false, errMsg);
1000 return std::make_pair(
true, errMsg);
1004 size_t objSize = communicator->getSizeOfNextObject();
1007 if (communicator->getObjectTypeID() == DoneWithResult_TYPEID) {
1008 PDB_COUT <<
"got done from this slave!" << std::endl;
1009 communicator =
nullptr;
1012 curPage = (
char*)malloc(objSize);
1013 if (!communicator->receiveBytes(curPage, errMsg)) {
1014 std::cout <<
"Problem getting data from slave: " << errMsg << std::endl;
1015 communicator =
nullptr;
1018 if (!sendUsingMe->sendBytes(curPage, objSize, errMsg)) {
1019 std::cout <<
"Problem forwarding data to client: " << errMsg << std::endl;
1020 communicator =
nullptr;
1024 keepGoingSent =
false;
1028 if (!sendUsingMe->sendObject(doneWithResult, errMsg)) {
1029 std::cout <<
"Problem sending done message to client: " << errMsg << std::endl;
1030 return std::make_pair(
false,
"could not send done message: " + errMsg);
1032 PDB_COUT <<
"sent done message to client!" << std::endl;
1033 return std::make_pair(
true, errMsg);
1038 std::function<void(Handle<SimpleRequestResult>, std::string)>
1040 std::vector<std::string>& failures,
1047 if (!response->getRes().first) {
1048 PDB_COUT <<
"BROADCAST CALLBACK FAIL: " << server <<
": " << response->getRes().first
1049 <<
" : " << response->getRes().second << std::endl;
1050 failures.push_back(server);
1052 PDB_COUT <<
"BROADCAST CALLBACK SUCCESS: " << server <<
": " << response->getRes().first
1053 <<
" : " << response->getRes().second << std::endl;
1054 success.push_back(server);
1061 const std::string& databaseName,
1062 std::vector<std::string>& nodesForDatabase,
1063 std::string& errMsg) {
1065 auto takenNodes = std::vector<std::string>();
1070 std::vector<std::string> allNodes = std::vector<std::string>();
1071 const auto nodes = getFunctionality<ResourceManagerServer>().getAllNodes();
1073 PDB_COUT <<
"findNodesForDatabase considering " << nodes->size() <<
" nodes" << std::endl;
1075 for (
int i = 0; i < nodes->size(); i++) {
1076 std::string address =
static_cast<std::string
>((*nodes)[i]->getAddress());
1077 std::string port = std::to_string((*nodes)[i]->getPort());
1078 allNodes.push_back(address +
":" + port);
1081 for (
auto node : allNodes) {
1082 if (std::find(takenNodes.begin(), takenNodes.end(), node) == takenNodes.end()) {
1083 nodesForDatabase.push_back(node);
1090 const std::string& databaseName,
1091 std::vector<std::string>& nodesForDatabase,
1092 std::string& errMsg) {
1094 PDB_COUT <<
"findNodesContainingDatabase:" << std::endl;
1096 makeObject<Vector<CatalogDatabaseMetadata>>();
1098 getFunctionality<CatalogServer>().getCatalog()->getListOfDatabases(returnValues, databaseName);
1100 if (returnValues->size() != 1) {
1101 errMsg =
"Could not find metadata for database: " + databaseName;
1102 std::cout << errMsg;
1105 auto nodesInDB = (*returnValues)[0].getNodesInDB();
1106 for (
auto const& node : (*(*returnValues)[0].getNodesInDB())) {
1107 PDB_COUT <<
"node: " << node.key << std::endl;
1108 nodesForDatabase.push_back(node.key);
1116 const std::string& setName,
1117 std::vector<std::string>& nodesForSet,
1118 std::string& errMsg) {
1119 PDB_COUT <<
"findNodesForSet:" << std::endl;
1120 auto nodesInDatabase = std::vector<std::string>();
1125 auto nodesContainingSet = std::vector<std::string>();
1130 for (
auto node : nodesInDatabase) {
1131 if (std::find(nodesContainingSet.begin(), nodesContainingSet.end(), node) ==
1132 nodesContainingSet.end()) {
1133 PDB_COUT <<
"node: " << node << std::endl;
1134 nodesForSet.push_back(node);
1137 PDB_COUT <<
"findNodesForSet return nodes size:" << nodesForSet.size() << std::endl;
1142 const std::string& databaseName,
1143 const std::string& setName,
1144 std::vector<std::string>& nodesContainingSet,
1145 std::string& errMsg) {
1147 std::string fullSetName = databaseName +
"." + setName;
1149 makeObject<Vector<CatalogDatabaseMetadata>>();
1151 getFunctionality<CatalogServer>().getCatalog()->getListOfDatabases(returnValues, databaseName);
1153 if (returnValues->size() != 1) {
1154 errMsg =
"Could not find metadata for database: " + databaseName;
1157 bool setFound =
false;
1158 auto listOfSets = (*returnValues)[0].getListOfSets();
1159 for (
int i = 0; i < listOfSets->size(); i++) {
1160 if ((*listOfSets)[i] == setName) {
1166 errMsg =
"Set " + fullSetName +
" does not exist in database " + databaseName;
1169 auto setsInDB = (*returnValues)[0].getSetsInDB();
1170 for (
auto& kv : (*setsInDB)) {
1171 std::cout << kv.key << std::endl;
1174 PDB_COUT <<
"pdbSetName=" << pdbSetName << std::endl;
1175 if (setsInDB->count(pdbSetName) == 0) {
1177 PDB_COUT <<
"set is not in map" << std::endl;
1180 PDB_COUT <<
"set is in map" << std::endl;
1181 auto nodes = (*setsInDB)[pdbSetName];
1182 for (
int i = 0; i < nodes.size(); i++) {
1183 PDB_COUT << i <<
":" << nodes[i] << std::endl;
1184 nodesContainingSet.push_back(nodes[i]);
1186 PDB_COUT <<
"findNodesContainingSet return nodes size:" << nodesContainingSet.size()
1190 errMsg =
"Database not found " + databaseName;
std::shared_ptr< Statistics > StatisticsPtr
std::function< void(Handle< SimpleRequestResult >, std::string)> generateAckHandler(std::vector< std::string > &success, std::vector< std::string > &failures, mutex &lock)
bool findNodesContainingSet(const std::string &databaseName, const std::string &setName, std::vector< std::string > &nodesContainingSet, std::string &errMsg)
bool findNodesForDatabase(const std::string &databaseName, std::vector< std::string > &nodesForDatabase, std::string &errMsg)
bool findNodesContainingDatabase(const std::string &databaseName, std::vector< std::string > &nodesContainingDatabase, std::string &errMsg)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
shared_ptr< Configuration > ConfigurationPtr
void registerHandlers(PDBServer &forMe) override
DistributedStorageManagerServer(PDBLoggerPtr logger, ConfigurationPtr conf, std::shared_ptr< StatisticsDB > statisticsDB)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
static int16_t getIDByName(std::string objectName, bool withLock=true)
std::shared_ptr< PDBLogger > PDBLoggerPtr
~DistributedStorageManagerServer()
bool findNodesForSet(const std::string &databaseName, const std::string &setName, std::vector< std::string > &nodesContainingSet, std::string &errMsg)
std::shared_ptr< StatisticsDB > statisticsDB