19 #ifndef PANGEA_STORAGE_SERVER_C
20 #define PANGEA_STORAGE_SERVER_C
78 #ifdef ENABLE_COMPRESSION
82 #define FLUSH_BUFFER_SIZE 3
91 std::vector<Record<Vector<Handle<Object>>>*> records;
92 records.push_back(addMe);
94 sizes[databaseAndSet] = addMe->numBytes();
97 sizes[databaseAndSet] += addMe->numBytes();
99 return sizes[databaseAndSet];
111 this->
nodeId = conf->getNodeID();
130 this->
dbs =
new std::map<DatabaseID, DefaultDatabasePtr>();
131 this->
name2id =
new std::map<std::string, DatabaseID>();
132 this->
tempSets =
new std::map<SetID, TempSetPtr>();
134 this->
userSets =
new std::map<std::pair<DatabaseID, SetID>,
SetPtr>();
136 new std::map<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>();
137 this->
typename2id =
new std::map<std::string, SetID>();
141 pthread_mutex_init(&(this->
typeLock),
nullptr);
148 this->
usersetSeqIds =
new std::map<std::string, SequenceID*>();
156 this->
addType(
"UnknownUserData", 0);
160 if (
names2ids->count(databaseAndSet) != 0) {
161 pair<DatabaseID, SetID> ids =
names2ids->at(databaseAndSet);
170 PDB_COUT <<
"to clean up for storage..." << std::endl;
181 while (a.second.size() > 0)
184 std::cout <<
"Now there are " <<
totalObjects <<
" new objects stored in storage" << std::endl;
185 PDB_COUT <<
"sleep for 1 second to wait for all data gets flushed" << std::endl;
187 PDB_COUT <<
"cleaned up for storage..." << std::endl;
194 pthread_mutex_destroy(&(this->
typeLock));
212 if (whichSet ==
nullptr) {
215 return whichSet->addPage();
222 bool directPutOrNot) {
229 size_t numBytesToProcess =
sizes[databaseAndSet];
230 PDB_COUT <<
"buffer is full, to write to a storage page" << std::endl;
234 if (myPage ==
nullptr) {
235 std::cout <<
"FATAL ERROR: set to store data doesn't exist!" << std::endl;
236 std::cout <<
"databaseName" << databaseAndSet.first << std::endl;
237 std::cout <<
"setName" << databaseAndSet.second << std::endl;
240 size_t pageSize = myPage->getSize();
245 int numObjectsInRecord;
257 while (allRecs.size() > 0) {
259 auto& allObjects = *(allRecs[allRecs.size() - 1]->getRootObject());
260 numObjectsInRecord = allObjects.size();
262 for (; pos < numObjectsInRecord; pos++) {
263 data->push_back(allObjects[pos]);
268 numBytesToProcess -= allRecs[allRecs.size() - 1]->numBytes();
269 free(allRecs[allRecs.size() - 1]);
277 PDB_COUT <<
"Write all of the bytes in the record.\n";
281 key.
dbId = myPage->getDbID();
282 key.
typeId = myPage->getTypeID();
283 key.
setId = myPage->getSetID();
284 key.
pageId = myPage->getPageID();
285 this->
getCache()->decPageRefCount(key);
286 if (flushOrNot ==
true) {
287 this->
getCache()->flushPageWithoutEviction(key);
294 std::cout <<
"Writing back a page!!\n";
296 if (data->size() == 0) {
298 <<
"FATAL ERROR: object size is larger than a page, pleases increase page size"
300 std::cout <<
"databaseName" << databaseAndSet.first << std::endl;
301 std::cout <<
"setName" << databaseAndSet.second << std::endl;
305 key.
dbId = myPage->getDbID();
306 key.
typeId = myPage->getTypeID();
307 key.
setId = myPage->getSetID();
308 key.
pageId = myPage->getPageID();
309 this->
getCache()->decPageRefCount(key);
310 if (flushOrNot ==
true) {
311 this->
getCache()->flushPageWithoutEviction(key);
315 if (numBytesToProcess + (((numObjectsInRecord - pos) / numObjectsInRecord) *
316 allRecs[allRecs.size() - 1]->numBytes()) >
320 pageSize = myPage->getSize();
327 void* myRAM = malloc(allRecs[allRecs.size() - 1]->numBytes());
329 allRecs[allRecs.size() - 1]->numBytes());
331 makeObject<Vector<Handle<Object>>>(numObjectsInRecord - pos);
334 auto& allObjects = *(allRecs[allRecs.size() - 1]->getRootObject());
335 for (; pos < numObjectsInRecord; pos++) {
336 extraData->push_back(allObjects[pos]);
340 numBytesToProcess -= allRecs[allRecs.size() - 1]->numBytes();
341 free(allRecs[allRecs.size() - 1]);
344 allRecs[allRecs.size() - 1] =
getRecord(extraData);
345 numBytesToProcess += allRecs[allRecs.size() - 1]->numBytes();
351 PDB_COUT <<
"Now all the records are back.\n";
352 sizes[databaseAndSet] = numBytesToProcess;
361 std::string& errMsg) {
363 FILE* myFile = fopen(path.c_str(),
"w+");
364 if (myFile == NULL) {
365 errMsg =
"Error opening file for writing: " + path;
366 std::cout << errMsg << std::endl;
371 getFunctionality<PangeaStorageServer>().
getSet(std::make_pair(dbName, setName));
372 if (setToExport ==
nullptr) {
373 errMsg =
"Error in exportToFile: set doesn't exist: " + dbName +
":" + setName;
374 std::cout << errMsg << std::endl;
378 bool isHeadWritten =
false;
379 setToExport->setPinned(
true);
380 std::vector<PageIteratorPtr>* pageIters = setToExport->getIterators();
381 int numIterators = pageIters->size();
382 for (
int i = 0; i < numIterators; i++) {
384 while (iter->hasNext()) {
386 if (nextPage !=
nullptr) {
390 int vecSize = inputVec->size();
391 for (
int j = 0; j < vecSize; j++) {
393 unsafeCast<ExportableObject, Object>((*inputVec)[j]);
394 if (isHeadWritten ==
false) {
395 std::string header = objectToExport->toSchemaString(format);
397 fprintf(myFile,
"%s", header.c_str());
399 isHeadWritten =
true;
401 std::string value = objectToExport->toValueString(format);
403 fprintf(myFile,
"%s", value.c_str());
409 key.
dbId = nextPage->getDbID();
410 key.
typeId = nextPage->getTypeID();
411 key.
setId = nextPage->getSetID();
412 key.
pageId = nextPage->getPageID();
413 cache->decPageRefCount(key);
414 cache->evictPage(key);
419 setToExport->setPinned(
false);
429 std::string hdfsNameNodeIp,
430 int hdfsNameNodePort,
433 std::string& errMsg) {
441 StorageCleanup_TYPEID,
444 PDB_COUT <<
"received StorageCleanup" << std::endl;
447 getFunctionality<PangeaStorageServer>().
cleanup(request->isFlushing());
452 res = sendUsingMe->sendObject(response, errMsg);
453 return make_pair(res, errMsg);
461 StorageAddDatabase_TYPEID,
464 PDB_COUT <<
"received StorageAddDatabase" << std::endl;
468 res = getFunctionality<PangeaStorageServer>().
addDatabase(request->getDatabase());
470 errMsg =
"Database already exists\n";
472 res = getFunctionality<CatalogServer>().
addDatabase(request->getDatabase(),
477 if ((res = getFunctionality<PangeaStorageServer>().
addDatabase(
478 request->getDatabase())) ==
false) {
479 errMsg =
"Database already exists\n";
487 res = sendUsingMe->sendObject(response, errMsg);
488 return make_pair(res, errMsg);
495 StorageAddSet_TYPEID,
498 PDB_COUT <<
"received StorageAddSet" << std::endl;
502 if (request->getPageSize() >
conf->getMaxPageSize()) {
503 errMsg =
"Error: page size is larger than maxPageSize\n";
504 std::cout << errMsg << std::endl;
508 PDB_COUT <<
"adding set in standalone mode" << std::endl;
509 res = getFunctionality<PangeaStorageServer>().
addSet(request->getDatabase(),
510 request->getTypeName(),
511 request->getSetName(),
512 request->getPageSize());
514 errMsg =
"Set " + request->getDatabase() +
":" + request->getSetName() +
515 ":" + request->getTypeName() +
" already exists\n";
518 PDB_COUT <<
"TypeID =" << typeID << std::endl;
520 errMsg =
"Could not find type " + request->getTypeName();
523 PDB_COUT <<
"to add set in catalog" << std::endl;
524 res = getFunctionality<CatalogServer>().
addSet(
525 typeID, request->getDatabase(), request->getSetName(), errMsg);
534 PDB_COUT <<
"creating set in Pangea in distributed environment...with setName="
535 << request->getSetName() << std::endl;
536 if ((res = getFunctionality<PangeaStorageServer>().
addSet(
537 request->getDatabase(),
538 request->getTypeName(),
539 request->getSetName(),
540 request->getPageSize())) ==
false) {
541 errMsg =
"Set " + request->getDatabase() +
":" + request->getSetName() +
542 ":" + request->getTypeName() +
" already exists\n";
543 cout << errMsg << endl;
547 PDB_COUT <<
"TypeID =" << typeID << std::endl;
550 errMsg =
"Could not find type " + request->getTypeName();
551 cout << errMsg << endl;
563 res = sendUsingMe->sendObject(response, errMsg);
564 return make_pair(res, errMsg);
570 StorageAddTempSet_TYPEID,
576 bool res = getFunctionality<PangeaStorageServer>().
addTempSet(
577 request->getSetName(), setId, request->getPageSize());
579 errMsg =
"TempSet " + request->getSetName() +
" already exists\n";
585 makeObject<StorageAddTempSetResult>(res, errMsg, setId);
590 return make_pair(res, errMsg);
595 StorageRemoveDatabase_TYPEID,
600 std::string databaseName = request->getDatabase();
602 PDB_COUT <<
"Deleting database " << databaseName << std::endl;
604 res = getFunctionality<PangeaStorageServer>().
removeDatabase(databaseName);
606 errMsg =
"Failed to delete database\n";
608 res = getFunctionality<CatalogServer>().deleteDatabase(databaseName, errMsg);
611 res = getFunctionality<PangeaStorageServer>().
removeDatabase(databaseName);
613 errMsg =
"Failed to delete database\n";
621 return make_pair(res, errMsg);
626 StorageRemoveUserSet_TYPEID,
630 std::string databaseName = request->getDatabase();
631 std::string typeName = request->getTypeName();
632 std::string setName = request->getSetName();
634 SetPtr setToRemove =
getSet(std::pair<std::string, std::string>(databaseName, setName));
635 if (setToRemove ==
nullptr) {
638 errMsg =
"Set doesn't exist.";
640 makeObject<SimpleRequestResult>(
false, errMsg);
643 res = sendUsingMe->sendObject(response, errMsg);
644 return make_pair(res, errMsg);
647 res = getFunctionality<PangeaStorageServer>().
removeSet(
648 databaseName, typeName, setName);
650 errMsg =
"Set doesn't exist\n";
654 res = getFunctionality<CatalogServer>().deleteSet(
655 request->getDatabase(), request->getSetName(), errMsg);
659 if ((res = getFunctionality<PangeaStorageServer>().
removeSet(databaseName,
660 setName)) ==
false) {
661 errMsg =
"Error removing set!\n";
662 cout << errMsg << endl;
670 res = sendUsingMe->sendObject(response, errMsg);
671 return make_pair(res, errMsg);
677 StorageClearSet_TYPEID,
681 std::string databaseName = request->getDatabase();
682 std::string typeName = request->getTypeName();
683 std::string setName = request->getSetName();
685 SetPtr set =
getSet(std::make_pair(databaseName, setName));
686 if (set ==
nullptr) {
688 errMsg =
"Set doesn't exist\n";
690 size_t pageSize = set->getPageSize();
692 PDB_COUT <<
"removing set in standalone mode" << std::endl;
693 res = getFunctionality<PangeaStorageServer>().
removeSet(
694 databaseName, typeName, setName);
696 errMsg =
"Set doesn't exist\n";
698 PDB_COUT <<
"adding set in standalone mode" << std::endl;
699 res = getFunctionality<PangeaStorageServer>().
addSet(request->getDatabase(),
700 request->getTypeName(),
701 request->getSetName(),
706 PDB_COUT <<
"removing set in cluster mode" << std::endl;
707 if ((res = getFunctionality<PangeaStorageServer>().
removeSet(
708 databaseName, setName)) ==
false) {
709 errMsg =
"Error removing set!\n";
710 cout << errMsg << endl;
712 if ((res = getFunctionality<PangeaStorageServer>().
addSet(
713 request->getDatabase(),
714 request->getTypeName(),
715 request->getSetName(),
716 pageSize)) ==
false) {
717 errMsg =
"Set already exists\n";
718 cout << errMsg << endl;
728 res = sendUsingMe->sendObject(response, errMsg);
729 return make_pair(res, errMsg);
737 StorageRemoveTempSet_TYPEID,
742 SetID setId = request->getSetID();
743 bool res = getFunctionality<PangeaStorageServer>().
removeTempSet(setId);
745 errMsg =
"Set doesn't exist\n";
752 res = sendUsingMe->sendObject(response, errMsg);
753 return make_pair(res, errMsg);
758 StorageRemoveHashSet_TYPEID,
765 if (communicatorToBackend->connectToLocalServer(
766 getFunctionality<PangeaStorageServer>().getLogger(),
769 std::cout << errMsg << std::endl;
771 }
else if (!communicatorToBackend->sendObject(request, errMsg)) {
772 std::cout << errMsg << std::endl;
773 errMsg = std::string(
"can't send message to backend: ") + errMsg;
776 PDB_COUT <<
"Storage sent request to backend" << std::endl;
780 std::cout <<
"Error waiting for backend to remove hash set. " << errMsg
782 errMsg = std::string(
"backend failed to remove hash set: ") + errMsg;
790 success = sendUsingMe->sendObject(response, errMsg);
791 return make_pair(success, errMsg);
796 StorageExportSet_TYPEID,
801 getFunctionality<PangeaStorageServer>().
exportToFile(request->getDbName(),
802 request->getSetName(),
803 request->getOutputFilePath(),
804 request->getFormat(),
811 res = sendUsingMe->sendObject(response, errMsg);
812 return make_pair(res, errMsg);
819 StorageAddObjectInLoop_TYPEID,
822 std::cout <<
"start StorageAddObjectInLoop" << std::endl;
824 bool everythingOK =
true;
826 void* requestInLoop =
nullptr;
827 while (curRequest->isLoopEnded() ==
false) {
828 bool typeCheckOrNot = request->isTypeCheck();
829 if (typeCheckOrNot ==
true) {
830 #ifdef DEBUG_SET_TYPE
833 int16_t typeID = getFunctionality<CatalogServer>().getObjectType(
834 request->getDatabase(), request->getSetName());
836 everythingOK =
false;
840 everythingOK =
false;
846 size_t numBytes = sendUsingMe->getSizeOfNextObject();
847 std::cout <<
"received " << numBytes <<
" bytes" << std::endl;
848 #ifdef ENABLE_COMPRESSION
849 char* readToHere =
new char[numBytes];
851 void* readToHere = malloc(numBytes);
853 everythingOK = sendUsingMe->receiveBytes(readToHere, errMsg);
858 makeObject<SimpleRequestResult>(everythingOK, errMsg);
861 everythingOK = sendUsingMe->sendObject(response, errMsg);
864 #ifdef ENABLE_COMPRESSION
865 size_t sizeOfBytesToAdd = 0;
866 snappy::GetUncompressedLength(readToHere, numBytes, &sizeOfBytesToAdd);
868 size_t sizeOfBytesToAdd = numBytes;
871 auto databaseAndSet = make_pair((std::string)request->getDatabase(),
872 (std::string)request->getSetName());
875 if (mySet ==
nullptr) {
876 std::cout <<
"FATAL ERROR: set to store data doesn't exist!" << std::endl;
877 std::cout <<
"databaseName" << databaseAndSet.first << std::endl;
878 std::cout <<
"setName" << databaseAndSet.second << std::endl;
880 false, std::string(
"FATAL ERROR: set to store data doesn't exist!"));
882 std::cout <<
"sizeOfBytesToAdd is " << sizeOfBytesToAdd << std::endl;
883 char* myBytes = (
char*)mySet->getNewBytes(sizeOfBytesToAdd);
884 if (myBytes ==
nullptr) {
885 return make_pair(
false,
886 std::string(
"FATAL ERROR: can't get bytes from user set " +
887 databaseAndSet.second));
889 #ifdef ENABLE_COMPRESSION
890 snappy::RawUncompress(readToHere, numBytes, myBytes);
892 memcpy(myBytes, readToHere, numBytes);
897 "Tried to add data of the wrong type to a database set or database set "
899 everythingOK =
false;
901 #ifdef ENABLE_COMPRESSION
907 numBytes = sendUsingMe->getSizeOfNextObject();
908 if (requestInLoop !=
nullptr) {
911 requestInLoop = malloc(numBytes);
913 requestInLoop, everythingOK, errMsg);
914 std::cout <<
"got new StorageAddObjectInLoop" << std::endl;
916 if (requestInLoop !=
nullptr) {
922 makeObject<SimpleRequestResult>(everythingOK, errMsg);
925 everythingOK = sendUsingMe->sendObject(response, errMsg);
927 std::cout <<
"end StorageAddObjectInLoop" << std::endl;
928 return make_pair(everythingOK, errMsg);
934 StorageAddData_TYPEID,
938 bool everythingOK =
true;
939 bool typeCheckOrNot = request->isTypeCheck();
940 if (typeCheckOrNot ==
true) {
941 #ifdef DEBUG_SET_TYPE
944 int16_t typeID = getFunctionality<CatalogServer>().getObjectType(
945 request->getDatabase(), request->getSetName());
947 everythingOK =
false;
951 everythingOK =
false;
957 size_t numBytes = sendUsingMe->getSizeOfNextObject();
958 bool compressedOrNot = request->isCompressed();
960 char* readToHere =
nullptr;
961 if (compressedOrNot ==
false) {
962 readToHere = (
char*)malloc(numBytes);
964 readToHere, everythingOK, errMsg);
966 char* temp =
new char[numBytes];
967 sendUsingMe->receiveBytes(temp, errMsg);
968 size_t uncompressedSize = 0;
969 snappy::GetUncompressedLength(temp, numBytes, &uncompressedSize);
970 readToHere = (
char*)malloc(uncompressedSize);
971 snappy::RawUncompress(temp, numBytes, (
char*)(readToHere));
978 if (objectsToStore->size() == 0) {
979 everythingOK =
false;
981 "Warning: client attemps to store a vector that contains zero objects, simply "
983 std::cout << errMsg << std::endl;
986 if (request->isFlushing() ==
false) {
989 makeObject<SimpleRequestResult>(everythingOK, errMsg);
992 everythingOK = sendUsingMe->sendObject(response, errMsg);
1001 auto databaseAndSet = make_pair((std::string)request->getDatabase(),
1002 (std::string)request->getSetName());
1004 SetPtr mySet = getFunctionality<PangeaStorageServer>().
getSet(databaseAndSet);
1005 size_t myPageSize = mySet->getPageSize();
1006 if (request->isDirectPut() ==
false) {
1012 size_t numBytesToProcess =
sizes[databaseAndSet];
1013 size_t rawPageSize = myPageSize;
1015 if (numBytesToProcess < rawPageSize) {
1016 PDB_COUT <<
"data is buffered, all buffered data size=" << numBytesToProcess
1020 std::cout <<
"Got the data.\n";
1021 std::cout <<
"Are " <<
sizes[databaseAndSet] <<
" bytes to write.\n";
1022 std::cout <<
"Page size is " << rawPageSize << std::endl;
1024 databaseAndSet, request->isFlushing());
1025 PDB_COUT <<
"Done with write back.\n";
1026 PDB_COUT <<
"Are " << sizes[databaseAndSet] <<
" bytes left.\n";
1032 if (myRecord->
numBytes() <= myPageSize) {
1034 getFunctionality<PangeaStorageServer>().
getNewPage(databaseAndSet);
1036 memcpy(myPage->getBytes(), readToHere, myRecord->
numBytes());
1039 key.
dbId = myPage->getDbID();
1040 key.typeId = myPage->getTypeID();
1041 key.setId = myPage->getSetID();
1042 key.pageId = myPage->getPageID();
1043 getFunctionality<PangeaStorageServer>().
getCache()->decPageRefCount(key);
1044 if (request->isFlushing() ==
true) {
1045 getFunctionality<PangeaStorageServer>()
1047 ->flushPageWithoutEviction(key);
1050 errMsg =
"Tried to directly put larger data than the page, size=" +
1051 std::to_string(myRecord->
numBytes());
1052 std::cout << errMsg << std::endl;
1053 everythingOK =
false;
1061 "Tried to add data of the wrong type to a database set or database set doesn't "
1063 everythingOK =
false;
1065 if (request->isFlushing() ==
true) {
1068 makeObject<SimpleRequestResult>(everythingOK, errMsg);
1071 everythingOK = sendUsingMe->sendObject(response, errMsg);
1073 return make_pair(everythingOK, errMsg);
1079 StorageGetData_TYPEID,
1085 SetPtr set = getFunctionality<PangeaStorageServer>().
getSet(
1086 std::pair<std::string, std::string>(request->getDatabase(), request->getSetName()));
1087 if (set ==
nullptr) {
1088 errMsg =
"Set doesn't exist.\n";
1091 int numPages = set->getNumPages();
1096 request->getDatabase(),
1097 request->getSetName(),
1099 set->getPageSize() -
1101 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(
int) +
sizeof(size_t)),
1104 res = sendUsingMe->sendObject(response, errMsg);
1106 if (getFunctionality<PangeaStorageServer>().
isStandalone() ==
true) {
1108 PDB_COUT <<
"TypeID =" << typeID << std::endl;
1110 errMsg =
"Could not find type " + request->getType();
1113 getFunctionality<PangeaStorageServer>().
getCache()->pin(set,
MRU,
Read);
1114 std::vector<PageIteratorPtr>* iterators = set->getIterators();
1115 int numIterators = iterators->size();
1116 int numPagesSent = 0;
1117 for (
int i = 0; i < numIterators; i++) {
1119 while (iter->hasNext()) {
1121 if (page !=
nullptr) {
1122 PDB_COUT <<
"to send the " << numPagesSent <<
"-th page"
1124 res = sendUsingMe->sendBytes(
1125 page->getRawBytes(), page->getRawSize(), errMsg);
1128 std::cout <<
"sending data failed\n";
1129 return make_pair(res, errMsg);
1140 return make_pair(res, errMsg);
1145 StorageCollectStats_TYPEID,
1151 makeObject<StorageCollectStatsResponse>();
1153 makeObject<Vector<Handle<SetIdentifier>>>();
1155 for (std::map<std::pair<DatabaseID, SetID>,
SetPtr>::iterator it =
1159 std::pair<DatabaseID, SetID> idPair = it->first;
1161 std::string setName = set->getSetName();
1162 int numPages = set->getNumPages();
1165 std::string dbName = db->getDatabaseName();
1167 setIdentifier->setNumPages(numPages);
1168 setIdentifier->setPageSize(set->getPageSize());
1169 stats->push_back(setIdentifier);
1171 response->setStats(stats);
1173 return make_pair(res, errMsg);
1178 StoragePinPage_TYPEID,
1183 UserTypeID typeId = request->getUserTypeID();
1184 SetID setId = request->getSetID();
1185 PageID pageId = request->getPageID();
1186 bool wasNewPage = request->getWasNewPage();
1188 PDB_COUT <<
"to pin page in set with setId=" << setId << std::endl;
1195 if ((dbId == 0) && (typeId == 0)) {
1197 set = getFunctionality<PangeaStorageServer>().
getTempSet(setId);
1200 set = getFunctionality<PangeaStorageServer>().
getSet(dbId, typeId, setId);
1203 if (set !=
nullptr) {
1204 if (wasNewPage ==
true) {
1205 page = set->addPage();
1209 PageIndex index = meta->getPageIndex(pageId);
1214 if (page !=
nullptr) {
1216 std::string(
"Handling StoragePinPage: page is not null, we build the "
1217 "StoragePagePinned message"));
1220 ack->setMorePagesToLoad(
true);
1221 ack->setDatabaseID(dbId);
1222 ack->setUserTypeID(typeId);
1223 ack->setSetID(setId);
1224 ack->setPageID(page->getPageID());
1225 ack->setPageSize(page->getRawSize());
1226 ack->setSharedMemOffset(page->getOffset());
1228 std::string(
"Handling StoragePinPage: to send StoragePagePinned message"));
1231 std::string(
"Handling StoragePinPage: sent StoragePagePinned message"));
1234 errMsg =
"Fatal Error: Page doesn't exist for pinning page.";
1235 std::cout <<
"dbId = " << dbId <<
", typeId = " << typeId
1236 <<
", setId = " << setId << std::endl;
1237 std::cout << errMsg << std::endl;
1238 logger->error(errMsg);
1240 return make_pair(res, errMsg);
1245 StoragePinBytes_TYPEID,
1250 UserTypeID typeId = request->getUserTypeID();
1251 SetID setId = request->getSetID();
1252 size_t sizeOfBytes = request->getSizeOfBytes();
1260 if ((dbId == 0) && (typeId == 0)) {
1262 set = getFunctionality<PangeaStorageServer>().
getTempSet(setId);
1265 set = getFunctionality<PangeaStorageServer>().
getSet(dbId, typeId, setId);
1268 void* myBytes =
nullptr;
1269 if (set !=
nullptr) {
1270 myBytes = set->getNewBytes(sizeOfBytes);
1273 if (myBytes !=
nullptr) {
1276 ack->setSizeOfBytes(sizeOfBytes);
1277 size_t offset = this->
shm->computeOffset(myBytes);
1278 ack->setSharedMemOffset(offset);
1282 errMsg =
"Can't get " + std::to_string(sizeOfBytes) +
" bytes for set:";
1283 std::cout <<
"dbId = " << dbId <<
", typeId = " << typeId
1284 <<
", setId = " << setId << std::endl;
1285 std::cout << errMsg << std::endl;
1286 logger->error(errMsg);
1288 return make_pair(res, errMsg);
1294 StorageUnpinPage_TYPEID,
1301 UserTypeID typeId = request->getUserTypeID();
1302 SetID setId = request->getSetID();
1303 PageID pageId = request->getPageID();
1314 if (getFunctionality<PangeaStorageServer>().
getCache()->decPageRefCount(key) ==
false) {
1316 errMsg =
"Fatal Error: Page doesn't exist for unpinning page.";
1317 std::cout <<
"dbId=" << dbId <<
", typeId=" << typeId <<
", setId=" << setId
1318 <<
", pageId=" << pageId << std::endl;
1319 std::cout << errMsg << std::endl;
1320 logger->error(errMsg);
1322 #ifdef ENABLE_EVICTION
1323 getFunctionality<PangeaStorageServer>().
getCache()->evictPage(key);
1328 logger->debug(std::string(
"Making response object.\n"));
1333 logger->debug(std::string(
"Sending response object.\n"));
1334 res = sendUsingMe->sendObject(response, errMsg);
1335 logger->debug(std::string(
"response sent for StorageUnpinPage.\n"));
1337 return make_pair(res, errMsg);
1345 StorageGetSetPages_TYPEID,
1350 UserTypeID typeId = request->getUserTypeID();
1351 SetID setId = request->getSetID();
1356 SetPtr set = getFunctionality<PangeaStorageServer>().
getSet(dbId, typeId, setId);
1357 if (set ==
nullptr) {
1359 errMsg =
"Fatal Error: Set doesn't exist.";
1360 std::cout << errMsg << std::endl;
1361 return make_pair(res, errMsg);
1366 std::vector<PageIteratorPtr>* iterators = set->getIterators();
1369 set->setPinned(
true);
1370 int numIterators = iterators->size();
1374 PDB_COUT <<
"counter = " << counter << std::endl;
1380 for (
int i = 0; i < numIterators; i++) {
1383 iterators->at(i), &getFunctionality<PangeaStorageServer>(), counter);
1384 worker->execute(scanWork, tempBuzzer);
1387 while (counter < numIterators) {
1390 set->setPinned(
false);
1398 if (communicatorToBackEnd->connectToLocalServer(
1399 getFunctionality<PangeaStorageServer>().getLogger(),
1403 std::cout << errMsg << std::endl;
1404 return make_pair(res, errMsg);
1409 if (!communicatorToBackEnd->sendObject<
StorageNoMorePage>(noMorePage, errMsg)) {
1411 std::cout << errMsg << std::endl;
1412 return make_pair(res, errMsg);
1415 return make_pair(res, errMsg);
1421 StorageTestSetScan_TYPEID,
1425 std::string dbName = request->getDatabase();
1426 std::string setName = request->getSetName();
1427 SetPtr set = getFunctionality<PangeaStorageServer>().
getSet(
1428 std::pair<std::string, std::string>(dbName, setName));
1433 if (set ==
nullptr) {
1435 errMsg =
"Fatal Error: Set doesn't exist!";
1436 std::cout << errMsg << std::endl;
1437 return make_pair(res, errMsg);
1441 if (communicatorToBackend->connectToLocalServer(
1442 getFunctionality<PangeaStorageServer>().getLogger(),
1446 std::cout << errMsg << std::endl;
1447 return make_pair(res, errMsg);
1452 SetID setId = set->getSetID();
1457 makeObject<BackendTestSetScan>(dbId, typeId, setId);
1460 std::cout << errMsg << std::endl;
1461 return make_pair(res, errMsg);
1467 communicatorToBackend->getSizeOfNextObject()};
1474 makeObject<SimpleRequestResult>(res, errMsg);
1477 res = sendUsingMe->sendObject(response, errMsg);
1479 return make_pair(res, errMsg);
1488 StorageTestSetCopy_TYPEID,
1492 std::string dbNameIn = request->getDatabaseIn();
1493 std::string setNameIn = request->getSetNameIn();
1494 SetPtr setIn = getFunctionality<PangeaStorageServer>().
getSet(
1495 std::pair<std::string, std::string>(dbNameIn, setNameIn));
1497 std::string dbNameOut = request->getDatabaseOut();
1498 std::string setNameOut = request->getSetNameOut();
1499 SetPtr setOut = getFunctionality<PangeaStorageServer>().
getSet(
1500 std::pair<std::string, std::string>(dbNameOut, setNameOut));
1506 if (setIn ==
nullptr) {
1508 errMsg =
"Fatal Error: Input set doesn't exist!";
1509 std::cout << errMsg << std::endl;
1512 if (setOut ==
nullptr) {
1514 errMsg +=
"Fatal Error: Output set doesn't exist!";
1515 std::cout << errMsg << std::endl;
1519 if ((setIn !=
nullptr) && (setOut !=
nullptr)) {
1522 if (communicatorToBackend->connectToLocalServer(
1523 getFunctionality<PangeaStorageServer>().getLogger(),
1527 std::cout << errMsg << std::endl;
1528 return make_pair(res, errMsg);
1533 SetID setIdIn = setIn->getSetID();
1536 SetID setIdOut = setOut->getSetID();
1541 dbIdIn, typeIdIn, setIdIn, dbIdOut, typeIdOut, setIdOut);
1544 std::cout << errMsg << std::endl;
1547 communicatorToBackend->getSizeOfNextObject()};
1557 res = sendUsingMe->sendObject(response, errMsg);
1560 return make_pair(res, errMsg);
1584 sprintf(buffer,
"%s/%d_%s", rootPath.c_str(), dbId, dbName.c_str());
1585 return string(buffer);
1590 if ((this->
metaTempPath = this->
conf->getMetaTempDir()).compare(
"") != 0) {
1595 string strDataTempPaths = this->
conf->getDataTempDirs();
1596 string curDataTempPath;
1597 size_t startPos = 0;
1599 if ((curPos = strDataTempPaths.find(
',')) == string::npos) {
1600 boost::filesystem::remove_all(strDataTempPaths);
1601 this->
conf->createDir(strDataTempPaths);
1602 PDB_COUT <<
"dataTempPath:" << strDataTempPaths <<
"\n";
1605 while ((curPos = strDataTempPaths.find(
',')) != string::npos) {
1606 curDataTempPath = strDataTempPaths.substr(startPos, curPos);
1607 boost::filesystem::remove_all(curDataTempPath);
1608 this->
conf->createDir(curDataTempPath);
1609 PDB_COUT <<
"dataTempPath:" << curDataTempPath <<
"\n";
1611 strDataTempPaths = strDataTempPaths.substr(curPos + 1, strDataTempPaths.length() + 1);
1613 boost::filesystem::remove_all(strDataTempPaths);
1614 this->
conf->createDir(strDataTempPaths);
1615 PDB_COUT <<
"dataTempPath:" << strDataTempPaths <<
"\n";
1626 string strDataRootPaths = this->
conf->getDataDirs();
1627 string curDataRootPath;
1628 size_t startPos = 0;
1630 if ((curPos = strDataRootPaths.find(
',')) == string::npos) {
1631 this->
conf->createDir(strDataRootPaths);
1634 while ((curPos = strDataRootPaths.find(
',')) != string::npos) {
1635 curDataRootPath = strDataRootPaths.substr(startPos, curPos);
1636 this->
conf->createDir(curDataRootPath);
1638 strDataRootPaths = strDataRootPaths.substr(curPos + 1, strDataRootPaths.length() + 1);
1640 this->
conf->createDir(strDataRootPaths);
1648 if (this->
dbs->find(dbId) != this->
dbs->end()) {
1649 this->
logger->writeLn(
"PDBStorage: database exists.");
1655 this->
conf->createDir(metaDBPath);
1659 vector<string>* dataDBPaths =
new vector<string>();
1661 string curDataDBPath;
1664 dataDBPaths->push_back(curDataDBPath);
1678 this->
dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
1679 this->
name2id->insert(pair<string, DatabaseID>(dbName, dbId));
1681 this->
usersetSeqIds->insert(pair<string, SequenceID*>(dbName, seqId));
1689 if (
name2id->count(dbName) != 0) {
1690 std::cout <<
"Database " << dbName <<
" exists" << std::endl;
1705 boost::filesystem::remove_all(path.c_str());
1708 boost::filesystem::remove_all(path.c_str());
1716 if (
name2id->count(dbName) != 0) {
1723 map<DatabaseID, DefaultDatabasePtr>::iterator it = this->
dbs->find(dbId);
1724 if (it != this->
dbs->end()) {
1726 string dbName = it->second->getDatabaseName();
1728 map<string, DatabaseID>::iterator name2idIt = this->
name2id->find(dbName);
1735 this->logger->writeLn(
"Database doesn't exist:");
1736 this->logger->writeInt(dbId);
1743 map<DatabaseID, DefaultDatabasePtr>::iterator it = this->
dbs->find(dbId);
1744 if (it != this->
dbs->end()) {
1757 pthread_mutex_lock(&this->
typeLock);
1758 this->
typename2id->insert(std::pair<std::string, UserTypeID>(typeName, typeId));
1759 pthread_mutex_unlock(&this->
typeLock);
1767 if (this->
name2id->count(dbName) == 0) {
1779 db->removeType(typeId);
1791 pthread_mutex_lock(&this->
typeLock);
1793 pthread_mutex_unlock(&this->
typeLock);
1801 std::string dbName, std::string typeName, std::string setName,
SetID setId,
size_t pageSize) {
1802 SetPtr set =
getSet(std::pair<std::string, std::string>(dbName, setName));
1803 if (set !=
nullptr) {
1805 std::cout <<
"Set exists with setName=" << setName << std::endl;
1808 if (this->
name2id->count(dbName) == 0) {
1810 std::cout <<
"Database doesn't exist with dbName=" << dbName << std::endl;
1818 if ((typeId <= 0) || (typeId == 8191)) {
1819 PDB_COUT <<
"type doesn't exist for name=" << typeName
1820 <<
", and we store it as default type" << std::endl;
1821 typeName =
"UnknownUserData";
1824 PDB_COUT <<
"Pangea add new type when add set: typeName=" << typeName << std::endl;
1825 PDB_COUT <<
"Pangea add new type when add set: typeId=" << typeId << std::endl;
1833 TypePtr type = db->getType(typeId);
1834 if (type ==
nullptr) {
1838 db->addType(typeName, typeId);
1839 type = db->getType(typeId);
1841 set = type->getSet(setId);
1842 if (set !=
nullptr) {
1846 type->addSet(setName, setId, pageSize);
1847 std::cout <<
"to add set with dbName=" << dbName <<
", typeName=" << typeName
1848 <<
", setName=" << setName <<
", setId=" << setId <<
", pageSize=" << pageSize
1850 set = type->getSet(setId);
1852 PDB_COUT <<
"to get usersetLock" << std::endl;
1854 this->
userSets->insert(std::pair<std::pair<DatabaseID, SetID>,
SetPtr>(
1855 std::pair<DatabaseID, SetID>(dbId, setId), set));
1857 std::pair<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>(
1858 std::pair<std::string, std::string>(dbName, setName),
1859 std::pair<DatabaseID, SetID>(dbId, setId)));
1861 PDB_COUT <<
"released usersetLock" << std::endl;
1868 std::string typeName,
1869 std::string setName,
1878 PDB_COUT <<
"to add set with dbName=" << dbName <<
", typeName=" << typeName
1879 <<
", setName=" << setName <<
", setId=" << setId << std::endl;
1881 return addSet(dbName, typeName, setName, setId, pageSize);
1887 return addSet(dbName,
"UnknownUserData", setName, pageSize);
1891 SetPtr set =
getSet(std::pair<std::string, std::string>(dbName, setName));
1892 if (set ==
nullptr) {
1893 PDB_COUT <<
"set with dbName=" << dbName <<
" and setName=" << setName <<
" doesn't exist"
1897 #ifdef REMOVE_SET_WITH_EVICTION
1898 std::cout <<
"To evict all pages in set with dbName=" << dbName <<
" and setName=" << setName
1899 <<
" to remove the set" << std::endl;
1904 SetID setId = set->getSetID();
1906 TypePtr type = database->getType(typeId);
1908 type->removeSet(setId);
1909 int numRemoved =
userSets->erase(std::pair<DatabaseID, SetID>(dbId, setId));
1910 PDB_COUT <<
"numItems removed from userSets:" << numRemoved << std::endl;
1911 numRemoved =
names2ids->erase(std::pair<std::string, std::string>(dbName, setName));
1912 PDB_COUT <<
"numItems removed from names2ids:" << numRemoved << std::endl;
1921 if (
name2id->count(dbName) == 0) {
1933 TypePtr type = database->getType(typeId);
1934 if (type !=
nullptr) {
1935 SetPtr set =
getSet(std::pair<std::string, std::string>(dbName, setName));
1936 if (set !=
nullptr) {
1937 #ifdef REMOVE_SET_WITH_EVICTION
1940 SetID setId = set->getSetID();
1942 type->removeSet(setId);
1943 userSets->erase(std::pair<DatabaseID, SetID>(dbId, setId));
1944 names2ids->erase(std::pair<std::string, std::string>(dbName, setName));
1958 this->logger->writeLn(
"To add temp set with setName=");
1959 this->logger->writeLn(setName);
1961 cout <<
"TempSet exists!\n";
1962 this->logger->writeLn(
"TempSet exists for setName=");
1963 this->logger->writeLn(setName);
1967 this->logger->writeLn(
"SetId=");
1968 this->logger->writeInt(setId);
1969 TempSetPtr tempSet = make_shared<TempSet>(setId,
1977 this->logger->writeLn(
"temp set created!");
1979 this->
tempSets->insert(pair<SetID, TempSetPtr>(setId, tempSet));
1980 this->
name2tempSetId->insert(pair<string, SetID>(setName, setId));
1986 map<SetID, TempSetPtr>::iterator it = this->
tempSets->find(setId);
1988 string setName = it->second->getSetName();
1989 it->second->clear();
2002 this->logger->writeLn(
"PDBStorage: Searching for temp set:");
2003 this->logger->writeInt(setId);
2004 map<SetID, TempSetPtr>::iterator it = this->
tempSets->find(setId);
2008 this->logger->writeLn(
"PDBStorage: TempSet doesn't exist:");
2009 this->logger->writeInt(setId);
2015 if ((dbId == 0) && (typeId == 0)) {
2020 if (db ==
nullptr) {
2021 this->logger->writeLn(
"PDBStorage: Database doesn't exist.");
2025 TypePtr type = db->getType(typeId);
2026 if (type ==
nullptr) {
2027 this->logger->writeLn(
"PDBStorage: Type doesn't exist.");
2031 SetPtr set = type->getSet(setId);
2042 PDB_COUT <<
"number of partitions:" << numThreads <<
"\n";
2046 for (i = 0; i < numThreads; i++) {
2048 flusher = make_shared<PDBFlushConsumerWork>(i,
this);
2051 while ((worker = this->
getWorker()) ==
nullptr) {
2054 worker->execute(flusher, flusher->getLinkedBuzzer());
2055 PDB_COUT <<
"flushing thread started for partition: " << i <<
"\n";
2065 for (i = 0; i < this->
flushers.size(); i++) {
2075 return this->
workers->getWorker();
2085 using namespace boost::filesystem;
2095 if (metaRootPath.compare(
"") == 0) {
2100 root = path(dataRootPath.at(0));
2106 root = path(metaRootPath);
2109 if (is_directory(root)) {
2110 vector<path> dbDirs;
2111 copy(directory_iterator(root), directory_iterator(), back_inserter(dbDirs));
2112 vector<path>::iterator iter;
2114 std::string dirName;
2119 for (iter = dbDirs.begin(); iter != dbDirs.end(); iter++) {
2120 if (is_directory(*iter)) {
2122 path = std::string(iter->c_str());
2125 dirName = path.substr(path.find_last_of(
'/') + 1, path.length() - 1);
2128 strId = dirName.substr(0, dirName.find(
'_'));
2129 dbId = stoul(strId);
2130 if (maxDbId < dbId) {
2134 name = dirName.substr(dirName.find(
'_') + 1, dirName.length() - 1);
2140 this->addDatabaseBySequenceFiles(name, dbId, path);
2142 this->addDatabaseByPartitionedFiles(name, dbId, path);
2144 this->databaseSeqId.initialize(maxDbId);
2168 if (this->dbs->find(dbId) != this->dbs->end()) {
2169 this->logger->writeLn(
"PDBStorage: database exists.");
2173 vector<string>* dataDBPaths =
new vector<string>();
2174 dataDBPaths->push_back(std::string(dbPath.c_str()));
2185 if (db ==
nullptr) {
2186 this->logger->writeLn(
"PDBStorage: Out of Memory.");
2187 std::cout <<
"FATAL ERROR: PDBStorage Out of Memory" << std::endl;
2191 db->initializeFromDBDir(dbPath);
2193 pthread_mutex_lock(&this->databaseLock);
2194 this->dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
2195 this->name2id->insert(pair<string, DatabaseID>(dbName, dbId));
2196 pthread_mutex_unlock(&this->databaseLock);
2205 if (this->dbs->find(dbId) != this->dbs->end()) {
2206 this->logger->writeLn(
"PDBStorage: database exists.");
2210 vector<string>* dataDBPaths =
new vector<string>();
2213 for (i = 0; i < dataRootPaths.size(); i++) {
2214 dataDBPath = this->encodeDBPath(this->dataRootPaths.at(i), dbId, dbName);
2215 dataDBPaths->push_back(dataDBPath);
2223 string(metaDBPath.c_str()),
2227 if (db ==
nullptr) {
2228 this->logger->writeLn(
"PDBStorage: Out of Memory.");
2229 std::cout <<
"FATAL ERROR: PDBStorage Out of Memory" << std::endl;
2233 db->initializeFromMetaDBDir(metaDBPath);
2235 pthread_mutex_lock(&this->databaseLock);
2236 this->dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
2237 this->name2id->insert(pair<string, DatabaseID>(dbName, dbId));
2238 pthread_mutex_unlock(&this->databaseLock);
2240 std::map<UserTypeID, TypePtr>* types = db->getTypes();
2241 std::map<UserTypeID, TypePtr>::iterator typeIter;
2245 for (typeIter = types->begin(); typeIter != types->end(); typeIter++) {
2247 TypePtr type = typeIter->second;
2248 std::string typeName = type->getName();
2249 pthread_mutex_lock(&this->typeLock);
2250 this->typename2id->insert(std::pair<std::string, UserTypeID>(typeName, typeId));
2251 pthread_mutex_unlock(&this->typeLock);
2252 std::map<SetID, SetPtr>* sets = type->getSets();
2253 std::map<SetID, SetPtr>::iterator setIter;
2254 for (setIter = sets->begin(); setIter != sets->end(); setIter++) {
2255 SetID setId = setIter->first;
2256 if (maxSetId <= setId) {
2257 maxSetId = setId + 1;
2259 SetPtr set = setIter->second;
2260 PDB_COUT <<
"Loaded existing set with database: " << dbName <<
", type: " << typeName
2261 <<
", set: " << set->getSetName() << std::endl;
2262 pthread_mutex_lock(&this->usersetLock);
2263 this->userSets->insert(std::pair<std::pair<DatabaseID, SetID>,
SetPtr>(
2264 std::pair<DatabaseID, SetID>(dbId, setId), set));
2265 this->names2ids->insert(
2266 std::pair<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>(
2267 std::pair<std::string, std::string>(dbName, set->getSetName()),
2268 std::pair<DatabaseID, SetID>(dbId, setId)));
2269 pthread_mutex_unlock(&this->usersetLock);
2274 this->usersetSeqIds->insert(std::pair<std::string, SequenceID*>(dbName, seqId));
2278 return this->logger;
2295 return this->standalone;
std::map< std::string, UserTypeID > * typename2id
std::map< SetID, TempSetPtr > * tempSets
DefaultDatabasePtr getDatabase(DatabaseID dbId)
shared_ptr< TempSet > TempSetPtr
shared_ptr< PDBPage > PDBPagePtr
std::map< pair< std::string, std::string >, std::vector< Record< Vector< Handle< Object > > > * > > allRecords
unsigned int pageSeqInPartition
bool addSet(std::string dbName, std::string typeName, std::string setName, SetID setId, size_t pageSize=DEFAULT_PAGE_SIZE)
shared_ptr< UserType > TypePtr
Handle< ObjType > getRootObject()
bool removeType(std::string typeName)
std::vector< std::string > dataTempPaths
pthread_mutex_t workingMutex
shared_ptr< PageCache > PageCachePtr
bool exportToFile(std::string dbName, std::string setName, std::string path, std::string format, std::string &errMsg)
void addDatabaseBySequenceFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbPath)
bool removeTypeFromDatabase(std::string dbName, std::string typeName)
shared_ptr< DefaultDatabase > DefaultDatabasePtr
TempSetPtr getTempSet(SetID setId)
void stopFlushConsumerThreads()
shared_ptr< PageIteratorInterface > PageIteratorPtr
std::map< std::string, SequenceID * > * usersetSeqIds
shared_ptr< PDBScanWork > PDBScanWorkPtr
void writeBackRecords(pair< std::string, std::string > databaseAndSet, bool flushOrNot=true, bool directPutOrNot=false)
void clearDB(DatabaseID dbId, string dbName)
shared_ptr< PDBFlushConsumerWork > PDBFlushConsumerWorkPtr
#define FLUSH_BUFFER_SIZE
bool removeTempSet(SetID setId)
void startFlushConsumerThreads()
PDBPagePtr getNewPage(pair< std::string, std::string > databaseAndSet)
SharedMemPtr getSharedMem()
pthread_mutex_t usersetLock
std::map< DatabaseID, DefaultDatabasePtr > * dbs
pthread_mutex_t tempsetLock
string getPathToBackEndServer()
bool initializeFromRootDirs(string metaRootPath, vector< string > dataRootPath)
bool removeDatabase(std::string dbName)
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
std::map< std::string, DatabaseID > * name2id
std::map< std::pair< DatabaseID, SetID >, SetPtr > * userSets
bool addDatabase(std::string dbName, DatabaseID dbId)
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
FilePartitionID partitionId
string encodeDBPath(string rootPath, DatabaseID dbId, string dbName)
void initialize(unsigned int currentID)
PageCircularBufferPtr flushBuffer
std::map< std::string, SetID > * name2tempSetId
std::vector< std::string > dataRootPaths
size_t bufferRecord(pair< std::string, std::string > databaseAndSet, Record< Vector< Handle< Object >>> *addMe)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
bool addTempSet(std::string setName, SetID &setId, size_t pageSize=DEFAULT_PAGE_SIZE)
pthread_mutex_t counterMutex
shared_ptr< Configuration > ConfigurationPtr
shared_ptr< PDBWorker > PDBWorkerPtr
void addDatabaseByPartitionedFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbMetaPath)
pthread_mutex_t databaseLock
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
SetPtr getSet(std::pair< std::string, std::string > databaseAndSet)
bool exportToHDFSFile(std::string dbName, std::string setName, std::string hdfsNameNodeIp, int hdfsNameNodePort, std::string path, std::string format, std::string &errMsg)
static int16_t getIDByName(std::string objectName, bool withLock=true)
std::map< std::pair< std::string, std::string >, std::pair< DatabaseID, SetID > > * names2ids
unsigned int getNextSequenceID()
void registerHandlers(PDBServer &forMe) override
bool removeSet(std::string dbName, std::string typeName, std::string setName)
std::shared_ptr< PDBLogger > PDBLoggerPtr
ConfigurationPtr getConf()
PageCircularBufferPtr getFlushBuffer()
shared_ptr< UserSet > SetPtr
std::map< pair< std::string, std::string >, size_t > sizes
PDBWorkerQueuePtr workers
bool addType(std::string typeName, UserTypeID typeId)
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
int numWaitingBufferDataRequests
PangeaStorageServer(SharedMemPtr shm, PDBWorkerQueuePtr workers, PDBLoggerPtr logger, ConfigurationPtr conf, bool standalone=true)
std::vector< PDBWorkPtr > flushers
string pathToBackEndServer