A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CatalogServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #include <cstddef>
19 #include <cstring>
20 #include <ctime>
21 #include <fcntl.h>
22 #include <fstream>
23 #include <iostream>
24 #include <string>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <unistd.h>
28 #include <vector>
29 
30 #include "BuiltInObjectTypeIDs.h"
32 #include "CatAddNodeToSetRequest.h"
34 #include "CatCreateSetRequest.h"
36 #include "CatDeleteSetRequest.h"
37 #include "CatRegisterType.h"
43 #include "CatTypeNameSearch.h"
45 #include "CatTypeSearchResult.h"
48 #include "CatalogPrintMetadata.h"
49 #include "CatalogServer.h"
50 #include "SimpleRequestHandler.h"
51 #include "SimpleRequestResult.h"
53 
54 namespace pdb {
55 
57 
58 int16_t CatalogServer::searchForObjectTypeName(std::string objectTypeName) {
59 
60  // first search for the type name in the vTable map (in case it is built in)
61  if (VTableMap::lookupBuiltInType(objectTypeName) != -1)
62  return VTableMap::lookupBuiltInType(objectTypeName);
63 
64  // if this type's vtable pointer has been previously extracted by this process
65  // return the typeId
66  if (allTypeNames.count(objectTypeName) != 0) {
67  return allTypeNames[objectTypeName];
68  }
69 
70  // otherwise fetch it from the Manager Catalog Server only if this is a Worker
71  // Node catalog
72  if (this->isManagerCatalogServer == false) {
74  objectTypeName);
75  }
76  // otherwise returns -1
77  return -1;
78 }
79 
80 // register handlers for processing requests to the Catalog Server
82  PDB_COUT << "Catalog Server registering handlers" << endl;
83 
84  // handles a request to register metadata of a new cluster Node in the catalog
85  forMe.registerHandler(
86  CatalogNodeMetadata_TYPEID,
88  Handle<CatalogNodeMetadata> request, PDBCommunicatorPtr sendUsingMe) {
89 
90  std::string errMsg;
91 
92  PDB_COUT << "CatalogServer handler CatalogNodeMetadata_TYPEID calling "
93  "addNodeMetadata "
94  << string(request->getItemKey()) << endl;
95 
96  const UseTemporaryAllocationBlock block{1024 * 1024};
97 
98  // adds the node metadata
99  bool res =
100  getFunctionality<CatalogServer>().addNodeMetadata(request, errMsg);
101 
102  const UseTemporaryAllocationBlock tempBlock{1024};
103  Handle<SimpleRequestResult> response =
104  makeObject<SimpleRequestResult>(res, errMsg);
105 
106  // sends result to requester
107  res = sendUsingMe->sendObject(response, errMsg);
108  return make_pair(res, errMsg);
109  }));
110 
111  // handles a request to register or update metadata for a Database in the
112  // catalog
113  forMe.registerHandler(
114  CatalogDatabaseMetadata_TYPEID,
117  PDBCommunicatorPtr sendUsingMe) {
118 
119  std::string errMsg;
120 
121  PDB_COUT << "--->CatalogServer handler "
122  "CatalogDatabaseMetadata_TYPEID calling "
123  "addDatabaseMetadata"
124  << endl;
125  const UseTemporaryAllocationBlock block{1024 * 1024};
126  bool res = false;
127  string itemKey = request->getItemKey().c_str();
128  PDB_COUT << " Looking for key " + itemKey << endl;
129 
130  // if database doesn't exist inserts metadata, otherwise only
131  // updates it
132  if (isDatabaseRegistered(itemKey) == false) {
133  res = getFunctionality<CatalogServer>().addDatabaseMetadata(
134  request, errMsg);
135  } else {
136  res = getFunctionality<CatalogServer>().updateDatabaseMetadata(
137  request, errMsg);
138  }
139 
140  const UseTemporaryAllocationBlock tempBlock{1024};
141  Handle<SimpleRequestResult> response =
142  makeObject<SimpleRequestResult>(res, errMsg);
143 
144  // sends result to requester
145  res = sendUsingMe->sendObject(response, errMsg);
146  return make_pair(res, errMsg);
147  }));
148 
149  // handles a request to register or update metadata of a Set in the catalog
150  forMe.registerHandler(
151  CatalogSetMetadata_TYPEID,
153  Handle<CatalogSetMetadata> request, PDBCommunicatorPtr sendUsingMe) {
154 
155  std::string errMsg;
156 
157  PDB_COUT << "--->CatalogServer handler CatalogSetMetadata_TYPEID "
158  "calling addSetMetadata"
159  << endl;
160  const UseTemporaryAllocationBlock block{1024 * 1024};
161  bool res =
162  getFunctionality<CatalogServer>().addSetMetadata(request, errMsg);
163 
164  const UseTemporaryAllocationBlock tempBlock{1024};
165 
166  Handle<SimpleRequestResult> response =
167  makeObject<SimpleRequestResult>(res, errMsg);
168 
169  // sends result to requester
170  res = sendUsingMe->sendObject(response, errMsg);
171  return make_pair(res, errMsg);
172  }));
173 
174  // handles a request to display the contents of the Catalog that have changed
175  // since a given
176  // timestamp
177  forMe.registerHandler(
178  CatalogPrintMetadata_TYPEID,
180  Handle<CatalogPrintMetadata> itemToPrint,
181  PDBCommunicatorPtr sendUsingMe) {
182 
183  std::string errMsg;
184 
185  PDB_COUT << "--->Testing CatalogPrintMetadata handler with timeStamp "
186  << itemToPrint->getTimeStamp().c_str() << endl;
187 
188  getFunctionality<CatalogServer>().printCatalog(itemToPrint);
189 
190  // sends result to requester
191  bool res = sendUsingMe->sendObject(itemToPrint, errMsg);
192  return make_pair(res, errMsg);
193  }));
194 
195  // handles a request to return the typeID of a Type given its name
196  forMe.registerHandler(
197  CatTypeNameSearch_TYPEID,
199  Handle<CatTypeNameSearch> request, PDBCommunicatorPtr sendUsingMe) {
200 
201  PDB_COUT << "received CatTypeNameSearch message" << endl;
202 
203  const LockGuard guard{workingMutex};
204  const UseTemporaryAllocationBlock block{1024 * 1024};
205 
206  // ask the catalog server for the type ID given the type name
207  int16_t typeID =
208  getFunctionality<CatalogServer>().searchForObjectTypeName(
209  request->getObjectTypeName());
210 
211  PDB_COUT << "searchForObjectTypeName for " +
212  request->getObjectTypeName() + " is " +
213  std::to_string(typeID)
214  << endl;
215 
216  const UseTemporaryAllocationBlock tempBlock{1024};
217  Handle<CatTypeSearchResult> response =
218  makeObject<CatTypeSearchResult>(typeID);
219 
220  // sends result to requester
221  std::string errMsg;
222  bool res = sendUsingMe->sendObject(response, errMsg);
223  return make_pair(res, errMsg);
224  }));
225 
226  // handles a request to retrieve an .so library given a Type Id
227  forMe.registerHandler(
228  CatSharedLibraryRequest_TYPEID,
231  PDBCommunicatorPtr sendUsingMe) {
232 
233  const LockGuard guard{workingMutex};
234  const UseTemporaryAllocationBlock block{1024 * 1024};
235 
236  vector<char> *putResultHere = new vector<char>();
237  std::string errMsg;
238  int16_t typeID = request->getTypeID();
239 
240  PDB_COUT << "CatalogServer to handle CatSharedLibraryRequest to "
241  "get shared library "
242  "for typeID="
243  << std::to_string(typeID) << endl;
244 
245  // retrieves the .so library
246  bool res = getFunctionality<CatalogServer>().getSharedLibrary(
247  typeID, (*putResultHere), errMsg);
248 
249  if (!res) {
250  const UseTemporaryAllocationBlock tempBlock{1024};
251  Handle<Vector<char>> response = makeObject<Vector<char>>();
252 
253  // sends the response in case of failure
254  res = sendUsingMe->sendObject(response, errMsg);
255  } else {
256  PDB_COUT << "On Catalog Server bytes returned " +
257  std::to_string((*putResultHere).size())
258  << endl;
259 
260  // allocates memory for the .so library bytes
261  const UseTemporaryAllocationBlock temp{1024 +
262  (*putResultHere).size()};
263  Handle<Vector<char>> response = makeObject<Vector<char>>(
264  (*putResultHere).size(), (*putResultHere).size());
265  memmove(response->c_ptr(), (*putResultHere).data(),
266  (*putResultHere).size());
267 
268  // sends result to requester
269  res = sendUsingMe->sendObject(response, errMsg);
270  }
271 
272  delete putResultHere;
273 
274  // return the result
275  return make_pair(res, errMsg);
276  }));
277 
278  // handles a request to retrieve an .so library given a Type Name along with
279  // its metadata (stored as a serialized CatalogUserTypeMetadata object)
280  forMe.registerHandler(
281  CatSharedLibraryByNameRequest_TYPEID,
284  PDBCommunicatorPtr sendUsingMe) {
285 
286  string typeName = request->getTypeLibraryName();
287  int16_t typeId = 0;
288  // if typeName is empty we are searching by typeId, hence retrieves the
289  // typeId from the request object
290  if (typeName.empty() == true) {
291  typeId = request->getTypeLibraryId();
292  } else {
293  // if a typeName is provided, we are searching by that name, so first
294  // we have to retrieve the typeID, given the provided typeName
295  typeId = allTypeNames[typeName];
296  }
297 
298  const LockGuard guard{workingMutex};
299  PDB_COUT << "Triggering Handler CatalogServer "
300  "CatSharedLibraryByNameRequest for typeName="
301  << typeName << " and typeId=" << std::to_string(typeId)
302  << endl;
303 
304  bool res = false;
305  string returnedBytes;
306  std::string errMsg;
307 
308  PDB_COUT << std::string("CatalogServer to handle "
309  "CatSharedLibraryByNameRequest to get shared "
310  "library for typeName=")
311  << typeName << std::string(" and typeId=")
312  << std::to_string(typeId) << endl;
313 
314  // if this is the Manager catalog retrieves .so bytes from local catalog
315  // copy
316  if (this->isManagerCatalogServer == true) {
317  // Allocates 150Mb for sending .so libraries
318  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 150};
319 
321  makeObject<CatalogUserTypeMetadata>();
322 
323  PDB_COUT << " Invoking getSharedLibrary(typeName) from "
324  "CatalogServer Handler "
325  "b/c this is Manager Catalog "
326  << endl;
327 
328  // if the type is not registered in the Manager Catalog just return
329  // with a typeID = -1
330  if (allTypeCodes.count(typeId) == 0) {
331 
332  const UseTemporaryAllocationBlock tempBlock{1024};
333 
334  // Creates an empty Object just to send the response to caller
335  Handle<CatalogUserTypeMetadata> notFoundResponse =
336  makeObject<CatalogUserTypeMetadata>();
337  String newItemID("-1");
338  notFoundResponse->setObjectId(newItemID);
339 
340  res = sendUsingMe->sendObject(notFoundResponse, errMsg);
341 
342  } else {
343  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 150};
344  // resolves typeName given the typeId
345  typeName = allTypeCodes[typeId];
346  PDB_COUT << "Resolved typeName " << typeName
347  << " for typeId=" + std::to_string(typeId) << endl;
348 
349  // the type was found in the catalog, retrieves metadata and bytes
350  // of the Shared Library
351  res = getFunctionality<CatalogServer>().getSharedLibraryByTypeName(
352  typeName, response, returnedBytes, errMsg);
353 
354  PDB_COUT << " Bytes returned YES isManager: " +
355  std::to_string(returnedBytes.size())
356  << endl;
357  PDB_COUT << "typeId=" + string(response->getObjectID()) << endl;
358  PDB_COUT << "ItemName=" + string(response->getItemName()) << endl;
359  PDB_COUT << "ItemKey=" + string(response->getItemKey()) << endl;
360 
361  // copies the bytes of the Shared Library to the object to be sent
362  // back to the caller
363  response->setLibraryBytes(returnedBytes);
364 
365  PDB_COUT << "Object Id isManager: " + string(response->getObjectID())
366  << " | " << string(response->getItemKey())
367  << " | " + string(response->getItemName()) << endl;
368 
369  if (!res) {
370  res = sendUsingMe->sendObject(response, errMsg);
371  } else {
372  PDB_COUT << " Sending metadata and bytes to caller!" << endl;
373  // sends result to requester
374  res = sendUsingMe->sendObject(response, errMsg);
375  }
376  }
377  } else {
378  // if this is not the Manager catalog, retrieves .so bytes from the
379  // remote manager catalog
380  if (allTypeCodes.count(typeId) == 0) {
381  // process the case where the type is not registered in this local
382  // catalog, allocates 150Mb for sending .so libraries
383  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 150};
384 
386  makeObject<CatalogUserTypeMetadata>();
387 
388  PDB_COUT << " Connecting to the Remote Catalog Server via "
389  "Catalog Client"
390  << endl;
391  PDB_COUT << " Invoking "
392  "CatalogClient.getSharedLibraryByName(typeName) from "
393  "CatalogServer b/c this is Local Catalog "
394  << endl;
395 
396  // uses a dummyObjectFile since this is just making a remote call to
397  // the Catalog Manager Server and what matters is the returned bytes.
398  string dummyObjectFile = catalogDirectory + "/tmp_so_files/temp.so";
399 
400  // retrieves from remote catalog the Shared Library bytes in
401  // "returnedBytes" and metadata in the "response" object
403  .getSharedLibraryByTypeName(typeId, typeName,
404  dummyObjectFile, response,
405  returnedBytes, errMsg);
406 
407  PDB_COUT << " Bytes returned NOT isManager: "
408  << std::to_string(returnedBytes.size()) << endl;
409 
410  // if the library was successfully retrieved, go ahead and resolve
411  // vtable fixing in the local catalog, given the library and
412  // metadata retrieved from the remote Manager Catalog
413  if (res == true) {
414  res = getFunctionality<CatalogServer>().addObjectType(
415  typeId, returnedBytes, errMsg);
416  }
417 
418  if (!res) {
419  PDB_COUT << " before sending response Vtable not fixed!!!!!!"
420  << endl;
421 
422  PDB_COUT << errMsg << endl;
423  const UseTemporaryAllocationBlock tempBlock{1024};
424 
425  Handle<CatalogUserTypeMetadata> notFoundResponse =
426  makeObject<CatalogUserTypeMetadata>();
427  String newItemID("-1");
428  notFoundResponse->setObjectId(newItemID);
429 
430  res = sendUsingMe->sendObject(notFoundResponse, errMsg);
431 
432  } else {
433  // if retrieval was successful prepare and send object to caller
434  PDB_COUT << " before sending response Vtable fixed!!!!"
435  << endl;
436 
437  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 150};
438 
439  // prepares Object to be sent to caller
440  Handle<CatalogUserTypeMetadata> objectToBeSent =
441  makeObject<CatalogUserTypeMetadata>();
442 
443  String _retBytes(returnedBytes);
444  char objectIDCharArray[50];
445  sprintf(objectIDCharArray, "%d", typeId);
446  String newItemID(objectIDCharArray);
447  objectToBeSent->setObjectId(newItemID);
448  objectToBeSent->setLibraryBytes(_retBytes);
449  String newTypeName(typeName);
450  objectToBeSent->setItemName(newTypeName);
451  objectToBeSent->setItemKey(newTypeName);
452 
453  // sends result to requester
454  res = sendUsingMe->sendObject(objectToBeSent, errMsg);
455  } // end retrieval fail/successful
456  } else {
457  // process the case where the type is already registered in this
458  // local catalog
459  // Allocates 150Mb for sending .so libraries
460  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 150};
461 
463  makeObject<CatalogUserTypeMetadata>();
464 
465  typeName = allTypeCodes[typeId];
466  PDB_COUT << "Resolved typeName" << typeName
467  << " for typeId=" << std::to_string(typeId) << endl;
468 
469  // retrieves from local catalog the Shared Library bytes in
470  // "returnedBytes" and metadata in the "response" object
471  res = getFunctionality<CatalogServer>().getSharedLibraryByTypeName(
472  typeName, response, returnedBytes, errMsg);
473 
474  PDB_COUT << " Bytes returned No isManager: "
475  << std::to_string(returnedBytes.size()) << endl;
476 
477  response->setLibraryBytes(returnedBytes);
478 
479  PDB_COUT << "Object Id isLocal: " << string(response->getObjectID())
480  << " | " << string(response->getItemKey()) << " | "
481  << string(response->getItemName()) << endl;
482 
483  if (!res) {
484  // sends result to requester
485  const UseTemporaryAllocationBlock tempBlock{1024};
486  res = sendUsingMe->sendObject(response, errMsg);
487  } else {
488  PDB_COUT << " Sending metadata and Shared Library to caller!"
489  << endl;
490  res = sendUsingMe->sendObject(response, errMsg);
491  }
492 
493  } // end "if" type was found in local catalog or not
494  } // end "if" is Manager or Local catalog case
495 
496  return make_pair(res, errMsg);
497  }));
498 
499  // handles a request to retrieve the TypeId of a Type, if it's not registered
500  // returns -1
501  forMe.registerHandler(
502  CatSetObjectTypeRequest_TYPEID,
505  PDBCommunicatorPtr sendUsingMe) {
506 
507  const LockGuard guard{workingMutex};
508  const UseTemporaryAllocationBlock block{1024 * 1024};
509 
510  // ask the catalog server for the type ID and then the name of the
511  // type
512  int16_t typeID = getFunctionality<CatalogServer>().getObjectType(
513  request->getDatabaseName(), request->getSetName());
514 
515  PDB_COUT << "typeID for Set with dbName="
516  << string(request->getDatabaseName())
517  << " and setName=" << string(request->getSetName())
518  << " is " << std::to_string(typeID) << endl;
519 
520  const UseTemporaryAllocationBlock tempBlock{1024};
522  if (typeID >= 0)
523  response = makeObject<CatTypeNameSearchResult>(
524  searchForObjectTypeName(typeID), true, "success");
525  else
526  response = makeObject<CatTypeNameSearchResult>(
527  "", false, "could not find requested type");
528 
529  // sends result to requester
530  std::string errMsg;
531  bool res = sendUsingMe->sendObject(response, errMsg);
532  return make_pair(res, errMsg);
533  }));
534 
535  // handle a request to register metadata for a new Database in the catalog
536  forMe.registerHandler(
537  CatCreateDatabaseRequest_TYPEID,
540  PDBCommunicatorPtr sendUsingMe) {
541 
542  const LockGuard guard{workingMutex};
543  const UseTemporaryAllocationBlock block{1024 * 1024};
544 
545  std::string errMsg;
546 
547  // invokes adding Database metadata to catalog
548  bool res = getFunctionality<CatalogServer>().addDatabase(
549  request->dbToCreate(), errMsg);
550 
551  const UseTemporaryAllocationBlock tempBlock{1024};
552  Handle<SimpleRequestResult> response =
553  makeObject<SimpleRequestResult>(res, errMsg);
554 
555  // sends result to requester
556  res = sendUsingMe->sendObject(response, errMsg);
557  return make_pair(res, errMsg);
558  }));
559 
560  // handle a request to register metadata for a new Set in the catalog
561  forMe.registerHandler(
562  CatCreateSetRequest_TYPEID,
564  Handle<CatCreateSetRequest> request, PDBCommunicatorPtr sendUsingMe) {
565 
566  const LockGuard guard{workingMutex};
567  const UseTemporaryAllocationBlock block{1024 * 1024};
568 
569  std::string errMsg;
570  auto info = request->whichSet();
571 
572  // invokes adding Set metadata to catalog
573  bool res = getFunctionality<CatalogServer>().addSet(
574  request->whichType(), info.first, info.second, errMsg);
575 
576  const UseTemporaryAllocationBlock tempBlock{1024};
577  Handle<SimpleRequestResult> response =
578  makeObject<SimpleRequestResult>(res, errMsg);
579 
580  // sends result to requester
581  res = sendUsingMe->sendObject(response, errMsg);
582  return make_pair(res, errMsg);
583  }));
584 
585  // handle a request to delete metadata for an existing Database in the catalog
586  forMe.registerHandler(
587  CatDeleteDatabaseRequest_TYPEID,
590  PDBCommunicatorPtr sendUsingMe) {
591 
592  const LockGuard guard{workingMutex};
593  const UseTemporaryAllocationBlock block{1024 * 1024};
594 
595  std::string errMsg;
596 
597  // invokes deleting Database metadata from catalog
598  bool res = getFunctionality<CatalogServer>().deleteDatabase(
599  request->dbToDelete(), errMsg);
600 
601  const UseTemporaryAllocationBlock tempBlock{1024};
602  Handle<SimpleRequestResult> response =
603  makeObject<SimpleRequestResult>(res, errMsg);
604 
605  // sends result to requester
606  res = sendUsingMe->sendObject(response, errMsg);
607  return make_pair(res, errMsg);
608  }));
609 
610  // handle a request to delete metadata for an existing Set in the catalog
611  forMe.registerHandler(
612  CatDeleteSetRequest_TYPEID,
614  Handle<CatDeleteSetRequest> request, PDBCommunicatorPtr sendUsingMe) {
615 
616  const LockGuard guard{workingMutex};
617  const UseTemporaryAllocationBlock block{1024 * 1024};
618 
619  std::string errMsg;
620  auto info = request->whichSet();
621 
622  // invokes deleting Set metadata from catalog
623  bool res = getFunctionality<CatalogServer>().deleteSet(
624  info.first, info.second, errMsg);
625 
626  const UseTemporaryAllocationBlock tempBlock{1024};
627  Handle<SimpleRequestResult> response =
628  makeObject<SimpleRequestResult>(res, errMsg);
629 
630  // sends result to requester
631  res = sendUsingMe->sendObject(response, errMsg);
632  return make_pair(res, errMsg);
633  }));
634 
635  // handle a request to add information of a node to an existing Database
636  // this is invoked when the Distributed Storage Manager creates storage in
637  // a node in the cluster for a given Database
638  forMe.registerHandler(
639  CatAddNodeToDatabaseRequest_TYPEID,
642  PDBCommunicatorPtr sendUsingMe) {
643 
644  const LockGuard guard{workingMutex};
645  const UseTemporaryAllocationBlock block{1024 * 1024};
646 
647  std::string errMsg;
648 
649  // invokes adding node metadata to an Existing Database
650  bool res = getFunctionality<CatalogServer>().addNodeToDB(
651  request->nodeToAdd(), request->whichDB(), errMsg);
652 
653  const UseTemporaryAllocationBlock tempBlock{1024};
654 
655  Handle<SimpleRequestResult> response =
656  makeObject<SimpleRequestResult>(res, errMsg);
657 
658  // sends result to requester
659  res = sendUsingMe->sendObject(response, errMsg);
660  return make_pair(res, errMsg);
661  }));
662 
663  // handle a request to add information of a set to an existing Database
664  // this is invoked when the Distributed Storage Manager creates a set
665  // in a given node for an existing Database
666  forMe.registerHandler(
667  CatAddNodeToSetRequest_TYPEID,
669  [&](Handle<CatAddNodeToSetRequest> request,
670  PDBCommunicatorPtr sendUsingMe) {
671 
672  const LockGuard guard{workingMutex};
673  const UseTemporaryAllocationBlock block{1024 * 1024};
674 
675  std::string errMsg;
676 
677  // invokes add Node information to a Set in a given Database
678  bool res = getFunctionality<CatalogServer>().addNodeToSet(
679  request->nodeToAdd(), request->whichDB(), request->whichSet(),
680  errMsg);
681 
682  const UseTemporaryAllocationBlock tempBlock{1024};
683  Handle<SimpleRequestResult> response =
684  makeObject<SimpleRequestResult>(res, errMsg);
685 
686  // sends result to requester
687  res = sendUsingMe->sendObject(response, errMsg);
688  return make_pair(res, errMsg);
689  }));
690 
691  // handle a request to remove information of a node to an existing Database
692  // this is invoked when the Distributed Storage Manager deletes storage in
693  // a node in the cluster for a given Database
694  forMe.registerHandler(
695  CatRemoveNodeFromDatabaseRequest_TYPEID,
698  PDBCommunicatorPtr sendUsingMe) {
699 
700  const LockGuard guard{workingMutex};
701  const UseTemporaryAllocationBlock block{1024 * 1024};
702  // ask the catalog server for the type ID and then the name of the
703  // type
704  std::string errMsg;
705 
706  // invokes remove node information from an existing Database
707  bool res = getFunctionality<CatalogServer>().removeNodeFromDB(
708  request->nodeToRemove(), request->whichDB(), errMsg);
709 
710  const UseTemporaryAllocationBlock tempBlock{1024};
711  Handle<SimpleRequestResult> response =
712  makeObject<SimpleRequestResult>(res, errMsg);
713 
714  // sends result to requester
715  res = sendUsingMe->sendObject(response, errMsg);
716  return make_pair(res, errMsg);
717  }));
718 
719  // handle a request to remove information of a set to an existing Database
720  // this is invoked when the Distributed Storage Manager deletes a set
721  // in a given node for an existing Database
722  forMe.registerHandler(
723  CatRemoveNodeFromSetRequest_TYPEID,
726  PDBCommunicatorPtr sendUsingMe) {
727 
728  const LockGuard guard{workingMutex};
729  const UseTemporaryAllocationBlock block{1024 * 1024};
730 
731  std::string errMsg;
732  auto info = request->whichSet();
733 
734  // invokes remove node information from a Set in an existing
735  // Database
736  bool res = getFunctionality<CatalogServer>().removeNodeFromSet(
737  request->nodeToRemove(), request->whichDB(),
738  request->whichSet(), errMsg);
739 
740  const UseTemporaryAllocationBlock tempBlock{1024};
741  Handle<SimpleRequestResult> response =
742  makeObject<SimpleRequestResult>(res, errMsg);
743 
744  // sends result to requester
745  res = sendUsingMe->sendObject(response, errMsg);
746  return make_pair(res, errMsg);
747  }));
748 
749  // handles a request to register a shared library
750  forMe.registerHandler(
751  CatRegisterType_TYPEID,
753  [&](Handle<CatRegisterType> request, PDBCommunicatorPtr sendUsingMe) {
754  PDB_COUT << "Got a CatRegisterType request" << std::endl;
755 
756  const LockGuard guard{workingMutex};
757  PDB_COUT << "Got lockGuard" << std::endl;
758  const UseTemporaryAllocationBlock block{1024 * 1024};
759 
760  // get the next object... this holds the shared library file... it
761  // could be big, so be careful!!
762  size_t objectSize = sendUsingMe->getSizeOfNextObject();
763  PDB_COUT << "Got objectSize=" << objectSize << std::endl;
764  bool res;
765  std::string errMsg;
766  void *memory = malloc(objectSize + 2048);
767  Handle<Vector<char>> myFile =
768  sendUsingMe->getNextObject<Vector<char>>(memory, res, errMsg);
769  PDB_COUT << "Received all data" << std::endl;
770  if (res) {
771  string soFile(myFile->c_ptr(), objectSize);
772  PDB_COUT << "Create a large string on stack" << std::endl;
773  res = (addObjectType(-1, soFile, errMsg) >= 0);
774  }
775  free(memory);
776 
777  const UseTemporaryAllocationBlock tempBlock{1024};
778  Handle<SimpleRequestResult> response =
779  makeObject<SimpleRequestResult>(res, errMsg);
780 
781  // sends result to requester
782  res = sendUsingMe->sendObject(response, errMsg);
783  return make_pair(res, errMsg);
784  }));
785 
786  // handle to close the SQLite DB Handler
787  forMe.registerHandler(
788  CatalogCloseSQLiteDBHandler_TYPEID,
791  PDBCommunicatorPtr sendUsingMe) {
792 
793  std::string errMsg;
794  PDB_COUT << "--->Testing CatalogCloseSQLiteDBHandler handler " << endl;
795  const UseTemporaryAllocationBlock block{1024 * 1024};
796 
797  pdbCatalog->closeSQLiteHandler();
798 
799  const UseTemporaryAllocationBlock tempBlock{1024};
800  Handle<SimpleRequestResult> response =
801  makeObject<SimpleRequestResult>(true, errMsg);
802 
803  // sends result to requester
804  bool res = sendUsingMe->sendObject(response, errMsg);
805  return make_pair(res, errMsg);
806  }));
807 }
808 
809 // returns the Name of a Type given its Type ID
810 std::string CatalogServer::searchForObjectTypeName(int16_t typeIdentifier) {
811 
812  PDB_COUT << "searchForObjectTypeName with typeIdentifier ="
813  << std::to_string(typeIdentifier) << endl;
814 
815  // first search for the type name in the vTable map (in case it is built in)
816  std::string result = VTableMap::lookupBuiltInType(typeIdentifier);
817  if (result != "")
818  return result;
819 
820  // return a -1 if we've never seen this type name
821  if (allTypeCodes.count(typeIdentifier) == 0)
822  return "";
823 
824  // return the name of a non built-in type (registered via Shared Library)
825  return allTypeCodes[typeIdentifier];
826 }
827 
828 // retrieves the bytes of a Shared Library and returns them in putResultHere
829 bool CatalogServer::getSharedLibrary(int16_t identifier,
830  vector<char> &putResultHere,
831  std::string &errMsg) {
832 
833  // first, make sure we have this identifier
834  if (allTypeCodes.count(identifier) == 0) {
835  errMsg = "CatalogServer::getSharedLibrary(): Error: didn't know the "
836  "identifier you sent me";
837  PDB_COUT << errMsg << endl;
838  return false;
839  }
840 
841  // now, read in the .so file
842  std::string whichFile =
843  catalogDirectory + "/tmp_so_files/" + allTypeCodes[identifier] + ".so";
844  std::ifstream in(whichFile, std::ifstream::ate | std::ifstream::binary);
845  size_t fileLen = in.tellg();
846 
847  struct stat st;
848  stat(whichFile.c_str(), &st);
849  fileLen = st.st_size;
850  int filedesc = open(whichFile.c_str(), O_RDONLY);
851  putResultHere.resize(fileLen);
852 
853  // put the bytes in the output parameter "putResultHere"
854  read(filedesc, putResultHere.data(), fileLen);
855  close(filedesc);
856 
857  return true;
858 }
859 
860 // retrieves the bytes of a Shared Library and its associated metadata, given
861 // its typeName
863  std::string typeName, Handle<CatalogUserTypeMetadata> &itemMetadata,
864  string &returnedBytes, std::string &errMsg) {
865 
866  PDB_COUT << " Catalog Server->inside get getSharedLibraryByName id for type "
867  << typeName << endl;
868 
869  int metadataCategory = (int)PDBCatalogMsgType::CatalogPDBRegisteredObject;
870 
871  // retrieves the type id given its name
872  string id = pdbCatalog->itemName2ItemId(metadataCategory, typeName);
873 
874  PDB_COUT << " id " << id << endl;
875 
876  // data_types is used for retrieving User Defined Types
877  string typeOfObject = "data_types";
878 
879  if (pdbCatalog->retrievesDynamicLibrary(typeName, typeOfObject, itemMetadata,
880  returnedBytes, errMsg) == true) {
881 
882  PDB_COUT << "Metadata returned at get SharedLibrary Id: "
883  << string(itemMetadata->getItemId()) << endl;
884  PDB_COUT << "Metadata returned at get SharedLibrary Key: "
885  << string(itemMetadata->getItemKey()) << endl;
886  PDB_COUT << "Bytes after string " << std::to_string(returnedBytes.size())
887  << endl;
888  return true;
889  } else {
890  PDB_COUT << "Item with key " << typeName << " was not found! " << endl;
891  return false;
892  }
893 }
894 
895 // returns the typeId of a Type given it's name, if not found returns -1
896 int16_t CatalogServer::getObjectType(std::string databaseName,
897  std::string setName) {
898  if (setTypes.count(make_pair(databaseName, setName)) == 0)
899  return -1;
900 
901  return setTypes[make_pair(databaseName, setName)];
902 }
903 
904 // Adds metadata and bytes of a Shared Library in the catalog and returns its
905 // typeId
906 int16_t CatalogServer::addObjectType(int16_t typeIDFromManagerCatalog,
907  string &soFile, string &errMsg) {
908 
909  // read the bytes from the temporary extracted file and copy to output
910  // parameter
911  string tempFile = catalogDirectory + "/tmp_so_files/temp.so";
912  int filedesc = open(tempFile.c_str(), O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
913  size_t sizeWritten = write(filedesc, soFile.data(), soFile.size());
914  PDB_COUT << "addObjectType: sizeWritten=" << sizeWritten << std::endl;
915  close(filedesc);
916 
917  // check to make sure it is valid Shared Library
918  void *so_handle = nullptr;
919  so_handle = dlopen(tempFile.c_str(), RTLD_LOCAL | RTLD_LAZY);
920  if (!so_handle) {
921  const char *dlsym_error = dlerror();
922  errMsg = "Cannot process shared library. " + string(dlsym_error) + '\n';
923  std::cout << "CatalogServer: " << errMsg << std::endl;
924  dlclose(so_handle);
925  return -1;
926  }
927 
928  // makes a call to the "getObjectTypeName" function defined in the shared
929  // library
930  const char *dlsym_error;
931  std::string getName = "getObjectTypeName";
932  typedef char *getObjectTypeNameFunc();
933  getObjectTypeNameFunc *myFunc =
934  (getObjectTypeNameFunc *)dlsym(so_handle, getName.c_str());
935 
936  PDB_COUT << "open function: " << getName << endl;
937 
938  // if the function is not defined or there was an error return
939  if ((dlsym_error = dlerror())) {
940  errMsg =
941  "Error, can't load function getObjectTypeName in the shared library. " +
942  string(dlsym_error) + '\n';
943  PDB_COUT << errMsg << endl;
944  return -1;
945  }
946  PDB_COUT << "all ok" << endl;
947 
948  // now, get the type name and write the appropriate file
949  string typeName(myFunc());
950  dlclose(so_handle);
951 
952  PDB_COUT << "typeName returned from SO file: " << typeName << endl;
953 
954  // rename temporary file
955  string newName = catalogDirectory + "/tmp_so_files/" + typeName + ".so";
956  int result = rename(tempFile.c_str(), newName.c_str());
957  if (result == 0) {
958  PDB_COUT << "Successfully renaming file " << newName << endl;
959  } else {
960  PDB_COUT << "Renaming temp file failed " << newName << endl;
961  return -1;
962  }
963 
964  // add the new type name, if we don't already have it
965  if (allTypeNames.count(typeName) == 0) {
966  PDB_COUT << "Fixing vtable ptr for type " << typeName
967  << " with metadata retrieved from remote Catalog Server." << endl;
968 
969  int16_t typeCode;
970 
971  // if the type received is -1 this is a type not registered and we set the
972  // new typeID increasing by 1, otherwise we use the typeID received from
973  // the Manager Catalog
974  if (typeIDFromManagerCatalog == -1)
975  typeCode = 8192 + allTypeNames.size();
976  else
977  typeCode = typeIDFromManagerCatalog;
978 
979  PDB_COUT << "Id Assigned to type " << typeName << " was "
980  << std::to_string(typeCode) << endl;
981 
982  allTypeNames[typeName] = typeCode;
983  allTypeCodes[typeCode] = typeName;
984 
985  vector<string> typeNames;
986  vector<int> typeCodes;
987 
988  for (auto &pair : allTypeNames) {
989  typeNames.push_back(pair.first);
990  typeCodes.push_back(pair.second);
991  }
992 
993  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 128};
994  Handle<CatalogUserTypeMetadata> objectMetadata =
995  makeObject<CatalogUserTypeMetadata>();
996 
997  PDB_COUT << "before calling registerUserDefinedObject with typeCode="
998  << typeCode << endl;
999 
1000  // register the bytes of the shared library along its metadata in the
1001  // catalog
1002  pdbCatalog->registerUserDefinedObject(
1003  typeCode, objectMetadata, std::string(soFile.begin(), soFile.end()),
1004  typeName, catalogDirectory + "/tmp_so_files/" + typeName + ".so",
1005  "data_types", errMsg);
1006 
1007  return typeCode;
1008  } else
1009  return allTypeNames[typeName];
1010 }
1011 
1012 // deletes metadata about a Set in the Catalog
1013 bool CatalogServer::deleteSet(std::string databaseName, std::string setName,
1014  std::string &errMsg) {
1015 
1016  // prepares the key
1017  string setUniqueId = databaseName + "." + setName;
1018  PDB_COUT << "Deleting set " << setUniqueId << endl;
1019 
1020  // make sure the Database for this set is registered
1021  if (isDatabaseRegistered(databaseName) == false) {
1022  errMsg = "Database does not exist.\n";
1023  return false;
1024  }
1025 
1026  // make sure the set is registered
1027  if (isSetRegistered(databaseName, setName) == false) {
1028  errMsg = "Set doesn't exist " + databaseName + "." + setName;
1029  return false;
1030  }
1031 
1032  Handle<CatalogSetMetadata> metadataObject = makeObject<CatalogSetMetadata>();
1033 
1034  string _setName(databaseName + "." + setName);
1035  String setKeyCatalog = String(_setName);
1036  String setNameCatalog = String(setName);
1037  String dbName(databaseName);
1038 
1039  int catalogType = PDBCatalogMsgType::CatalogPDBSet;
1040  String setId = String(pdbCatalog->itemName2ItemId(catalogType, _setName));
1041 
1042  // populates object metadata
1043  metadataObject->setItemId(setId);
1044  metadataObject->setItemKey(setKeyCatalog);
1045  metadataObject->setItemName(setNameCatalog);
1046  metadataObject->setDBName(dbName);
1047 
1048  // deletes Set metadata in the catalog
1049  pdbCatalog->deleteMetadataInCatalog(metadataObject, catalogType, errMsg);
1050 
1051  // prepares Database object to update the set deletion
1052  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1053  makeObject<CatalogDatabaseMetadata>();
1054 
1056  makeObject<Vector<CatalogDatabaseMetadata>>();
1057 
1059 
1060  // retrieves the metadata for this Set's Database
1061  if (pdbCatalog->getMetadataFromCatalog(false, databaseName, databaseItems,
1062  errMsg, catalogType) == false)
1063  PDB_COUT << errMsg << endl;
1064 
1065  for (int i = 0; i < (*databaseItems).size(); i++) {
1066  if ((*databaseItems)[i].getItemKey().c_str() == databaseName)
1067  *dbMetadataObject = (*databaseItems)[i];
1068  }
1069 
1070  // deletes the Set
1071  (*dbMetadataObject).deleteSet(setNameCatalog);
1072 
1073  // updates the corresponding database metadata
1074  if (!pdbCatalog->updateMetadataInCatalog(dbMetadataObject, catalogType,
1075  errMsg)) {
1076  return false;
1077  }
1078 
1079  // after it updated the database metadata in the local catalog, if this is the
1080  // manager catalog iterate over all nodes in the cluster and broadcast the
1081  // update to the distributed copies of the catalog
1082  if (isManagerCatalogServer) {
1083  // map to capture the results of broadcasting the Set deletion
1084  map<string, pair<bool, string>> updateResults;
1085 
1086  Handle<CatDeleteSetRequest> setToRemove =
1087  makeObject<CatDeleteSetRequest>(databaseName, setName);
1088 
1089  // first, broadcasts the metadata of the removed set to all copies of the
1090  // catalog in the cluster, removing this Set
1091  broadcastCatalogDelete(setToRemove, updateResults, errMsg);
1092 
1093  for (auto &item : updateResults) {
1094  PDB_COUT << "Set Metadata in node IP: " << item.first
1095  << ((item.second.first == true)
1096  ? " updated correctly!"
1097  : " couldn't be updated due to error: ")
1098  << item.second.second << endl;
1099  }
1100 
1101  // map to capture the results of broadcasting the DB update
1102  map<string, pair<bool, string>> updateSetResults;
1103 
1104  // second, broadcasts the metadata of the DB to which this set has been
1105  // removed from, updating all distributed copies of the catalog
1106  broadcastCatalogUpdate(dbMetadataObject, updateSetResults, errMsg);
1107 
1108  for (auto &item : updateSetResults) {
1109  PDB_COUT << "DB Metadata in node IP: " << item.first
1110  << ((item.second.first == true)
1111  ? "updated correctly!"
1112  : "couldn't be updated due to error: ")
1113  << item.second.second << endl;
1114  }
1115  PDB_COUT << "******************* deleteSet step completed!!!!!!!" << endl;
1116  } else {
1117  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1118  "registered locally!"
1119  << endl;
1120  }
1121 
1122  return true;
1123 }
1124 
1125 // Adds Metadata about a Set that has been created into the Catalog
1126 bool CatalogServer::addSet(int16_t typeIdentifier, std::string databaseName,
1127  std::string setName, std::string &errMsg) {
1128 
1129  // make sure we are only adding to an existing database
1130  if (isDatabaseRegistered(databaseName) == false) {
1131  errMsg = "Database does not exist.\n";
1132  return false;
1133  }
1134 
1135  // make sure that the set does not exist
1136  string setUniqueId = databaseName + "." + setName;
1137  if (isSetRegistered(databaseName, setName) == true) {
1138  errMsg = "Set already exists.\n";
1139  return false;
1140  }
1141 
1142  // make sure that type associated to this Set exists
1143  if (typeIdentifier >= 8192 && allTypeCodes.count(typeIdentifier) == 0) {
1144  errMsg = "Type code does not exist.\n";
1145  PDB_COUT << errMsg << "TypeId=" << std::to_string(typeIdentifier) << endl;
1146  return false;
1147  }
1148 
1149  PDB_COUT << "...... Calling CatalogServer :: addSet" << endl;
1150 
1151  // add the Type Id to the map of databases and sets
1152  setTypes[make_pair(databaseName, setName)] = typeIdentifier;
1153 
1154  // searches for the Type Name given its Type ID
1155  string typeNameStr = searchForObjectTypeName(typeIdentifier);
1156  PDB_COUT << "Got typeName=" << typeNameStr << endl;
1157 
1158  if (typeNameStr == "") {
1159  errMsg = "TypeName doesn not exist";
1160  return false;
1161  }
1162 
1163  String typeName(typeNameStr);
1164 
1165  PDB_COUT << "TypeID for Set with dbName=" << databaseName
1166  << " and setName=" << setName << " is "
1167  << std::to_string(typeIdentifier) << endl;
1168 
1169  Handle<CatalogSetMetadata> metadataObject = makeObject<CatalogSetMetadata>();
1170 
1171  // gets the Database Id
1172  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1173  string dbId = pdbCatalog->itemName2ItemId(catalogType, databaseName);
1174 
1175 
1177 
1178  string typeId = std::to_string(typeIdentifier);
1179 
1180  // creates Strings
1181  String setKeyCatalog = String(setUniqueId);
1182  String setNameCatalog = String(setName);
1183  String dbName(databaseName);
1184 
1185  // populates object metadata
1186  metadataObject->setItemKey(setKeyCatalog);
1187  metadataObject->setItemName(setNameCatalog);
1188 
1189  // retrieves the Set Id
1191  String _dbId = String(pdbCatalog->itemName2ItemId(catalogType, databaseName));
1192 
1193  metadataObject->setDBId(_dbId);
1194  metadataObject->setDBName(dbName);
1195 
1196  String _typeId = String(std::to_string(typeIdentifier));
1197 
1198  metadataObject->setTypeId(_typeId);
1199  metadataObject->setTypeName(typeName);
1200 
1201  catalogType = PDBCatalogMsgType::CatalogPDBSet;
1202 
1203  // adds metadata to the catalog if this is a new item,
1204  // otherwise updates existing metadata
1205  if (isSetRegistered(dbName, setName) == false) {
1206  pdbCatalog->addMetadataToCatalog(metadataObject, catalogType,
1207  errMsg);
1208  } else {
1209  pdbCatalog->updateMetadataInCatalog(metadataObject, catalogType, errMsg);
1210  }
1211 
1212  // prepares object for the Database metadata
1214  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1215  makeObject<CatalogDatabaseMetadata>();
1216 
1217  // retrieves the metadata for this Set's Database
1219  makeObject<Vector<CatalogDatabaseMetadata>>();
1220 
1221  if (pdbCatalog->getMetadataFromCatalog(false, databaseName, databaseItems,
1222  errMsg, catalogType) == false)
1223  PDB_COUT << errMsg << endl;
1224 
1225  for (int i = 0; i < (*databaseItems).size(); i++) {
1226  if ((*databaseItems)[i].getItemKey().c_str() == databaseName)
1227  *dbMetadataObject = (*databaseItems)[i];
1228  }
1229 
1230  (*dbMetadataObject).addSet(setNameCatalog);
1231  (*dbMetadataObject).addType(typeName);
1232 
1233  // updates the corresponding database metadata
1234  if (isDatabaseRegistered(databaseName) == true) {
1236  if (pdbCatalog->updateMetadataInCatalog(dbMetadataObject, catalogType,
1237  errMsg) == true)
1238  PDB_COUT << "DB Update Set metadata OK" << endl;
1239  else {
1240  PDB_COUT << "DB Update metadata Set Error: " << errMsg << endl;
1241  return false;
1242  }
1243  } else {
1244  return false;
1245  }
1246 
1247  // after it updated the database metadata in the local catalog, if this is the
1248  // manager catalog iterate over all nodes in the cluster and broadcast the
1249  // update to the distributed copies of the catalog
1250  if (isManagerCatalogServer) {
1251  // map to capture the results of broadcasting the Set insertion
1252  map<string, pair<bool, string>> updateResults;
1253 
1254  // first, broadcasts the metadata of the new set to the distributed copies
1255  // of the catalog updating metaddata of the new Set
1256  broadcastCatalogUpdate(metadataObject, updateResults, errMsg);
1257 
1258  for (auto &item : updateResults) {
1259  PDB_COUT << "Set Metadata broadcasted to node IP: "
1260  << item.first + ((item.second.first == true)
1261  ? " updated correctly!"
1262  : " couldn't be updated due to error: ")
1263  << item.second.second << endl;
1264  }
1265 
1266  // map to capture the results of broadcasting the DB update
1267  map<string, pair<bool, string>> updateSetResults;
1268 
1269  // second, broadcasts the metadata of the DB to which this set has been
1270  // added, updating the distributed copies of the catalog
1271  broadcastCatalogUpdate(dbMetadataObject, updateSetResults, errMsg);
1272 
1273  for (auto &item : updateSetResults) {
1274  PDB_COUT << "DB Metadata broadcasted to node IP: "
1275  << item.first + ((item.second.first == true)
1276  ? " updated correctly!"
1277  : " couldn't be updated due to error: ")
1278  << item.second.second << endl;
1279  }
1280  PDB_COUT << "******************* addSet step completed!!!!!!!" << endl;
1281  } else {
1282  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1283  "registered locally!"
1284  << endl;
1285  }
1286  return true;
1287 }
1288 
1289 // add Metadata about a new Database into the Catalog
1290 bool CatalogServer::addDatabase(std::string databaseName, std::string &errMsg) {
1291 
1292  // don't add a database that is already registered
1293  if (isDatabaseRegistered(databaseName) == true) {
1294  errMsg = "Database name already exists.\n";
1295  return false;
1296  }
1297 
1298  PDB_COUT << "...... Calling CatalogServer :: addDatabase" << endl;
1299 
1300  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1301  Handle<CatalogDatabaseMetadata> metadataObject =
1302  makeObject<CatalogDatabaseMetadata>();
1303 
1304 
1305  // populates object metadata
1306  String dbName = String(databaseName);
1307  metadataObject->setItemName(dbName);
1308 
1309  // adds Metadata into the Catalog
1310  if (isDatabaseRegistered(databaseName) == false) {
1311  pdbCatalog->addMetadataToCatalog(metadataObject, catalogType,
1312  errMsg);
1313  } else {
1314  pdbCatalog->updateMetadataInCatalog(metadataObject, catalogType, errMsg);
1315  }
1316 
1317  // after it registered the database metadata in the local catalog, if this is
1318  // the manager catalog iterate over all nodes in the cluster and broadcast the
1319  // update to the distributed copies of the catalog
1320  if (isManagerCatalogServer) {
1321 
1322  // get the results of each broadcast
1323  map<string, pair<bool, string>> updateResults;
1324 
1325  broadcastCatalogUpdate(metadataObject, updateResults, errMsg);
1326 
1327  for (auto &item : updateResults) {
1328  PDB_COUT << "DB metadata broadcasted to node IP: "
1329  << item.first + ((item.second.first == true)
1330  ? " updated correctly!"
1331  : " couldn't be updated due to error: ")
1332  << item.second.second << endl;
1333  }
1334  PDB_COUT << "******************* addDatabase step completed!!!!!!!" << endl;
1335  } else {
1336  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1337  "registered locally!"
1338  << endl;
1339  }
1340 
1341  return true;
1342 }
1343 
1344 // delete Metadata about a Database into the Catalog
1345 bool CatalogServer::deleteDatabase(std::string databaseName,
1346  std::string &errMsg) {
1347 
1348  // don't delete a database that doesn't exist
1349  if (isDatabaseRegistered(databaseName) == false) {
1350  errMsg = "Database does not exist.\n";
1351  return false;
1352  }
1353 
1354  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1355  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1356  makeObject<CatalogDatabaseMetadata>();
1357  Handle<Vector<CatalogDatabaseMetadata>> vectorResultItems =
1358  makeObject<Vector<CatalogDatabaseMetadata>>();
1359 
1360  if (pdbCatalog->getMetadataFromCatalog(false, databaseName, vectorResultItems,
1361  errMsg, catalogType) == false) {
1362  errMsg = "Database does not exist.\n";
1363  return false;
1364  }
1365 
1366  if ((*vectorResultItems).size() == 0) {
1367  errMsg = "Database does not exist.\n";
1368  return false;
1369  }
1370 
1371  *dbMetadataObject = (*vectorResultItems)[0];
1372 
1373  // Delete the sets of this Database from the Catalog
1374  for (int i = 0; i < dbMetadataObject->getListOfSets()->size(); i++) {
1375  if (!deleteSet(databaseName, (*dbMetadataObject->getListOfSets())[i],
1376  errMsg)) {
1377  errMsg = "Failed to delete set ";
1378  errMsg += (*dbMetadataObject->getListOfSets())[i].c_str();
1379  }
1380  }
1381 
1382  // after it deleted the database metadata in the local catalog, if this is the
1383  // manager catalog iterate over all nodes in the cluster and broadcast the
1384  // update to the distributed copies of the catalog
1385  if (isManagerCatalogServer) {
1386  // get the results of each broadcast
1387  map<string, pair<bool, string>> updateResults;
1388  errMsg = "";
1389 
1390  Handle<CatDeleteDatabaseRequest> databaseToRemove =
1391  makeObject<CatDeleteDatabaseRequest>(databaseName);
1392  broadcastCatalogDelete(databaseToRemove, updateResults, errMsg);
1393 
1394  for (auto &item : updateResults) {
1395  // adds node info to database metadata
1396  PDB_COUT << "DB metadata broadcasted to node IP: " << item.first
1397  << ((item.second.first == true)
1398  ? " deleted correctly!"
1399  : " couldn't be deleted due to error: ")
1400  << item.second.second << endl;
1401  }
1402  } else {
1403  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1404  "registered locally!"
1405  << endl;
1406  }
1407 
1408  // deletes the metadata in the Catalog
1409  return pdbCatalog->deleteMetadataInCatalog(dbMetadataObject, catalogType,
1410  errMsg);
1411 }
1412 
1413 // destructor
1414 CatalogServer::~CatalogServer() { pthread_mutex_destroy(&workingMutex); }
1415 
1416 // constructor
1417 CatalogServer::CatalogServer(std::string catalogDirectoryIn,
1418  bool isManagerCatalogServer, string managerIPValue,
1419  int managerPortValue) {
1420  PDBLoggerPtr catalogLogger = make_shared<PDBLogger>("catalogLogger");
1421  catServerLogger = make_shared<pdb::PDBLogger>("catalogServer.log");
1422 
1423  managerIP = managerIPValue;
1424  managerPort = managerPortValue;
1425 
1427  CatalogClient(managerPort, managerIP,
1428  make_shared<pdb::PDBLogger>("clientCatalogToServerLog"));
1429 
1430  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 128};
1431  _allNodesInCluster = makeObject<Vector<CatalogNodeMetadata>>();
1432  _setTypes = makeObject<Vector<CatalogSetMetadata>>();
1433  _allDatabases = makeObject<Vector<CatalogDatabaseMetadata>>();
1434  _udfsValues = makeObject<Vector<CatalogUserTypeMetadata>>();
1435 
1436  this->isManagerCatalogServer = isManagerCatalogServer;
1437 
1438  catalogDirectory = catalogDirectoryIn;
1439  PDB_COUT << "Catalog Server ctor is Manager Catalog= "
1440  << std::to_string(this->isManagerCatalogServer) << endl;
1441 
1442  // creates instance of catalog
1443  pdbCatalog = make_shared<PDBCatalog>(catalogLogger, catalogDirectory);
1444 
1445  // retrieves catalog metadata from an SQLite storage and loads metadata into
1446  // memory
1447  pdbCatalog->open();
1448 
1449  pthread_mutex_init(&workingMutex, nullptr);
1450 
1451  PDB_COUT << "Loading catalog metadata." << endl;
1452 
1453  string errMsg;
1454  string emptyString("");
1455 
1456  // retrieves metadata for user-defined types from SQLite storage and loads
1457  // them into memory
1458  if (pdbCatalog->getMetadataFromCatalog(
1459  false, emptyString, _udfsValues, errMsg,
1461 
1462  PDB_COUT << errMsg << endl;
1463 
1464  for (int i = 0; i < (*_udfsValues).size(); i++) {
1465  string _typeName = (*_udfsValues)[i].getItemKey().c_str();
1466  int16_t _typeId = (int16_t)atoi((*_udfsValues)[i].getObjectID().c_str());
1467 
1468  allTypeNames[_typeName] = _typeId;
1469  allTypeCodes[_typeId] = _typeName;
1470  }
1471 
1472  // retrieves metadata for databases from SQLite storage and loads them into
1473  // memory
1474  if (pdbCatalog->getMetadataFromCatalog(
1475  false, emptyString, _allDatabases, errMsg,
1477 
1478  PDB_COUT << errMsg << endl;
1479 
1480  for (int i = 0; i < (*_allDatabases).size(); i++) {
1481  string _dbName = (*_allDatabases)[i].getItemKey().c_str();
1482  for (int j = 0; j < (*(*_allDatabases)[i].getListOfSets()).size(); j++) {
1483  string _setName = (*(*_allDatabases)[i].getListOfSets())[j].c_str();
1484  string _typeName = (*(*_allDatabases)[i].getListOfTypes())[j].c_str();
1485 
1486  PDB_COUT << "Database " << _dbName << " has set " << _setName
1487  << " and type " << _typeName << endl;
1488 
1489  // populates information about types and sets for a given database
1490  PDB_COUT << "Adding type= " << _typeName << " db= " << _dbName
1491  << " _set=" << _setName << " typeId= "
1492  << string(pdbCatalog->getUserDefinedTypesList()[_typeName]
1493  .getObjectID())
1494  << endl;
1495 
1496  setTypes[make_pair(_dbName, _setName)] =
1497  (int16_t)std::atoi(pdbCatalog->getUserDefinedTypesList()[_typeName]
1498  .getObjectID()
1499  .c_str());
1500  }
1501  }
1502 
1503  // retrieves metadata for nodes in the cluster from SQLite storage and loads
1504  // them into memory
1505  if (pdbCatalog->getMetadataFromCatalog(
1506  false, emptyString, _allNodesInCluster, errMsg,
1508 
1509  PDB_COUT << errMsg << endl;
1510 
1511  for (int i = 0; i < (*_allNodesInCluster).size(); i++) {
1512  string _nodeAddress = (*_allNodesInCluster)[i].getItemId().c_str();
1513  string _nodeIP = (*_allNodesInCluster)[i].getNodeIP().c_str();
1514  int _nodePort = (*_allNodesInCluster)[i].getNodePort();
1515  string _nodeName = (*_allNodesInCluster)[i].getItemName().c_str();
1516  string _nodeType = (*_allNodesInCluster)[i].getNodeType().c_str();
1517  int status = (*_allNodesInCluster)[i].getNodeStatus();
1518 
1519  PDB_COUT << _nodeAddress << " | " << _nodeIP << " | "
1520  << std::to_string(_nodePort) << " | " << _nodeName << " | "
1521  << _nodeType << " | " << std::to_string(status) << endl;
1522 
1523  allNodesInCluster.push_back(_nodeAddress);
1524  }
1525 
1526  PDB_COUT << "Catalog Metadata successfully loaded!" << endl;
1527 }
1528 
1529 // invokes a method to retrieve metadata that has changed since a given
1530 // timestamp
1532  Handle<CatalogPrintMetadata> &metadataToPrint) {
1533 
1534  string itemKey = metadataToPrint->getItemName().c_str();
1535  string categoryToPrint = metadataToPrint->getCategoryToPrint().c_str();
1536  string timeStamp = metadataToPrint->getTimeStamp().c_str();
1537 
1538  string resultToPrint;
1539  string errorMsg;
1540 
1541  if (categoryToPrint.compare("all") == 0) {
1542  pdbCatalog->listNodesInCluster(resultToPrint, errorMsg);
1543  pdbCatalog->listRegisteredDatabases(resultToPrint, errorMsg);
1544  pdbCatalog->listUserDefinedTypes(resultToPrint, errorMsg);
1545  } else {
1546 
1547  if (categoryToPrint.compare("databases") == 0)
1548  pdbCatalog->listRegisteredDatabases(resultToPrint, errorMsg);
1549 
1550  if (categoryToPrint.compare("sets") == 0)
1551  pdbCatalog->listRegisteredSetsForADatabase(resultToPrint, itemKey,
1552  errorMsg);
1553 
1554  if (categoryToPrint.compare("nodes") == 0)
1555  pdbCatalog->listNodesInCluster(resultToPrint, errorMsg);
1556 
1557  if (categoryToPrint.compare("udts") == 0)
1558  pdbCatalog->listUserDefinedTypes(resultToPrint, errorMsg);
1559  }
1560 
1561  metadataToPrint->setMetadataToPrint(resultToPrint);
1562 
1563 }
1564 
1565 // add metadata about a Node in the cluster
1567  std::string &errMsg) {
1568 
1569  string _nodeIP = nodeMetadata->getNodeIP().c_str();
1570  string nodeAddress = _nodeIP + ":" + to_string(nodeMetadata->getNodePort());
1571 
1572  // don't add a node that is already registered
1573  if (isNodeRegistered(nodeAddress) == true) {
1574  errMsg = "NodeAddress " + nodeAddress + " is already registered.\n";
1575  return false;
1576  }
1577 
1578  // add the node info to container
1579  allNodesInCluster.push_back(nodeAddress);
1580 
1581  int metadataCategory = PDBCatalogMsgType::CatalogPDBNode;
1582  Handle<CatalogNodeMetadata> metadataObject =
1583  makeObject<CatalogNodeMetadata>();
1584 
1585  *metadataObject = *nodeMetadata;
1586 
1587  pdbCatalog->addMetadataToCatalog(metadataObject,
1588  metadataCategory, errMsg);
1589 
1590  PDB_COUT << "Node metadata was properly registered in the Catalog!" << endl;
1591 
1592  return true;
1593 }
1594 
1595 // adds Metadata of a new Database to the Catalog
1597  Handle<CatalogDatabaseMetadata> &dbMetadata, std::string &errMsg) {
1598  string dbName = dbMetadata->getItemName().c_str();
1599 
1600  // don't add a database that is already registered
1601  if (isDatabaseRegistered(dbName) == true) {
1602  errMsg = "Db name: " + dbName + " is already registered.\n";
1603  return false;
1604  }
1605 
1606  int metadataCategory = PDBCatalogMsgType::CatalogPDBDatabase;
1607  Handle<CatalogDatabaseMetadata> metadataObject =
1608  makeObject<CatalogDatabaseMetadata>();
1609 
1610  *metadataObject = *dbMetadata;
1611 
1612  pdbCatalog->addMetadataToCatalog(metadataObject,
1613  metadataCategory, errMsg);
1614 
1615  // after it registered the Database metadata in the local catalog, if this is
1616  // the manager catalog iterate over all nodes in the cluster and broadcast the
1617  // insert to the distributed copies of the catalog
1618  if (isManagerCatalogServer) {
1619  // get the results of each broadcast
1620  map<string, pair<bool, string>> updateResults;
1621  errMsg = "";
1622  broadcastCatalogUpdate(metadataObject, updateResults, errMsg);
1623 
1624  for (auto &item : updateResults) {
1625  PDB_COUT << "Node IP: " << item.first
1626  << ((item.second.first == true)
1627  ? " updated correctly!"
1628  : " couldn't be updated due to error: ")
1629  << item.second.second << endl;
1630  }
1631  } else {
1632  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1633  "registered locally!"
1634  << endl;
1635  }
1636 
1637  return true;
1638 }
1639 
1640 // updates metadata about a Database that has changed in the catalog
1642  Handle<CatalogDatabaseMetadata> &dbMetadata, std::string &errMsg) {
1643 
1644  string dbName = dbMetadata->getItemName().c_str();
1645 
1646  int metadataCategory = PDBCatalogMsgType::CatalogPDBDatabase;
1647  Handle<CatalogDatabaseMetadata> metadataObject =
1648  makeObject<CatalogDatabaseMetadata>();
1649  *metadataObject = *dbMetadata;
1650 
1651  pdbCatalog->updateMetadataInCatalog(metadataObject, metadataCategory, errMsg);
1652 
1653  // after it updates the Database metadata in the local catalog, if this is the
1654  // manager catalog iterate over all nodes in the cluster and broadcast the
1655  // insert to the distributed copies of the catalog
1656  if (isManagerCatalogServer) {
1657  // get the results of each broadcast
1658  map<string, pair<bool, string>> updateResults;
1659  errMsg = "";
1660  broadcastCatalogUpdate(metadataObject, updateResults, errMsg);
1661 
1662  for (auto &item : updateResults) {
1663  PDB_COUT << "Node IP: "
1664  << item.first + ((item.second.first == true)
1665  ? " updated correctly!"
1666  : " couldn't be updated due to error: ")
1667  << item.second.second << endl;
1668  }
1669  } else {
1670  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1671  "registered locally!"
1672  << endl;
1673  }
1674 
1675  return true;
1676 }
1677 
1678 // adds Metadata of a new Set into the Catalog
1680  std::string &errMsg) {
1681  // gets the set name
1682  string setName = string(setMetadata->getItemName().c_str());
1683  // gets the database name
1684  string dbName = string(setMetadata->getDBName().c_str());
1685  // gets the type Id
1686  int16_t typeId = (int16_t)atoi((*setMetadata).getObjectTypeId().c_str());
1687 
1688  // don't add a set that is already registered
1689  if (isSetRegistered(dbName, setName) == true) {
1690  errMsg =
1691  "Set name: " + dbName + "." + setName + " is already registered.\n";
1692  return false;
1693  }
1694 
1695  PDB_COUT << "inserting set-----------------> dbName= " << dbName
1696  << " setName " << setName << " id " << std::to_string(typeId)
1697  << endl;
1698  setTypes.insert(make_pair(make_pair(dbName, setName), typeId));
1699 
1700  int metadataCategory = PDBCatalogMsgType::CatalogPDBSet;
1701  Handle<CatalogSetMetadata> metadataObject = makeObject<CatalogSetMetadata>();
1702 
1703  *metadataObject = *setMetadata;
1704 
1705  PDB_COUT << "Adding set metadata for set " << setName << endl;
1706  pdbCatalog->addMetadataToCatalog(metadataObject,
1707  metadataCategory, errMsg);
1708 
1709  // after it registered the Set metadata in the local catalog, if this is the
1710  // manager catalog iterate over all nodes in the cluster and broadcast the
1711  // insert to the distributed copies of the catalog
1712  if (isManagerCatalogServer) {
1713  // get the results of each broadcast
1714  map<string, pair<bool, string>> updateResults;
1715 
1716  broadcastCatalogUpdate(metadataObject, updateResults, errMsg);
1717 
1718  for (auto &item : updateResults) {
1719  PDB_COUT << "Node IP: "
1720  << item.first + ((item.second.first == true)
1721  ? " updated correctly!"
1722  : " couldn't be updated due to error: ")
1723  << item.second.second << endl;
1724  }
1725  } else {
1726  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1727  "registered locally!"
1728  << endl;
1729  }
1730  return true;
1731 }
1732 
1733 // Adds the IP of a node to a given set
1734 bool CatalogServer::addNodeToSet(std::string nodeIP, std::string databaseName,
1735  std::string setName, std::string &errMsg) {
1736  // make sure we are only adding to an existing database
1737  if (isDatabaseRegistered(databaseName) == false) {
1738  errMsg = "Database does not exist.\n";
1739  return false;
1740  }
1741 
1742  // make sure we are only adding to an existing Set
1743  if (isSetRegistered(databaseName, setName) == false) {
1744  errMsg = "Set does not exists.\n";
1745  return false;
1746  }
1747 
1748  PDB_COUT << "...... Calling CatalogServer :: addNodeToSet" << endl;
1749 
1750  // prepares object for the DB metadata
1751  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1752  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1753  makeObject<CatalogDatabaseMetadata>();
1754 
1756  makeObject<Vector<CatalogDatabaseMetadata>>();
1757 
1758  if (pdbCatalog->getMetadataFromCatalog(false, databaseName, databaseItems,
1759  errMsg, catalogType) == false)
1760  PDB_COUT << errMsg << endl;
1761 
1762  for (int i = 0; i < (*databaseItems).size(); i++) {
1763  if ((*databaseItems)[i].getItemKey().c_str() == databaseName)
1764  *dbMetadataObject = (*databaseItems)[i];
1765  }
1766 
1767  String setNameCatalog(setName);
1768  String nodeID(nodeIP);
1769 
1770  // add Set to Map
1771  (*dbMetadataObject).addSetToMap(setNameCatalog, nodeID);
1772 
1773  // updates the corresponding database metadata
1775 
1776  // if database exists update its metadata
1777  if (isDatabaseRegistered(databaseName) == true) {
1778  PDB_COUT << ".......... Invoking updateMetadataInCatalog key: "
1779  << databaseName << endl;
1780  if (pdbCatalog->updateMetadataInCatalog(dbMetadataObject, catalogType,
1781  errMsg) == true)
1782  PDB_COUT << "DB Update Set metadata OK" << endl;
1783  else {
1784  PDB_COUT << "DB Update metadata Set Error: " << errMsg << endl;
1785  return false;
1786  }
1787  } else {
1788  return false;
1789  }
1790 
1791  // after it registered the Set metadata in the local catalog, if this is the
1792  // manager catalog iterate over all nodes in the cluster and broadcast the
1793  // insert to the distributed copies of the catalog
1794  if (isManagerCatalogServer) {
1795  PDB_COUT << "About to broadcast addition of node to set in the cluster: "
1796  << endl;
1797 
1798  // map to capture the results of broadcasting the DB update
1799  map<string, pair<bool, string>> updateSetResults;
1800 
1801  if (broadcastCatalogUpdate(dbMetadataObject, updateSetResults, errMsg)) {
1802  PDB_COUT << " Broadcasting DB updated Ok. " << endl;
1803  } else {
1804  PDB_COUT << " Error broadcasting DB update." << endl;
1805  }
1806  for (auto &item : updateSetResults) {
1807  PDB_COUT << "Node IP: "
1808  << item.first + ((item.second.first == true)
1809  ? " updated correctly!"
1810  : " couldn't be updated due to error: ")
1811  << item.second.second << endl;
1812  }
1813  } else {
1814  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1815  "registered locally!"
1816  << endl;
1817  }
1818 
1819  return true;
1820 }
1821 
1822 // adds Node Information to a registered Database
1823 bool CatalogServer::addNodeToDB(std::string nodeIP, std::string databaseName,
1824  std::string &errMsg) {
1825  // make sure we are only adding to an existing database
1826  if (isDatabaseRegistered(databaseName) == false) {
1827  errMsg = "Database does not exist.\n";
1828  return false;
1829  }
1830 
1831  PDB_COUT << "...... Calling CatalogServer :: addNodeToSet" << endl;
1832 
1833  // prepares data for the DB metadata
1834  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1835  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1836  makeObject<CatalogDatabaseMetadata>();
1837 
1839  makeObject<Vector<CatalogDatabaseMetadata>>();
1840 
1841  if (pdbCatalog->getMetadataFromCatalog(false, databaseName, databaseItems,
1842  errMsg, catalogType) == false)
1843  PDB_COUT << errMsg << endl;
1844 
1845  for (int i = 0; i < (*databaseItems).size(); i++) {
1846  if ((*databaseItems)[i].getItemKey().c_str() == databaseName)
1847  *dbMetadataObject = (*databaseItems)[i];
1848  }
1849 
1850  String nodeID(nodeIP);
1851  String dbName(databaseName);
1852  (*dbMetadataObject).addNodeToMap(nodeID, dbName);
1853 
1854  // updates the corresponding database metadata
1856 
1857  // if database exists update its metadata
1858  if (isDatabaseRegistered(databaseName) == true) {
1859  PDB_COUT << ".......... Invoking updateMetadataInCatalog key: " +
1860  databaseName
1861  << endl;
1862  if (pdbCatalog->updateMetadataInCatalog(dbMetadataObject, catalogType,
1863  errMsg) == true)
1864  PDB_COUT << "DB Update Set metadata OK" << endl;
1865  else {
1866  PDB_COUT << "DB Update metadata Set Error: " << errMsg << endl;
1867  return false;
1868  }
1869 
1870  } else {
1871  return false;
1872  }
1873 
1874  // after it registered the Set metadata in the local catalog, if this is the
1875  // managervcatalog iterate over all nodes in the cluster and broadcast the
1876  // insert to the distributed copies of the catalog
1877  if (isManagerCatalogServer) {
1878  PDB_COUT << "About to broadcast addition of node to set in the cluster: "
1879  << endl;
1880 
1881  // map to capture the results of broadcasting the DB update
1882  map<string, pair<bool, string>> updateSetResults;
1883 
1884  // second, broadcasts the metadata of the DB to which this set has been
1885  // added, updating all distributed copies of the catalog in the cluster
1886  if (broadcastCatalogUpdate(dbMetadataObject, updateSetResults, errMsg)) {
1887  PDB_COUT << " Broadcasting DB updated Ok. " << endl;
1888  } else {
1889  PDB_COUT << " Error broadcasting DB update." << endl;
1890  }
1891  for (auto &item : updateSetResults) {
1892  PDB_COUT << "Node IP: "
1893  << item.first + ((item.second.first == true)
1894  ? " updated correctly!"
1895  : " couldn't be updated due to error: ")
1896  << item.second.second << endl;
1897  }
1898 
1899  } else {
1900  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1901  "registered locally!"
1902  << endl;
1903  }
1904 
1905  return true;
1906 }
1907 
1908 // removes node information from a Set
1909 bool CatalogServer::removeNodeFromSet(std::string nodeIP,
1910  std::string databaseName,
1911  std::string setName,
1912  std::string &errMsg) {
1913  string setUniqueId = databaseName + "." + setName;
1914 
1915  // make sure the database exists
1916  if (!isDatabaseRegistered(databaseName)) {
1917  errMsg = "Database does not exist.\n";
1918  PDB_COUT << errMsg << endl;
1919  return false;
1920  }
1921 
1922  // make sure the set exists
1923  if (!isSetRegistered(databaseName, setName)) {
1924  errMsg = "Set doesn't exist.\n";
1925  PDB_COUT << errMsg << endl;
1926  return false;
1927  }
1928 
1929  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
1930  Handle<CatalogDatabaseMetadata> dbMetadataObject =
1931  makeObject<CatalogDatabaseMetadata>();
1932 
1934  makeObject<Vector<CatalogDatabaseMetadata>>();
1935 
1936  if (!pdbCatalog->getMetadataFromCatalog(false, databaseName, databaseItems,
1937  errMsg, catalogType)) {
1938  PDB_COUT << errMsg << endl;
1939  return false;
1940  }
1941 
1942  if (databaseItems->size() != 1) {
1943  errMsg = "Could not find database " + databaseName;
1944  PDB_COUT << errMsg << endl;
1945  return false;
1946  }
1947 
1948  *dbMetadataObject = (*databaseItems)[0];
1949 
1950  dbMetadataObject->removeNodeFromSet(nodeIP, setName);
1951 
1952  // updates catalog
1953  if (pdbCatalog->updateMetadataInCatalog(dbMetadataObject, catalogType,
1954  errMsg)) {
1955  PDB_COUT << "DB Update Set metadata OK" << endl;
1956  } else {
1957  PDB_COUT << "DB Update metadata Set Error: " << errMsg << endl;
1958  return false;
1959  }
1960 
1961  // after it registered the Set metadata in the local catalog, if this is the
1962  // manager catalog iterate over all nodes in the cluster and broadcast the
1963  // insert to the distributed copies of the catalog
1964  if (isManagerCatalogServer) {
1965  // map to capture the results of broadcasting the Set insertion
1966  map<string, pair<bool, string>> updateResults;
1967 
1968  // first, broadcasts the metadata of the new set to all local copies of the
1969  // catalog in the cluster, inserting the new item
1970  broadcastCatalogUpdate(dbMetadataObject, updateResults, errMsg);
1971 
1972  for (auto &item : updateResults) {
1973  PDB_COUT << "Node IP: " << item.first
1974  << ((item.second.first == true)
1975  ? " updated correctly!"
1976  : " couldn't be updated due to error: ")
1977  << item.second.second << endl;
1978  }
1979 
1980  // map to capture the results of broadcasting the DB update
1981  map<string, pair<bool, string>> updateSetResults;
1982 
1983  // second, broadcasts the metadata of the DB to which this set has been
1984  // added, updating all local copies of the catalog in the cluster
1985  broadcastCatalogUpdate(dbMetadataObject, updateSetResults, errMsg);
1986 
1987  for (auto &item : updateSetResults) {
1988  PDB_COUT << "DB Metadata broadcasted to Node IP: "
1989  << item.first + ((item.second.first == true)
1990  ? " updated correctly!"
1991  : " couldn't be updated due to error: ")
1992  << item.second.second << endl;
1993  }
1994  } else {
1995  PDB_COUT << "This is not Manager Catalog Node, thus metadata was only "
1996  "registered locally!"
1997  << endl;
1998  }
1999  return true;
2000 }
2001 
2002 // removes a node ip from a registered Database
2003 bool CatalogServer::removeNodeFromDB(std::string nodeIP,
2004  std::string databaseName,
2005  std::string &errMsg) {
2006  errMsg = "Remove node from db not implemented";
2007  return false;
2008 }
2009 
2010 // templated method for broadcasting a Catalog Update to nodes in the cluster
2011 template <class Type>
2013  Handle<Type> metadataToSend,
2014  map<string, pair<bool, string>> &broadcastResults, string &errMsg) {
2015 
2016  PDBLoggerPtr catalogLogger = make_shared<PDBLogger>("distCatalogLogger");
2017 
2018  for (auto &item : pdbCatalog->getListOfNodesInCluster()) {
2019  string nodeAddress = string(item.second.getNodeIP().c_str()) + ":" +
2020  to_string(item.second.getNodePort());
2021  string nodeIP = item.second.getNodeIP().c_str();
2022  int nodePort = item.second.getNodePort();
2023  bool res = false;
2024 
2025  CatalogClient clusterCatalogClient =
2026  CatalogClient(nodePort, nodeIP, catalogLogger);
2027 
2028  if (string(item.second.getNodeType().c_str()).compare("manager") != 0) {
2029 
2030  // sends the request to a node in the cluster
2031  res =
2032  clusterCatalogClient.registerGenericMetadata(metadataToSend, errMsg);
2033 
2034  // adds the result of the update
2035  broadcastResults.insert(make_pair(nodeIP, make_pair(res, errMsg)));
2036  }
2037  }
2038 
2039  return true;
2040 }
2041 
2042 } // namespace
2043 
2044 // templated method for broadcasting a Catalog Deletion to nodes in the cluster
2045 template <class Type>
2046 bool CatalogServer::broadcastCatalogDelete(
2047  Handle<Type> metadataToSend,
2048  map<string, pair<bool, string>> &broadcastResults, string &errMsg) {
2049 
2050  PDBLoggerPtr catalogLogger = make_shared<PDBLogger>("distCatalogLogger");
2051 
2052  for (auto &item : pdbCatalog->getListOfNodesInCluster()) {
2053 
2054  string nodeAddress = string(item.second.getNodeIP().c_str()) + ":" +
2055  to_string(item.second.getNodePort());
2056  string nodeIP = item.second.getNodeIP().c_str();
2057  int nodePort = item.second.getNodePort();
2058  bool res = false;
2059 
2060  CatalogClient clusterCatalogClient =
2061  CatalogClient(nodePort, nodeIP, catalogLogger);
2062 
2063  if (string(item.second.getNodeType().c_str()).compare("manager") != 0) {
2064 
2065  // sends the request to a node in the cluster
2066  res = clusterCatalogClient.deleteGenericMetadata(metadataToSend, errMsg);
2067 
2068  // adds the result of the update
2069  broadcastResults.insert(make_pair(nodeIP, make_pair(res, errMsg)));
2070 
2071  } else {
2072  PDB_COUT << "Don't broadcast to " + nodeAddress +
2073  " because it has the manager catalog."
2074  << endl;
2075  }
2076  }
2077 
2078  return true;
2079 }
2080 
2081 // checks if a Database is registered
2083  int catalogType = PDBCatalogMsgType::CatalogPDBDatabase;
2084  string result("");
2085 
2086  return pdbCatalog->keyIsFound(catalogType, dbName, result);
2087 }
2088 
2089 // checks if a Set is registered
2090 bool CatalogServer::isSetRegistered(string dbName, string setName) {
2091  int catalogType = PDBCatalogMsgType::CatalogPDBSet;
2092  string result("");
2093  string setUniqueId = dbName + "." + setName;
2094  return pdbCatalog->keyIsFound(catalogType, setUniqueId, result);
2095 }
2096 
2097 // checks if a Node is registered
2098 bool CatalogServer::isNodeRegistered(string nodeIP) {
2099  int catalogType = PDBCatalogMsgType::CatalogPDBNode;
2100  string result("");
2101  PDB_COUT << "searching to register node " << nodeIP << endl;
2102 
2103  return pdbCatalog->keyIsFound(catalogType, nodeIP, result);
2104 }
2105 
2106 // checks if this is the Manager Catalog
2108  return isManagerCatalogServer;
2109 }
2110 
2111 // sets this as a Manager (true) or Worker (false) Catalog
2112 void CatalogServer::setIsManagerCatalogServer(bool isManagerCatalogServerIn) {
2113  isManagerCatalogServer = isManagerCatalogServerIn;
2114 }
2115 
2116 /* Explicit instantiation to broadcast Catalog Updates for a Node */
2118  Handle<CatalogNodeMetadata> metadataToSend,
2119  map<string, pair<bool, string>> &broadcastResults,
2120  string &errMsg);
2121 
2122 /* Explicit instantiation to broadcast Catalog Updates for a Database */
2124  Handle<CatalogDatabaseMetadata> metadataToSend,
2125  map<string, pair<bool, string>> &broadcastResults,
2126  string &errMsg);
2127 
2128 /* Explicit instantiation to broadcast Catalog Updates for a Set */
2130  Handle<CatalogSetMetadata> metadataToSend,
2131  map<string, pair<bool, string>> &broadcastResults,
2132  string &errMsg);
int16_t addObjectType(int16_t typeID, string &soFile, string &errMsg)
void registerHandlers(PDBServer &forMe) override
bool deleteDatabase(string databaseName, string &errMsg)
bool deleteSet(std::string databaseName, std::string setName, std::string &errMsg)
bool addSet(int16_t typeIdentifier, string databaseName, string setName, string &errMsg)
Handle< Vector< CatalogSetMetadata > > _setTypes
map< int16_t, string > allTypeCodes
bool broadcastCatalogDelete(Handle< Type > metadataToSend, map< string, pair< bool, string >> &broadcastResults, string &errMsg)
bool addSetMetadata(Handle< CatalogSetMetadata > &setMetadata, std::string &errMsg)
map< pair< string, string >, int16_t > setTypes
bool addDatabaseMetadata(Handle< CatalogDatabaseMetadata > &dbMetadata, std::string &errMsg)
PDBCatalogPtr getCatalog()
vector< string > allNodesInCluster
PDBCatalogPtr pdbCatalog
bool getSharedLibrary(int16_t identifier, vector< char > &putResultHere, std::string &errMsg)
bool deleteGenericMetadata(pdb::Handle< Type > metadataItem, std::string &errMsg)
bool broadcastCatalogUpdate(Handle< Type > metadataToSend, map< string, pair< bool, string >> &broadcastResults, string &errMsg)
bool addNodeMetadata(Handle< CatalogNodeMetadata > &nodeMetadata, std::string &errMsg)
std::string catalogDirectory
int16_t searchForObjectTypeName(std::string objectTypeName)
bool getSharedLibraryByTypeName(std::string typeName, Handle< CatalogUserTypeMetadata > &typeMetadata, string &sharedLibraryBytes, std::string &errMsg)
bool removeNodeFromSet(std::string nodeIP, std::string databaseName, std::string setName, std::string &errMsg)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
Handle< Vector< CatalogNodeMetadata > > _allNodesInCluster
bool addNodeToSet(std::string nodeIP, std::string databaseName, std::string setName, std::string &errMsg)
PDBLoggerPtr catServerLogger
bool getSharedLibraryByTypeName(int16_t identifier, std::string &typeName, std::string sharedLibraryFileName, Handle< CatalogUserTypeMetadata > &typeMetadata, string &sharedLibraryBytes, std::string &errMsg)
bool updateDatabaseMetadata(Handle< CatalogDatabaseMetadata > &dbMetadata, std::string &errMsg)
shared_ptr< PDBCatalog > PDBCatalogPtr
Definition: PDBCatalog.h:77
#define PDB_COUT
Definition: PDBDebug.h:31
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
bool isDatabaseRegistered(string dbName)
void setIsManagerCatalogServer(bool isManagerCatalogServerIn)
map< string, int16_t > allTypeNames
static int16_t lookupBuiltInType(std::string objectTypeName)
Definition: VTableMap.cc:41
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
bool removeNodeFromDB(std::string nodeIP, std::string databaseName, std::string &errMsg)
int16_t searchForObjectTypeName(string objectTypeName)
Handle< Vector< CatalogUserTypeMetadata > > _udfsValues
bool registerGenericMetadata(pdb::Handle< Type > metadataItem, std::string &errMsg)
bool addDatabase(string databaseName, string &errMsg)
pthread_mutex_t workingMutex
bool isNodeRegistered(string nodeIP)
bool isSetRegistered(string dbName, string setName)
Handle< Vector< CatalogDatabaseMetadata > > _allDatabases
CatalogClient catalogClientConnectionToManagerCatalogServer
int16_t getObjectType(string databaseName, string setName)
CatalogServer(std::string catalogDirectory, bool isManagerCatalogServer, std::string managerIP, int managerPort)
bool addNodeToDB(std::string nodeIP, std::string databaseName, std::string &errMsg)
bool getIsManagerCatalogServer()