A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PangeaStorageServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PANGEA_STORAGE_SERVER_C
20 #define PANGEA_STORAGE_SERVER_C
21 
22 #include "PDBDebug.h"
23 #include "JoinMap.h"
24 #include "PangeaStorageServer.h"
25 #include "SimpleRequestResult.h"
26 #include "CatalogServer.h"
27 #include "StorageAddData.h"
28 #include "StorageAddObjectInLoop.h"
29 #include "StorageAddDatabase.h"
30 #include "StorageAddSet.h"
31 #include "StorageClearSet.h"
32 #include "StorageGetData.h"
33 #include "StorageGetDataResponse.h"
34 #include "StorageGetSetPages.h"
35 #include "StoragePinPage.h"
36 #include "StoragePinBytes.h"
37 #include "StorageUnpinPage.h"
38 #include "StoragePagePinned.h"
39 #include "StorageBytesPinned.h"
40 #include "StorageNoMorePage.h"
41 #include "StorageTestSetScan.h"
42 #include "BackendTestSetScan.h"
43 #include "StorageTestSetCopy.h"
44 #include "BackendTestSetCopy.h"
45 #include "StorageAddTempSet.h"
47 #include "StorageRemoveDatabase.h"
48 #include "StorageRemoveTempSet.h"
49 #include "StorageExportSet.h"
50 #include "StorageRemoveUserSet.h"
51 #include "StorageRemoveHashSet.h"
52 #include "StorageCleanup.h"
53 #include "StorageCollectStats.h"
55 #include "PDBScanWork.h"
57 #include "SimpleRequestHandler.h"
58 #include "Record.h"
59 #include "InterfaceFunctions.h"
60 #include "DeleteSet.h"
61 #include "DefaultDatabase.h"
62 #include "DataTypes.h"
63 #include "SharedMem.h"
64 #include "PDBFlushProducerWork.h"
65 #include "PDBFlushConsumerWork.h"
66 #include "ExportableObject.h"
67 #include "JoinTupleBase.h"
68 //#include <hdfs/hdfs.h>
69 #include <cstdio>
70 #include <memory>
71 #include <string>
72 #include <iostream>
73 #include <signal.h>
74 #include <stdio.h>
75 #include <map>
76 #include <iterator>
77 #include <pthread.h>
78 #ifdef ENABLE_COMPRESSION
79 #include <snappy.h>
80 #endif
81 
82 #define FLUSH_BUFFER_SIZE 3
83 
84 
85 namespace pdb {
86 
87 
88 size_t PangeaStorageServer::bufferRecord(pair<std::string, std::string> databaseAndSet,
89  Record<Vector<Handle<Object>>>* addMe) {
90  if (allRecords.count(databaseAndSet) == 0) {
91  std::vector<Record<Vector<Handle<Object>>>*> records;
92  records.push_back(addMe);
93  allRecords[databaseAndSet] = records;
94  sizes[databaseAndSet] = addMe->numBytes();
95  } else {
96  allRecords[databaseAndSet].push_back(addMe);
97  sizes[databaseAndSet] += addMe->numBytes();
98  }
99  return sizes[databaseAndSet];
100 }
101 
// Constructor body: copies the configuration handles into members, builds the
// flush buffer and page cache, allocates the metadata lookup maps, initializes
// the mutexes, and creates the on-disk directories.
// NOTE(review): the line that opens this signature (function name and the first
// parameter, which is assigned to this->shm below) is not present in this listing.
103  PDBWorkerQueuePtr workers,
104  PDBLoggerPtr logger,
105  ConfigurationPtr conf,
106  bool standalone) {
107 
108  // server initialization
109 
110  // configuring server
111  this->nodeId = conf->getNodeID();
112  this->serverName = conf->getServerName();
113  this->shm = shm;
114  this->workers = workers;
115  this->conf = conf;
116  this->logger = logger;
117  this->standalone = standalone;
118  this->totalObjects = 0;
119  // IPC file used to communicate with backend
120  this->pathToBackEndServer = this->conf->getBackEndIpcFile();
121 
122  // initialize flush buffer
123  // a producer work will periodically remove unpinned data from input buffer, and
// NOTE(review): the sentence above is cut off in the original comment.
124  this->flushBuffer = make_shared<PageCircularBuffer>(FLUSH_BUFFER_SIZE, logger);
125 
126  // initialize cache, must be initialized before databases
127  this->cache = make_shared<PageCache>(conf, workers, flushBuffer, logger, shm);
128 
129  // initialize and load databases, must be initialized after cache
// These lookup tables are raw heap allocations owned by this object.
130  this->dbs = new std::map<DatabaseID, DefaultDatabasePtr>();
131  this->name2id = new std::map<std::string, DatabaseID>();
132  this->tempSets = new std::map<SetID, TempSetPtr>();
133  this->name2tempSetId = new std::map<std::string, SetID>();
134  this->userSets = new std::map<std::pair<DatabaseID, SetID>, SetPtr>();
135  this->names2ids =
136  new std::map<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>();
137  this->typename2id = new std::map<std::string, SetID>();
138 
139  // initialize meta data mutex
140  pthread_mutex_init(&(this->databaseLock), nullptr);
141  pthread_mutex_init(&(this->typeLock), nullptr);
142  pthread_mutex_init(&(this->tempsetLock), nullptr);
143  pthread_mutex_init(&(this->usersetLock), nullptr);
144  pthread_mutex_init(&(this->workingMutex), nullptr);
145  pthread_mutex_init(&(this->counterMutex), nullptr);
146 
147  this->databaseSeqId.initialize(1); // DatabaseID starting from 1
// Map from a string key to a SequenceID pointer (presumably one set-id sequence
// per database name — TODO confirm against the code that fills it).
148  this->usersetSeqIds = new std::map<std::string, SequenceID*>();
149  // if meta/data/temp directories do not exist, create them.
150  this->createRootDirs();
151 
// NOTE(review): the listing skips source lines between createRootDirs() and
// createTempDirs(); whatever ran there is not visible here.
154  this->createTempDirs();
155  this->conf->createDir(this->metaTempPath);
156  this->addType("UnknownUserData", 0); // user type starting from 1
157 }
158 
159 SetPtr PangeaStorageServer::getSet(pair<std::string, std::string> databaseAndSet) {
160  if (names2ids->count(databaseAndSet) != 0) {
161  pair<DatabaseID, SetID> ids = names2ids->at(databaseAndSet);
162  if (userSets->count(ids) != 0) {
163  return (*userSets)[ids];
164  }
165  }
166  return nullptr;
167 }
168 
169 void PangeaStorageServer::cleanup(bool flushOrNot) {
170  PDB_COUT << "to clean up for storage..." << std::endl;
171 
172  pthread_mutex_lock(&counterMutex);
173  while (numWaitingBufferDataRequests > 0) {
174  pthread_mutex_unlock(&counterMutex);
175  sched_yield();
176  pthread_mutex_lock(&counterMutex);
177  }
178  pthread_mutex_unlock(&counterMutex);
179  const LockGuard guard{workingMutex};
180  for (auto& a : allRecords) {
181  while (a.second.size() > 0)
182  writeBackRecords(a.first, flushOrNot);
183  }
184  std::cout << "Now there are " << totalObjects << " new objects stored in storage" << std::endl;
185  PDB_COUT << "sleep for 1 second to wait for all data gets flushed" << std::endl;
186  sleep(1);
187  PDB_COUT << "cleaned up for storage..." << std::endl;
188 }
189 
191 
// Teardown: destroys the six mutexes initialized in the constructor and deletes
// the heap-allocated lookup maps.
// NOTE(review): the signature line (and any statement before the first
// pthread_mutex_destroy) is not present in this listing.
// NOTE(review): usersetSeqIds is allocated with `new` in the constructor but is
// not deleted in the lines visible here — possible leak; confirm.
193  pthread_mutex_destroy(&(this->databaseLock));
194  pthread_mutex_destroy(&(this->typeLock));
195  pthread_mutex_destroy(&(this->tempsetLock));
196  pthread_mutex_destroy(&(this->usersetLock));
197  pthread_mutex_destroy(&(this->workingMutex));
198  pthread_mutex_destroy(&(this->counterMutex));
199  delete this->dbs;
200  delete this->name2id;
201  delete this->tempSets;
202  delete this->name2tempSetId;
203  delete this->userSets;
204  delete this->names2ids;
205  delete this->typename2id;
206 }
207 
208 PDBPagePtr PangeaStorageServer::getNewPage(pair<std::string, std::string> databaseAndSet) {
209 
210  // and get that page
211  SetPtr whichSet = getSet(databaseAndSet);
212  if (whichSet == nullptr) {
213  return nullptr;
214  } else {
215  return whichSet->addPage();
216  }
217 }
218 
219 
// Moves the buffered records of one (database, set) pair onto storage pages.
//
// Records are taken from the back of allRecords[databaseAndSet]; their objects
// are copied into a Vector allocated directly on a freshly added page. When the
// page fills (NotEnoughSpace), the page is released and either another page is
// started or the not-yet-written objects are repacked into a new heap record
// that stays in the buffer for a later call.
//
// databaseAndSet  key of the buffer to drain and of the set receiving pages
// flushOrNot      when true, each finished page is passed to
//                 flushPageWithoutEviction() on the cache
// directPutOrNot  NOTE(review): not referenced anywhere in this body
//
// On exit sizes[databaseAndSet] is updated to the bytes still buffered, except
// on the early return taken when no page can be obtained.
220 void PangeaStorageServer::writeBackRecords(pair<std::string, std::string> databaseAndSet,
221  bool flushOrNot,
222  bool directPutOrNot) {
223 
224  // get all of the records
225  auto& allRecs = allRecords[databaseAndSet];
226 
227 
228  // the current size (in bytes) of records we need to process
229  size_t numBytesToProcess = sizes[databaseAndSet];
230  PDB_COUT << "buffer is full, to write to a storage page" << std::endl;
231 
232  // now, get a page to write to
233  PDBPagePtr myPage = getNewPage(databaseAndSet);
// Nothing is consumed from the buffer on this path, so callers that loop until
// the buffer is empty will not make progress.
234  if (myPage == nullptr) {
235  std::cout << "FATAL ERROR: set to store data doesn't exist!" << std::endl;
236  std::cout << "databaseName" << databaseAndSet.first << std::endl;
237  std::cout << "setName" << databaseAndSet.second << std::endl;
238  return;
239  }
240  size_t pageSize = myPage->getSize();
241  // the position in the output vector
// pos is declared outside the page loop on purpose: after a page fills, the
// next page resumes copying the same record from this index.
242  int pos = 0;
243 
244  // the number of items in the current record we are processing
// NOTE(review): left uninitialized until the first record is examined below.
245  int numObjectsInRecord;
246 
247 
248  // now, keep looping until we run out of records to process (in which case, we'll break)
249  while (true) {
250 
251  // all allocations will be done to the page
// The allocation block and `data` are created outside the try, so an exception
// raised by this makeObject is not handled by the catch below.
252  UseTemporaryAllocationBlock block(myPage->getBytes(), pageSize);
253  Handle<Vector<Handle<Object>>> data = makeObject<Vector<Handle<Object>>>();
254 
255  try {
256  // while there are still records
257  while (allRecs.size() > 0) {
258 
// always work on the last record in the buffer
259  auto& allObjects = *(allRecs[allRecs.size() - 1]->getRootObject());
260  numObjectsInRecord = allObjects.size();
261  // put all of the data onto the page
262  for (; pos < numObjectsInRecord; pos++) {
263  data->push_back(allObjects[pos]);
264  totalObjects++;
265  }
266 
267  // now kill this record
268  numBytesToProcess -= allRecs[allRecs.size() - 1]->numBytes();
269  free(allRecs[allRecs.size() - 1]);
270  allRecs.pop_back();
271  pos = 0;
272  }
273 
274  // if we got here, all records have been processed
275 
276  // comment the following three lines of code to allow Pangea to manage pages
277  PDB_COUT << "Write all of the bytes in the record.\n";
278  getRecord(data);
279 
// identify the page in the cache, drop one reference, and optionally flush it
280  CacheKey key;
281  key.dbId = myPage->getDbID();
282  key.typeId = myPage->getTypeID();
283  key.setId = myPage->getSetID();
284  key.pageId = myPage->getPageID();
285  this->getCache()->decPageRefCount(key);
286  if (flushOrNot == true) {
287  this->getCache()->flushPageWithoutEviction(key);
288  }
289  break;
290 
291  // put the extra objects tht we could not store back in the record
292  } catch (NotEnoughSpace& n) {
293  // comment the following three lines of code to allow Pangea to manage pages
294  std::cout << "Writing back a page!!\n";
295  getRecord(data);
// An empty vector here means not even one object fit on a fresh page; the
// offending object is skipped by advancing pos.
296  if (data->size() == 0) {
297  std::cout
298  << "FATAL ERROR: object size is larger than a page, pleases increase page size"
299  << std::endl;
300  std::cout << "databaseName" << databaseAndSet.first << std::endl;
301  std::cout << "setName" << databaseAndSet.second << std::endl;
302  pos++;
303  }
304  CacheKey key;
305  key.dbId = myPage->getDbID();
306  key.typeId = myPage->getTypeID();
307  key.setId = myPage->getSetID();
308  key.pageId = myPage->getPageID();
309  this->getCache()->decPageRefCount(key);
310  if (flushOrNot == true) {
311  this->getCache()->flushPageWithoutEviction(key);
312  }
313  // there are two cases... in the first case, we can make another page out of this data,
314  // since we have enough records to do so
// NOTE(review): (numObjectsInRecord - pos) / numObjectsInRecord divides two
// ints, so it is 1 only when pos == 0 and 0 for any 0 < pos <= numObjectsInRecord;
// a fractional "remaining share" of the record was probably intended — confirm.
315  if (numBytesToProcess + (((numObjectsInRecord - pos) / numObjectsInRecord) *
316  allRecs[allRecs.size() - 1]->numBytes()) >
317  pageSize) {
318 
// NOTE(review): getNewPage() can return nullptr, but the result is dereferenced
// on the next line without a check.
319  myPage = getNewPage(databaseAndSet);
320  pageSize = myPage->getSize();
321  continue;
322 
323  // in this case, we have a small bit of data left
324  } else {
325 
326  // create the vector to hold these guys
// The replacement record lives in a malloc'ed region sized like the record it
// replaces; that region is what ends up stored back into allRecs.
327  void* myRAM = malloc(allRecs[allRecs.size() - 1]->numBytes());
328  const UseTemporaryAllocationBlock block(myRAM,
329  allRecs[allRecs.size() - 1]->numBytes());
330  Handle<Vector<Handle<Object>>> extraData =
331  makeObject<Vector<Handle<Object>>>(numObjectsInRecord - pos);
332 
333  // write the objects to the vector
334  auto& allObjects = *(allRecs[allRecs.size() - 1]->getRootObject());
335  for (; pos < numObjectsInRecord; pos++) {
336  extraData->push_back(allObjects[pos]);
337  }
338 
339  // destroy the record that we were copying from
340  numBytesToProcess -= allRecs[allRecs.size() - 1]->numBytes();
341  free(allRecs[allRecs.size() - 1]);
342 
343  // and get the record that we copied to
344  allRecs[allRecs.size() - 1] = getRecord(extraData);
345  numBytesToProcess += allRecs[allRecs.size() - 1]->numBytes();
346  break;
347  }
348  }
349  }
350 
351  PDB_COUT << "Now all the records are back.\n";
352  sizes[databaseAndSet] = numBytesToProcess;
353 }
354 
355 
356 // export to a local file
357 bool PangeaStorageServer::exportToFile(std::string dbName,
358  std::string setName,
359  std::string path,
360  std::string format,
361  std::string& errMsg) {
362 
363  FILE* myFile = fopen(path.c_str(), "w+");
364  if (myFile == NULL) {
365  errMsg = "Error opening file for writing: " + path;
366  std::cout << errMsg << std::endl;
367  return false;
368  }
369 
370  SetPtr setToExport =
371  getFunctionality<PangeaStorageServer>().getSet(std::make_pair(dbName, setName));
372  if (setToExport == nullptr) {
373  errMsg = "Error in exportToFile: set doesn't exist: " + dbName + ":" + setName;
374  std::cout << errMsg << std::endl;
375  return false;
376  }
377 
378  bool isHeadWritten = false;
379  setToExport->setPinned(true);
380  std::vector<PageIteratorPtr>* pageIters = setToExport->getIterators();
381  int numIterators = pageIters->size();
382  for (int i = 0; i < numIterators; i++) {
383  PageIteratorPtr iter = pageIters->at(i);
384  while (iter->hasNext()) {
385  PDBPagePtr nextPage = iter->next();
386  if (nextPage != nullptr) {
388  (Record<Vector<Handle<Object>>>*)(nextPage->getBytes());
389  Handle<Vector<Handle<Object>>> inputVec = myRec->getRootObject();
390  int vecSize = inputVec->size();
391  for (int j = 0; j < vecSize; j++) {
392  Handle<ExportableObject> objectToExport =
393  unsafeCast<ExportableObject, Object>((*inputVec)[j]);
394  if (isHeadWritten == false) {
395  std::string header = objectToExport->toSchemaString(format);
396  if (header != "") {
397  fprintf(myFile, "%s", header.c_str());
398  }
399  isHeadWritten = true;
400  }
401  std::string value = objectToExport->toValueString(format);
402  if (value != "") {
403  fprintf(myFile, "%s", value.c_str());
404  }
405  }
406  // to evict this page
407  PageCachePtr cache = getFunctionality<PangeaStorageServer>().getCache();
408  CacheKey key;
409  key.dbId = nextPage->getDbID();
410  key.typeId = nextPage->getTypeID();
411  key.setId = nextPage->getSetID();
412  key.pageId = nextPage->getPageID();
413  cache->decPageRefCount(key);
414  cache->evictPage(key); // try to modify this to something like
415  // evictPageWithoutFlush() or clear set in the end.
416  }
417  }
418  }
419  setToExport->setPinned(false);
420  delete pageIters;
421  fflush(myFile);
422  fclose(myFile);
423  return true;
424 }
425 
426 // export to a HDFS partition
// HDFS export stub: not implemented. It ignores all of its arguments, leaves
// errMsg untouched and always returns false.
// NOTE(review): the line carrying the function name and first parameter is not
// present in this listing.
428  std::string setName,
429  std::string hdfsNameNodeIp,
430  int hdfsNameNodePort,
431  std::string path,
432  std::string format,
433  std::string& errMsg) {
434  //TODO
435  return false;
436 }
437 
439  // this handler accepts a request to write back all buffered records
440  forMe.registerHandler(
441  StorageCleanup_TYPEID,
443  [&](Handle<StorageCleanup> request, PDBCommunicatorPtr sendUsingMe) {
444  PDB_COUT << "received StorageCleanup" << std::endl;
445  std::string errMsg;
446  bool res = true;
447  getFunctionality<PangeaStorageServer>().cleanup(request->isFlushing());
448 
449  const UseTemporaryAllocationBlock tempBlock{1024};
450  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
451 
452  res = sendUsingMe->sendObject(response, errMsg);
453  return make_pair(res, errMsg);
454  }
455 
456  ));
457 
458 
459  // this handler accepts a request to add a database
460  forMe.registerHandler(
461  StorageAddDatabase_TYPEID,
463  Handle<StorageAddDatabase> request, PDBCommunicatorPtr sendUsingMe) {
464  PDB_COUT << "received StorageAddDatabase" << std::endl;
465  std::string errMsg;
466  bool res = true;
467  if (standalone == true) {
468  res = getFunctionality<PangeaStorageServer>().addDatabase(request->getDatabase());
469  if (res == false) {
470  errMsg = "Database already exists\n";
471  } else {
472  res = getFunctionality<CatalogServer>().addDatabase(request->getDatabase(),
473  errMsg);
474  }
475 
476  } else {
477  if ((res = getFunctionality<PangeaStorageServer>().addDatabase(
478  request->getDatabase())) == false) {
479  errMsg = "Database already exists\n";
480  }
481  }
482  // make response
483  const UseTemporaryAllocationBlock tempBlock{1024};
484  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
485 
486  // return the result
487  res = sendUsingMe->sendObject(response, errMsg);
488  return make_pair(res, errMsg);
489  }
490 
491  ));
492 
493  // this handler accepts a request to add a set
494  forMe.registerHandler(
495  StorageAddSet_TYPEID,
497  PDBCommunicatorPtr sendUsingMe) {
498  PDB_COUT << "received StorageAddSet" << std::endl;
499 
500  std::string errMsg;
501  bool res = true;
502  if (request->getPageSize() > conf->getMaxPageSize()) {
503  errMsg = "Error: page size is larger than maxPageSize\n";
504  std::cout << errMsg << std::endl;
505  res = false;
506  } else {
507  if (standalone == true) {
508  PDB_COUT << "adding set in standalone mode" << std::endl;
509  res = getFunctionality<PangeaStorageServer>().addSet(request->getDatabase(),
510  request->getTypeName(),
511  request->getSetName(),
512  request->getPageSize());
513  if (res == false) {
514  errMsg = "Set " + request->getDatabase() + ":" + request->getSetName() +
515  ":" + request->getTypeName() + " already exists\n";
516  } else {
517  int16_t typeID = VTableMap::getIDByName(request->getTypeName(), false);
518  PDB_COUT << "TypeID =" << typeID << std::endl;
519  if (typeID == -1) {
520  errMsg = "Could not find type " + request->getTypeName();
521  res = false;
522  } else {
523  PDB_COUT << "to add set in catalog" << std::endl;
524  res = getFunctionality<CatalogServer>().addSet(
525  typeID, request->getDatabase(), request->getSetName(), errMsg);
526  if (res == true) {
527  PDB_COUT << "success" << std::endl;
528  } else {
529  PDB_COUT << "failed" << std::endl;
530  }
531  }
532  }
533  } else {
534  PDB_COUT << "creating set in Pangea in distributed environment...with setName="
535  << request->getSetName() << std::endl;
536  if ((res = getFunctionality<PangeaStorageServer>().addSet(
537  request->getDatabase(),
538  request->getTypeName(),
539  request->getSetName(),
540  request->getPageSize())) == false) {
541  errMsg = "Set " + request->getDatabase() + ":" + request->getSetName() +
542  ":" + request->getTypeName() + " already exists\n";
543  cout << errMsg << endl;
544  } else {
545 #ifdef CHECK_TYPE
546  int16_t typeID = VTableMap::getIDByName(request->getTypeName(), false);
547  PDB_COUT << "TypeID =" << typeID << std::endl;
548  // make sure the type is registered in the catalog
549  if (typeID == -1) {
550  errMsg = "Could not find type " + request->getTypeName();
551  cout << errMsg << endl;
552  res = false;
553  }
554 #endif
555  }
556  }
557  }
558  // make the response
559  const UseTemporaryAllocationBlock tempBlock{1024};
560  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
561 
562  // return the result
563  res = sendUsingMe->sendObject(response, errMsg);
564  return make_pair(res, errMsg);
565  }));
566 
567 
568  // this handler requests to add a temp set
569  forMe.registerHandler(
570  StorageAddTempSet_TYPEID,
572  [&](Handle<StorageAddTempSet> request, PDBCommunicatorPtr sendUsingMe) {
573  std::string errMsg;
574  // add a temp set in local
575  SetID setId;
576  bool res = getFunctionality<PangeaStorageServer>().addTempSet(
577  request->getSetName(), setId, request->getPageSize());
578  if (res == false) {
579  errMsg = "TempSet " + request->getSetName() + " already exists\n";
580  }
581 
582  // make the response
583  const UseTemporaryAllocationBlock tempBlock{1024};
585  makeObject<StorageAddTempSetResult>(res, errMsg, setId);
586 
587 
588  // return the result
589  res = sendUsingMe->sendObject<StorageAddTempSetResult>(response, errMsg);
590  return make_pair(res, errMsg);
591  }));
592 
593  // this handler requests to remove a database
594  forMe.registerHandler(
595  StorageRemoveDatabase_TYPEID,
597  Handle<StorageRemoveDatabase> request, PDBCommunicatorPtr sendUsingMe) {
598 
599  std::string errMsg;
600  std::string databaseName = request->getDatabase();
601  bool res;
602  PDB_COUT << "Deleting database " << databaseName << std::endl;
603  if (standalone == true) {
604  res = getFunctionality<PangeaStorageServer>().removeDatabase(databaseName);
605  if (res == false) {
606  errMsg = "Failed to delete database\n";
607  } else {
608  res = getFunctionality<CatalogServer>().deleteDatabase(databaseName, errMsg);
609  }
610  } else {
611  res = getFunctionality<PangeaStorageServer>().removeDatabase(databaseName);
612  if (res == false) {
613  errMsg = "Failed to delete database\n";
614  }
615  }
616  // make the response
617  const UseTemporaryAllocationBlock tempBlock{1024};
618  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
619 
620  res = sendUsingMe->sendObject<SimpleRequestResult>(response, errMsg);
621  return make_pair(res, errMsg);
622  }));
623 
624  // this handler requests to remove a user set
625  forMe.registerHandler(
626  StorageRemoveUserSet_TYPEID,
628  Handle<StorageRemoveUserSet> request, PDBCommunicatorPtr sendUsingMe) {
629  std::string errMsg;
630  std::string databaseName = request->getDatabase();
631  std::string typeName = request->getTypeName();
632  std::string setName = request->getSetName();
633  bool res = true;
634  SetPtr setToRemove = getSet(std::pair<std::string, std::string>(databaseName, setName));
635  if (setToRemove == nullptr) {
636  // make the response
637  const UseTemporaryAllocationBlock tempBlock{1024};
638  errMsg = "Set doesn't exist.";
639  Handle<SimpleRequestResult> response =
640  makeObject<SimpleRequestResult>(false, errMsg);
641 
642  // return the result
643  res = sendUsingMe->sendObject(response, errMsg);
644  return make_pair(res, errMsg);
645  }
646  if (standalone == true) {
647  res = getFunctionality<PangeaStorageServer>().removeSet(
648  databaseName, typeName, setName);
649  if (res == false) {
650  errMsg = "Set doesn't exist\n";
651  }
652 
653  // deletes set in catalog
654  res = getFunctionality<CatalogServer>().deleteSet(
655  request->getDatabase(), request->getSetName(), errMsg);
656 
657 
658  } else {
659  if ((res = getFunctionality<PangeaStorageServer>().removeSet(databaseName,
660  setName)) == false) {
661  errMsg = "Error removing set!\n";
662  cout << errMsg << endl;
663  }
664  }
665  // make the response
666  const UseTemporaryAllocationBlock tempBlock{1024};
667  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
668 
669  // return the result
670  res = sendUsingMe->sendObject(response, errMsg);
671  return make_pair(res, errMsg);
672  }
673 
674  ));
675 
676  forMe.registerHandler(
677  StorageClearSet_TYPEID,
679  PDBCommunicatorPtr sendUsingMe) {
680  std::string errMsg;
681  std::string databaseName = request->getDatabase();
682  std::string typeName = request->getTypeName();
683  std::string setName = request->getSetName();
684  bool res = true;
685  SetPtr set = getSet(std::make_pair(databaseName, setName));
686  if (set == nullptr) {
687  res = false;
688  errMsg = "Set doesn't exist\n";
689  } else {
690  size_t pageSize = set->getPageSize();
691  if (standalone == true) {
692  PDB_COUT << "removing set in standalone mode" << std::endl;
693  res = getFunctionality<PangeaStorageServer>().removeSet(
694  databaseName, typeName, setName);
695  if (res == false) {
696  errMsg = "Set doesn't exist\n";
697  } else {
698  PDB_COUT << "adding set in standalone mode" << std::endl;
699  res = getFunctionality<PangeaStorageServer>().addSet(request->getDatabase(),
700  request->getTypeName(),
701  request->getSetName(),
702  pageSize);
703  }
704 
705  } else {
706  PDB_COUT << "removing set in cluster mode" << std::endl;
707  if ((res = getFunctionality<PangeaStorageServer>().removeSet(
708  databaseName, setName)) == false) {
709  errMsg = "Error removing set!\n";
710  cout << errMsg << endl;
711  } else {
712  if ((res = getFunctionality<PangeaStorageServer>().addSet(
713  request->getDatabase(),
714  request->getTypeName(),
715  request->getSetName(),
716  pageSize)) == false) {
717  errMsg = "Set already exists\n";
718  cout << errMsg << endl;
719  }
720  }
721  }
722  }
723  // make the response
724  const UseTemporaryAllocationBlock tempBlock{1024};
725  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
726 
727  // return the result
728  res = sendUsingMe->sendObject(response, errMsg);
729  return make_pair(res, errMsg);
730  }
731 
732  ));
733 
734 
735  // this handler requests to remove a temp set
736  forMe.registerHandler(
737  StorageRemoveTempSet_TYPEID,
739  [&](Handle<StorageRemoveTempSet> request, PDBCommunicatorPtr sendUsingMe) {
740  std::string errMsg;
741  // remove the temp set locally
742  SetID setId = request->getSetID();
743  bool res = getFunctionality<PangeaStorageServer>().removeTempSet(setId);
744  if (res == false) {
745  errMsg = "Set doesn't exist\n";
746  }
747  // make the response
748  const UseTemporaryAllocationBlock tempBlock{1024};
749  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
750 
751  // return the result
752  res = sendUsingMe->sendObject(response, errMsg);
753  return make_pair(res, errMsg);
754  }));
755 
756  // this handler requests to remove a hash set
757  forMe.registerHandler(
758  StorageRemoveHashSet_TYPEID,
760  Handle<StorageRemoveHashSet> request, PDBCommunicatorPtr sendUsingMe) {
761  std::string errMsg;
762  bool success;
763  // connect to backend
764  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
765  if (communicatorToBackend->connectToLocalServer(
766  getFunctionality<PangeaStorageServer>().getLogger(),
767  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
768  errMsg)) {
769  std::cout << errMsg << std::endl;
770  success = false;
771  } else if (!communicatorToBackend->sendObject(request, errMsg)) {
772  std::cout << errMsg << std::endl;
773  errMsg = std::string("can't send message to backend: ") + errMsg;
774  success = false;
775  } else {
776  PDB_COUT << "Storage sent request to backend" << std::endl;
777  // wait for backend to finish.
778  communicatorToBackend->getNextObject<SimpleRequestResult>(success, errMsg);
779  if (!success) {
780  std::cout << "Error waiting for backend to remove hash set. " << errMsg
781  << std::endl;
782  errMsg = std::string("backend failed to remove hash set: ") + errMsg;
783  }
784  }
785  // make the response
786  const UseTemporaryAllocationBlock tempBlock{1024};
787  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
788 
789  // return the result
790  success = sendUsingMe->sendObject(response, errMsg);
791  return make_pair(success, errMsg);
792  }));
793 
794  // this handler requests to export a set to a local file
795  forMe.registerHandler(
796  StorageExportSet_TYPEID,
798  PDBCommunicatorPtr sendUsingMe) {
799  std::string errMsg;
800  bool res =
801  getFunctionality<PangeaStorageServer>().exportToFile(request->getDbName(),
802  request->getSetName(),
803  request->getOutputFilePath(),
804  request->getFormat(),
805  errMsg);
806  // make the response
807  const UseTemporaryAllocationBlock tempBlock{1024};
808  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
809 
810  // return the result
811  res = sendUsingMe->sendObject(response, errMsg);
812  return make_pair(res, errMsg);
813 
814  }));
815 
816 
817  // this handler accepts a request to store one large object (e.g. JoinMap) in one page
818  forMe.registerHandler(
819  StorageAddObjectInLoop_TYPEID,
821  Handle<StorageAddObjectInLoop> request, PDBCommunicatorPtr sendUsingMe) {
822  std::cout << "start StorageAddObjectInLoop" << std::endl;
823  std::string errMsg;
824  bool everythingOK = true;
825  Handle<StorageAddObjectInLoop> curRequest = request;
826  void* requestInLoop = nullptr;
827  while (curRequest->isLoopEnded() == false) {
828  bool typeCheckOrNot = request->isTypeCheck();
829  if (typeCheckOrNot == true) {
830 #ifdef DEBUG_SET_TYPE
831  // first, check with the catalog to make sure that the given database, set, and
832  // type are correct
833  int16_t typeID = getFunctionality<CatalogServer>().getObjectType(
834  request->getDatabase(), request->getSetName());
835  if (typeID < 0) {
836  everythingOK = false;
837  }
838  // if we made it here, the type is correct, as is the database and the set
839  else if (typeID != VTableMap::getIDByName(request->getType(), false)) {
840  everythingOK = false;
841  }
842 #endif
843  }
844 
845  // get the record
846  size_t numBytes = sendUsingMe->getSizeOfNextObject();
847  std::cout << "received " << numBytes << " bytes" << std::endl;
848 #ifdef ENABLE_COMPRESSION
849  char* readToHere = new char[numBytes];
850 #else
851  void* readToHere = malloc(numBytes);
852 #endif
853  everythingOK = sendUsingMe->receiveBytes(readToHere, errMsg);
854 
855  {
856  const UseTemporaryAllocationBlock block{1024};
857  Handle<SimpleRequestResult> response =
858  makeObject<SimpleRequestResult>(everythingOK, errMsg);
859 
860  // return the result
861  everythingOK = sendUsingMe->sendObject(response, errMsg);
862  }
863  if (everythingOK) {
864 #ifdef ENABLE_COMPRESSION
865  size_t sizeOfBytesToAdd = 0;
866  snappy::GetUncompressedLength(readToHere, numBytes, &sizeOfBytesToAdd);
867 #else
868  size_t sizeOfBytesToAdd = numBytes;
869 #endif
870 
871  auto databaseAndSet = make_pair((std::string)request->getDatabase(),
872  (std::string)request->getSetName());
873  // now, get a page to write to
874  SetPtr mySet = getSet(databaseAndSet);
875  if (mySet == nullptr) {
876  std::cout << "FATAL ERROR: set to store data doesn't exist!" << std::endl;
877  std::cout << "databaseName" << databaseAndSet.first << std::endl;
878  std::cout << "setName" << databaseAndSet.second << std::endl;
879  return make_pair(
880  false, std::string("FATAL ERROR: set to store data doesn't exist!"));
881  }
882  std::cout << "sizeOfBytesToAdd is " << sizeOfBytesToAdd << std::endl;
883  char* myBytes = (char*)mySet->getNewBytes(sizeOfBytesToAdd);
884  if (myBytes == nullptr) {
885  return make_pair(false,
886  std::string("FATAL ERROR: can't get bytes from user set " +
887  databaseAndSet.second));
888  }
889 #ifdef ENABLE_COMPRESSION
890  snappy::RawUncompress(readToHere, numBytes, myBytes);
891 #else
892  memcpy(myBytes, readToHere, numBytes);
893 #endif
894 
895  } else {
896  errMsg =
897  "Tried to add data of the wrong type to a database set or database set "
898  "doesn't exit.\n";
899  everythingOK = false;
900  }
901 #ifdef ENABLE_COMPRESSION
902  delete[] readToHere;
903 #else
904  free(readToHere);
905 #endif
906 
907  numBytes = sendUsingMe->getSizeOfNextObject();
908  if (requestInLoop != nullptr) {
909  free(requestInLoop);
910  }
911  requestInLoop = malloc(numBytes);
912  curRequest = sendUsingMe->getNextObject<StorageAddObjectInLoop>(
913  requestInLoop, everythingOK, errMsg);
914  std::cout << "got new StorageAddObjectInLoop" << std::endl;
915  }
916  if (requestInLoop != nullptr) {
917  free(requestInLoop);
918  }
919  {
920  const UseTemporaryAllocationBlock block{1024};
921  Handle<SimpleRequestResult> response =
922  makeObject<SimpleRequestResult>(everythingOK, errMsg);
923 
924  // return the result
925  everythingOK = sendUsingMe->sendObject(response, errMsg);
926  }
927  std::cout << "end StorageAddObjectInLoop" << std::endl;
928  return make_pair(everythingOK, errMsg);
929  }));
930 
931 
932  // this handler accepts a request to store some data
933  forMe.registerHandler(
934  StorageAddData_TYPEID,
936  PDBCommunicatorPtr sendUsingMe) {
937  std::string errMsg;
938  bool everythingOK = true;
939  bool typeCheckOrNot = request->isTypeCheck();
940  if (typeCheckOrNot == true) {
941 #ifdef DEBUG_SET_TYPE
942  // first, check with the catalog to make sure that the given database, set, and type
943  // are correct
944  int16_t typeID = getFunctionality<CatalogServer>().getObjectType(
945  request->getDatabase(), request->getSetName());
946  if (typeID < 0) {
947  everythingOK = false;
948  }
949  // if we made it here, the type is correct, as is the database and the set
950  else if (typeID != VTableMap::getIDByName(request->getType(), false)) {
951  everythingOK = false;
952  }
953 #endif
954  }
955 
956  // get the record
957  size_t numBytes = sendUsingMe->getSizeOfNextObject();
958  bool compressedOrNot = request->isCompressed();
959  Handle<Vector<Handle<Object>>> objectsToStore = nullptr;
960  char* readToHere = nullptr;
961  if (compressedOrNot == false) {
962  readToHere = (char*)malloc(numBytes);
963  objectsToStore = sendUsingMe->getNextObject<Vector<Handle<Object>>>(
964  readToHere, everythingOK, errMsg);
965  } else {
966  char* temp = new char[numBytes];
967  sendUsingMe->receiveBytes(temp, errMsg);
968  size_t uncompressedSize = 0;
969  snappy::GetUncompressedLength(temp, numBytes, &uncompressedSize);
970  readToHere = (char*)malloc(uncompressedSize);
971  snappy::RawUncompress(temp, numBytes, (char*)(readToHere));
972  Record<Vector<Handle<Object>>>* myRecord =
973  (Record<Vector<Handle<Object>>>*)readToHere;
974  objectsToStore = myRecord->getRootObject();
975  delete[] temp;
976  }
977 
978  if (objectsToStore->size() == 0) {
979  everythingOK = false;
980  errMsg =
981  "Warning: client attemps to store a vector that contains zero objects, simply "
982  "ignores it";
983  std::cout << errMsg << std::endl;
984  }
985 
986  if (request->isFlushing() == false) {
987  const UseTemporaryAllocationBlock block{1024};
988  Handle<SimpleRequestResult> response =
989  makeObject<SimpleRequestResult>(everythingOK, errMsg);
990 
991  // return the result
992  everythingOK = sendUsingMe->sendObject(response, errMsg);
993  }
994 
995  if (everythingOK) {
996  pthread_mutex_lock(&counterMutex);
998  pthread_mutex_unlock(&counterMutex);
999  const LockGuard guard{workingMutex};
1000  // at this point, we have performed the serialization, so remember the record
1001  auto databaseAndSet = make_pair((std::string)request->getDatabase(),
1002  (std::string)request->getSetName());
1003 
1004  SetPtr mySet = getFunctionality<PangeaStorageServer>().getSet(databaseAndSet);
1005  size_t myPageSize = mySet->getPageSize();
1006  if (request->isDirectPut() == false) {
1007 
1008  getFunctionality<PangeaStorageServer>().bufferRecord(
1009  databaseAndSet, (Record<Vector<Handle<Object>>>*)readToHere);
1010 
1011 
1012  size_t numBytesToProcess = sizes[databaseAndSet];
1013  size_t rawPageSize = myPageSize;
1014 
1015  if (numBytesToProcess < rawPageSize) {
1016  PDB_COUT << "data is buffered, all buffered data size=" << numBytesToProcess
1017  << std::endl;
1018  } else {
1019  // if we have enough space to fill up a page, do it
1020  std::cout << "Got the data.\n";
1021  std::cout << "Are " << sizes[databaseAndSet] << " bytes to write.\n";
1022  std::cout << "Page size is " << rawPageSize << std::endl;
1023  getFunctionality<PangeaStorageServer>().writeBackRecords(
1024  databaseAndSet, request->isFlushing());
1025  PDB_COUT << "Done with write back.\n";
1026  PDB_COUT << "Are " << sizes[databaseAndSet] << " bytes left.\n";
1027  }
1028 
1029  } else {
1030  Record<Vector<Handle<Object>>>* myRecord =
1031  (Record<Vector<Handle<Object>>>*)readToHere;
1032  if (myRecord->numBytes() <= myPageSize) {
1033  PDBPagePtr myPage =
1034  getFunctionality<PangeaStorageServer>().getNewPage(databaseAndSet);
1035  // memory copy
1036  memcpy(myPage->getBytes(), readToHere, myRecord->numBytes());
1037  // unpin the page
1038  CacheKey key;
1039  key.dbId = myPage->getDbID();
1040  key.typeId = myPage->getTypeID();
1041  key.setId = myPage->getSetID();
1042  key.pageId = myPage->getPageID();
1043  getFunctionality<PangeaStorageServer>().getCache()->decPageRefCount(key);
1044  if (request->isFlushing() == true) {
1045  getFunctionality<PangeaStorageServer>()
1046  .getCache()
1047  ->flushPageWithoutEviction(key);
1048  }
1049  } else {
1050  errMsg = "Tried to directly put larger data than the page, size=" +
1051  std::to_string(myRecord->numBytes());
1052  std::cout << errMsg << std::endl;
1053  everythingOK = false;
1054  }
1055  }
1056  pthread_mutex_lock(&counterMutex);
1058  pthread_mutex_unlock(&counterMutex);
1059  } else {
1060  errMsg =
1061  "Tried to add data of the wrong type to a database set or database set doesn't "
1062  "exit.\n";
1063  everythingOK = false;
1064  }
1065  if (request->isFlushing() == true) { // this is a client query
1066  const UseTemporaryAllocationBlock block{1024};
1067  Handle<SimpleRequestResult> response =
1068  makeObject<SimpleRequestResult>(everythingOK, errMsg);
1069 
1070  // return the result
1071  everythingOK = sendUsingMe->sendObject(response, errMsg);
1072  }
1073  return make_pair(everythingOK, errMsg);
1074  }));
1075 
1076 
1077  // this handler accepts a request to get data from a set
1078  forMe.registerHandler(
1079  StorageGetData_TYPEID,
1081  PDBCommunicatorPtr sendUsingMe) {
1082  std::string errMsg;
1083  bool res;
1084  // add a set in local
1085  SetPtr set = getFunctionality<PangeaStorageServer>().getSet(
1086  std::pair<std::string, std::string>(request->getDatabase(), request->getSetName()));
1087  if (set == nullptr) {
1088  errMsg = "Set doesn't exist.\n";
1089  res = false;
1090  } else {
1091  int numPages = set->getNumPages();
1092  {
1093  const UseTemporaryAllocationBlock tempBlock{1024};
1094  Handle<StorageGetDataResponse> response = makeObject<StorageGetDataResponse>(
1095  numPages,
1096  request->getDatabase(),
1097  request->getSetName(),
1098  set->getPageSize(),
1099  set->getPageSize() -
1100  (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
1101  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t)),
1102  res,
1103  errMsg);
1104  res = sendUsingMe->sendObject(response, errMsg);
1105  }
1106  if (getFunctionality<PangeaStorageServer>().isStandalone() == true) {
1107  int16_t typeID = VTableMap::getIDByName(request->getType(), false);
1108  PDB_COUT << "TypeID =" << typeID << std::endl;
1109  if (typeID == -1) {
1110  errMsg = "Could not find type " + request->getType();
1111  res = false;
1112  } else {
1113  getFunctionality<PangeaStorageServer>().getCache()->pin(set, MRU, Read);
1114  std::vector<PageIteratorPtr>* iterators = set->getIterators();
1115  int numIterators = iterators->size();
1116  int numPagesSent = 0;
1117  for (int i = 0; i < numIterators; i++) {
1118  PageIteratorPtr iter = iterators->at(i);
1119  while (iter->hasNext()) {
1120  PDBPagePtr page = iter->next();
1121  if (page != nullptr) {
1122  PDB_COUT << "to send the " << numPagesSent << "-th page"
1123  << std::endl;
1124  res = sendUsingMe->sendBytes(
1125  page->getRawBytes(), page->getRawSize(), errMsg);
1126  page->unpin();
1127  if (res == false) {
1128  std::cout << "sending data failed\n";
1129  return make_pair(res, errMsg);
1130  }
1131  numPagesSent++;
1132  }
1133  }
1134  }
1135  PDB_COUT << "All done!\n";
1136  delete iterators;
1137  }
1138  }
1139  }
1140  return make_pair(res, errMsg);
1141  }));
1142 
1143  // this handler collect statistics in this storage
1144  forMe.registerHandler(
1145  StorageCollectStats_TYPEID,
1147  Handle<StorageCollectStats> request, PDBCommunicatorPtr sendUsingMe) {
1148  const UseTemporaryAllocationBlock myBlock{4 * 1024 * 1024};
1149  std::string errMsg;
1151  makeObject<StorageCollectStatsResponse>();
1153  makeObject<Vector<Handle<SetIdentifier>>>();
1154  // iterate sets
1155  for (std::map<std::pair<DatabaseID, SetID>, SetPtr>::iterator it =
1156  this->userSets->begin();
1157  it != this->userSets->end();
1158  ++it) {
1159  std::pair<DatabaseID, SetID> idPair = it->first;
1160  SetPtr set = it->second;
1161  std::string setName = set->getSetName();
1162  int numPages = set->getNumPages();
1163  DatabaseID dbId = idPair.first;
1164  DefaultDatabasePtr db = this->getDatabase(dbId);
1165  std::string dbName = db->getDatabaseName();
1166  Handle<SetIdentifier> setIdentifier = makeObject<SetIdentifier>(dbName, setName);
1167  setIdentifier->setNumPages(numPages);
1168  setIdentifier->setPageSize(set->getPageSize());
1169  stats->push_back(setIdentifier);
1170  }
1171  response->setStats(stats);
1172  bool res = sendUsingMe->sendObject<StorageCollectStatsResponse>(response, errMsg);
1173  return make_pair(res, errMsg);
1174  }));
1175 
1176  // this handler accepts a request to pin a page
1177  forMe.registerHandler(
1178  StoragePinPage_TYPEID,
1180  [&](Handle<StoragePinPage> request, PDBCommunicatorPtr sendUsingMe) {
1181  PDBLoggerPtr logger = make_shared<PDBLogger>("storagePinPage.log");
1182  DatabaseID dbId = request->getDatabaseID();
1183  UserTypeID typeId = request->getUserTypeID();
1184  SetID setId = request->getSetID();
1185  PageID pageId = request->getPageID();
1186  bool wasNewPage = request->getWasNewPage();
1187 
1188  PDB_COUT << "to pin page in set with setId=" << setId << std::endl;
1189  bool res;
1190  string errMsg;
1191 
1192  PDBPagePtr page = nullptr;
1193  SetPtr set = nullptr;
1194 
1195  if ((dbId == 0) && (typeId == 0)) {
1196  // temp set
1197  set = getFunctionality<PangeaStorageServer>().getTempSet(setId);
1198  } else {
1199  // user set
1200  set = getFunctionality<PangeaStorageServer>().getSet(dbId, typeId, setId);
1201  }
1202 
1203  if (set != nullptr) {
1204  if (wasNewPage == true) {
1205  page = set->addPage();
1206  } else {
1207  PartitionedFilePtr file = set->getFile();
1208  PartitionedFileMetaDataPtr meta = file->getMetaData();
1209  PageIndex index = meta->getPageIndex(pageId);
1210  page = set->getPage(index.partitionId, index.pageSeqInPartition, pageId);
1211  }
1212  }
1213 
1214  if (page != nullptr) {
1215  logger->debug(
1216  std::string("Handling StoragePinPage: page is not null, we build the "
1217  "StoragePagePinned message"));
1218  const UseTemporaryAllocationBlock myBlock{2048};
1219  Handle<StoragePagePinned> ack = makeObject<StoragePagePinned>();
1220  ack->setMorePagesToLoad(true);
1221  ack->setDatabaseID(dbId);
1222  ack->setUserTypeID(typeId);
1223  ack->setSetID(setId);
1224  ack->setPageID(page->getPageID());
1225  ack->setPageSize(page->getRawSize());
1226  ack->setSharedMemOffset(page->getOffset());
1227  logger->debug(
1228  std::string("Handling StoragePinPage: to send StoragePagePinned message"));
1229  res = sendUsingMe->sendObject<StoragePagePinned>(ack, errMsg);
1230  logger->debug(
1231  std::string("Handling StoragePinPage: sent StoragePagePinned message"));
1232  } else {
1233  res = false;
1234  errMsg = "Fatal Error: Page doesn't exist for pinning page.";
1235  std::cout << "dbId = " << dbId << ", typeId = " << typeId
1236  << ", setId = " << setId << std::endl;
1237  std::cout << errMsg << std::endl;
1238  logger->error(errMsg);
1239  }
1240  return make_pair(res, errMsg);
1241  }));
1242 
1243  // this handler accepts a request to pin bytes in a set
1244  forMe.registerHandler(
1245  StoragePinBytes_TYPEID,
1247  [&](Handle<StoragePinBytes> request, PDBCommunicatorPtr sendUsingMe) {
1248  PDBLoggerPtr logger = make_shared<PDBLogger>("storagePinPage.log");
1249  DatabaseID dbId = request->getDatabaseID();
1250  UserTypeID typeId = request->getUserTypeID();
1251  SetID setId = request->getSetID();
1252  size_t sizeOfBytes = request->getSizeOfBytes();
1253 
1254  bool res;
1255  string errMsg;
1256 
1257  PDBPagePtr page = nullptr;
1258  SetPtr set = nullptr;
1259 
1260  if ((dbId == 0) && (typeId == 0)) {
1261  // temp set
1262  set = getFunctionality<PangeaStorageServer>().getTempSet(setId);
1263  } else {
1264  // user set
1265  set = getFunctionality<PangeaStorageServer>().getSet(dbId, typeId, setId);
1266  }
1267 
1268  void* myBytes = nullptr;
1269  if (set != nullptr) {
1270  myBytes = set->getNewBytes(sizeOfBytes);
1271  }
1272 
1273  if (myBytes != nullptr) {
1274  const UseTemporaryAllocationBlock myBlock{2048};
1275  Handle<StorageBytesPinned> ack = makeObject<StorageBytesPinned>();
1276  ack->setSizeOfBytes(sizeOfBytes);
1277  size_t offset = this->shm->computeOffset(myBytes);
1278  ack->setSharedMemOffset(offset);
1279  res = sendUsingMe->sendObject<StorageBytesPinned>(ack, errMsg);
1280  } else {
1281  res = false;
1282  errMsg = "Can't get " + std::to_string(sizeOfBytes) + " bytes for set:";
1283  std::cout << "dbId = " << dbId << ", typeId = " << typeId
1284  << ", setId = " << setId << std::endl;
1285  std::cout << errMsg << std::endl;
1286  logger->error(errMsg);
1287  }
1288  return make_pair(res, errMsg);
1289  }));
1290 
1291 
1292  // this handler accepts a request to unpin a page
1293  forMe.registerHandler(
1294  StorageUnpinPage_TYPEID,
1296  PDBCommunicatorPtr sendUsingMe) {
1297 
1298  PDBLoggerPtr logger = make_shared<PDBLogger>("storageUnpinPage.log");
1299 
1300  DatabaseID dbId = request->getDatabaseID();
1301  UserTypeID typeId = request->getUserTypeID();
1302  SetID setId = request->getSetID();
1303  PageID pageId = request->getPageID();
1304 
1305  CacheKey key;
1306  key.dbId = dbId;
1307  key.typeId = typeId;
1308  key.setId = setId;
1309  key.pageId = pageId;
1310 
1311 
1312  bool res;
1313  std::string errMsg;
1314  if (getFunctionality<PangeaStorageServer>().getCache()->decPageRefCount(key) == false) {
1315  res = false;
1316  errMsg = "Fatal Error: Page doesn't exist for unpinning page.";
1317  std::cout << "dbId=" << dbId << ", typeId=" << typeId << ", setId=" << setId
1318  << ", pageId=" << pageId << std::endl;
1319  std::cout << errMsg << std::endl;
1320  logger->error(errMsg);
1321  } else {
1322 #ifdef ENABLE_EVICTION
1323  getFunctionality<PangeaStorageServer>().getCache()->evictPage(key);
1324 #endif
1325  res = true;
1326  }
1327 
1328  logger->debug(std::string("Making response object.\n"));
1329  const UseTemporaryAllocationBlock block{1024};
1330  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
1331 
1332  // return the result
1333  logger->debug(std::string("Sending response object.\n"));
1334  res = sendUsingMe->sendObject(response, errMsg);
1335  logger->debug(std::string("response sent for StorageUnpinPage.\n"));
1336 
1337  return make_pair(res, errMsg);
1338 
1339  }));
1340 
1341 
1342  // this handler accepts a request to load all pages in a set to memory iteratively, and send
1343  // back information about loaded pages
1344  forMe.registerHandler(
1345  StorageGetSetPages_TYPEID,
1347  Handle<StorageGetSetPages> request, PDBCommunicatorPtr sendUsingMe) {
1348 
1349  DatabaseID dbId = request->getDatabaseID();
1350  UserTypeID typeId = request->getUserTypeID();
1351  SetID setId = request->getSetID();
1352 
1353  bool res = true;
1354  std::string errMsg;
1355 
1356  SetPtr set = getFunctionality<PangeaStorageServer>().getSet(dbId, typeId, setId);
1357  if (set == nullptr) {
1358  res = false;
1359  errMsg = "Fatal Error: Set doesn't exist.";
1360  std::cout << errMsg << std::endl;
1361  return make_pair(res, errMsg);
1362  }
1363 
1364  // use frontend iterators: one iterator for in-memory dirty pages, and one iterator for
1365  // each file partition
1366  std::vector<PageIteratorPtr>* iterators = set->getIterators();
1367  getFunctionality<PangeaStorageServer>().getCache()->pin(set, MRU, Write);
1368 
1369  set->setPinned(true);
1370  int numIterators = iterators->size();
1371 
1372  PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& counter) {
1373  counter++;
1374  PDB_COUT << "counter = " << counter << std::endl;
1375  });
1376 
1377  // scan pages and load pages in a multi-threaded style
1378 
1379  int counter = 0;
1380  for (int i = 0; i < numIterators; i++) {
1381  PDBWorkerPtr worker = getFunctionality<PangeaStorageServer>().getWorker();
1382  PDBScanWorkPtr scanWork = make_shared<PDBScanWork>(
1383  iterators->at(i), &getFunctionality<PangeaStorageServer>(), counter);
1384  worker->execute(scanWork, tempBuzzer);
1385  }
1386 
1387  while (counter < numIterators) {
1388  tempBuzzer->wait();
1389  }
1390  set->setPinned(false);
1391  delete iterators;
1392 
1393  // here, we have already loaded all pages, and sent all information about those pages to
1394  // the other side, now we need inform the other side that this process has been done.
1395  // The other side has closed connection, so first we need to create a separate
1396  // connection to backend
1397  PDBCommunicatorPtr communicatorToBackEnd = make_shared<PDBCommunicator>();
1398  if (communicatorToBackEnd->connectToLocalServer(
1399  getFunctionality<PangeaStorageServer>().getLogger(),
1400  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
1401  errMsg)) {
1402  res = false;
1403  std::cout << errMsg << std::endl;
1404  return make_pair(res, errMsg);
1405  }
1406 
1407  UseTemporaryAllocationBlock myBlock{1024};
1408  Handle<StorageNoMorePage> noMorePage = makeObject<StorageNoMorePage>();
1409  if (!communicatorToBackEnd->sendObject<StorageNoMorePage>(noMorePage, errMsg)) {
1410  res = false;
1411  std::cout << errMsg << std::endl;
1412  return make_pair(res, errMsg);
1413  }
1414 
1415  return make_pair(res, errMsg);
1416  }));
1417 
1418  // this handler accepts a request to translate <databaseName, setName> into <databaseId, typeId,
1419  // setId> and forward to backend
1420  forMe.registerHandler(
1421  StorageTestSetScan_TYPEID,
1423  [&](Handle<StorageTestSetScan> request, PDBCommunicatorPtr sendUsingMe) {
1424 
1425  std::string dbName = request->getDatabase();
1426  std::string setName = request->getSetName();
1427  SetPtr set = getFunctionality<PangeaStorageServer>().getSet(
1428  std::pair<std::string, std::string>(dbName, setName));
1429 
1430  bool res;
1431  std::string errMsg;
1432 
1433  if (set == nullptr) {
1434  res = false;
1435  errMsg = "Fatal Error: Set doesn't exist!";
1436  std::cout << errMsg << std::endl;
1437  return make_pair(res, errMsg);
1438  } else {
1439  // first we need to create a separate connection to backend
1440  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
1441  if (communicatorToBackend->connectToLocalServer(
1442  getFunctionality<PangeaStorageServer>().getLogger(),
1443  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
1444  errMsg)) {
1445  res = false;
1446  std::cout << errMsg << std::endl;
1447  return make_pair(res, errMsg);
1448  }
1449 
1450  DatabaseID dbId = set->getDbID();
1451  UserTypeID typeId = set->getTypeID();
1452  SetID setId = set->getSetID();
1453 
1454  {
1455  const UseTemporaryAllocationBlock myBlock{1024};
1457  makeObject<BackendTestSetScan>(dbId, typeId, setId);
1458  if (!communicatorToBackend->sendObject<BackendTestSetScan>(msg, errMsg)) {
1459  res = false;
1460  std::cout << errMsg << std::endl;
1461  return make_pair(res, errMsg);
1462  }
1463  }
1464 
1465  {
1466  const UseTemporaryAllocationBlock myBlock{
1467  communicatorToBackend->getSizeOfNextObject()};
1468  communicatorToBackend->getNextObject<SimpleRequestResult>(res, errMsg);
1469  }
1470 
1471  {
1472  const UseTemporaryAllocationBlock block{1024};
1473  Handle<SimpleRequestResult> response =
1474  makeObject<SimpleRequestResult>(res, errMsg);
1475 
1476  // return the result
1477  res = sendUsingMe->sendObject(response, errMsg);
1478  }
1479  return make_pair(res, errMsg);
1480  }
1481 
1482  }));
1483 
1484  // this handler accepts a request to translate <<srcDatabaseName, srcSetName>,
1485  // <destDatabaseName, destSetName>> into <<srcDatabaseId, srcTypeId, SrcSetId>, <destDatabaseId,
1486  // destTypeId, destSetId>> and forward to backend
1487  forMe.registerHandler(
1488  StorageTestSetCopy_TYPEID,
1490  Handle<StorageTestSetCopy> request, PDBCommunicatorPtr sendUsingMe) {
1491 
1492  std::string dbNameIn = request->getDatabaseIn();
1493  std::string setNameIn = request->getSetNameIn();
1494  SetPtr setIn = getFunctionality<PangeaStorageServer>().getSet(
1495  std::pair<std::string, std::string>(dbNameIn, setNameIn));
1496 
1497  std::string dbNameOut = request->getDatabaseOut();
1498  std::string setNameOut = request->getSetNameOut();
1499  SetPtr setOut = getFunctionality<PangeaStorageServer>().getSet(
1500  std::pair<std::string, std::string>(dbNameOut, setNameOut));
1501 
1502 
1503  bool res;
1504  std::string errMsg;
1505 
1506  if (setIn == nullptr) {
1507  res = false;
1508  errMsg = "Fatal Error: Input set doesn't exist!";
1509  std::cout << errMsg << std::endl;
1510  }
1511 
1512  if (setOut == nullptr) {
1513  res = false;
1514  errMsg += "Fatal Error: Output set doesn't exist!";
1515  std::cout << errMsg << std::endl;
1516  }
1517 
1518 
1519  if ((setIn != nullptr) && (setOut != nullptr)) {
1520  // first we need to create a separate connection to backend
1521  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
1522  if (communicatorToBackend->connectToLocalServer(
1523  getFunctionality<PangeaStorageServer>().getLogger(),
1524  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
1525  errMsg)) {
1526  res = false;
1527  std::cout << errMsg << std::endl;
1528  return make_pair(res, errMsg);
1529  }
1530 
1531  DatabaseID dbIdIn = setIn->getDbID();
1532  UserTypeID typeIdIn = setIn->getTypeID();
1533  SetID setIdIn = setIn->getSetID();
1534  DatabaseID dbIdOut = setOut->getDbID();
1535  UserTypeID typeIdOut = setOut->getTypeID();
1536  SetID setIdOut = setOut->getSetID();
1537 
1538 
1539  const UseTemporaryAllocationBlock myBlock{4096};
1540  Handle<BackendTestSetCopy> msg = makeObject<BackendTestSetCopy>(
1541  dbIdIn, typeIdIn, setIdIn, dbIdOut, typeIdOut, setIdOut);
1542  if (!communicatorToBackend->sendObject<BackendTestSetCopy>(msg, errMsg)) {
1543  res = false;
1544  std::cout << errMsg << std::endl;
1545  } else {
1546  const UseTemporaryAllocationBlock myBlock{
1547  communicatorToBackend->getSizeOfNextObject()};
1548  communicatorToBackend->getNextObject<SimpleRequestResult>(res, errMsg);
1549  }
1550  }
1551 
1552  {
1553  const UseTemporaryAllocationBlock block{1024};
1554  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
1555 
1556  // return the result
1557  res = sendUsingMe->sendObject(response, errMsg);
1558  }
1559 
1560  return make_pair(res, errMsg);
1561 
1562  }));
1563 }
1564 
1565 
1566 // Returns ipc path to backend
1568  return this->pathToBackEndServer;
1569 }
1570 
1571 // Returns server name
1573  return this->serverName;
1574 }
1575 
1576 // Returns nodeId
1578  return this->nodeId;
1579 }
1580 
1581 // Encode database path
1582 string PangeaStorageServer::encodeDBPath(string rootPath, DatabaseID dbId, string dbName) {
1583  char buffer[500];
1584  sprintf(buffer, "%s/%d_%s", rootPath.c_str(), dbId, dbName.c_str());
1585  return string(buffer);
1586 }
1587 
1588 // create temp directories
1590  if ((this->metaTempPath = this->conf->getMetaTempDir()).compare("") != 0) {
1591  boost::filesystem::remove_all(metaTempPath);
1592  this->conf->createDir(metaTempPath);
1593  PDB_COUT << "metaTempPath:" << metaTempPath << "\n";
1594  }
1595  string strDataTempPaths = this->conf->getDataTempDirs();
1596  string curDataTempPath;
1597  size_t startPos = 0;
1598  size_t curPos;
1599  if ((curPos = strDataTempPaths.find(',')) == string::npos) {
1600  boost::filesystem::remove_all(strDataTempPaths);
1601  this->conf->createDir(strDataTempPaths);
1602  PDB_COUT << "dataTempPath:" << strDataTempPaths << "\n";
1603  dataTempPaths.push_back(strDataTempPaths);
1604  } else {
1605  while ((curPos = strDataTempPaths.find(',')) != string::npos) {
1606  curDataTempPath = strDataTempPaths.substr(startPos, curPos);
1607  boost::filesystem::remove_all(curDataTempPath);
1608  this->conf->createDir(curDataTempPath);
1609  PDB_COUT << "dataTempPath:" << curDataTempPath << "\n";
1610  dataTempPaths.push_back(curDataTempPath);
1611  strDataTempPaths = strDataTempPaths.substr(curPos + 1, strDataTempPaths.length() + 1);
1612  }
1613  boost::filesystem::remove_all(strDataTempPaths);
1614  this->conf->createDir(strDataTempPaths);
1615  PDB_COUT << "dataTempPath:" << strDataTempPaths << "\n";
1616  dataTempPaths.push_back(strDataTempPaths);
1617  }
1618 }
1619 
1620 // Create database directories
1622 
1623  if ((this->metaRootPath = this->conf->getMetaDir()).compare("") != 0) {
1624  this->conf->createDir(metaRootPath);
1625  }
1626  string strDataRootPaths = this->conf->getDataDirs();
1627  string curDataRootPath;
1628  size_t startPos = 0;
1629  size_t curPos;
1630  if ((curPos = strDataRootPaths.find(',')) == string::npos) {
1631  this->conf->createDir(strDataRootPaths);
1632  dataRootPaths.push_back(strDataRootPaths);
1633  } else {
1634  while ((curPos = strDataRootPaths.find(',')) != string::npos) {
1635  curDataRootPath = strDataRootPaths.substr(startPos, curPos);
1636  this->conf->createDir(curDataRootPath);
1637  dataRootPaths.push_back(curDataRootPath);
1638  strDataRootPaths = strDataRootPaths.substr(curPos + 1, strDataRootPaths.length() + 1);
1639  }
1640  this->conf->createDir(strDataRootPaths);
1641  dataRootPaths.push_back(strDataRootPaths);
1642  }
1643 }
1644 
1645 
1646 // add a new and empty database
1647 bool PangeaStorageServer::addDatabase(string dbName, DatabaseID dbId) {
1648  if (this->dbs->find(dbId) != this->dbs->end()) {
1649  this->logger->writeLn("PDBStorage: database exists.");
1650  return false;
1651  }
1652  string metaDBPath;
1653  if (this->metaRootPath.compare("") != 0) {
1654  metaDBPath = encodeDBPath(this->metaRootPath, dbId, dbName);
1655  this->conf->createDir(metaDBPath);
1656  } else {
1657  metaDBPath = "";
1658  }
1659  vector<string>* dataDBPaths = new vector<string>();
1660  unsigned int i;
1661  string curDataDBPath;
1662  for (i = 0; i < this->dataRootPaths.size(); i++) {
1663  curDataDBPath = encodeDBPath(this->dataRootPaths.at(i), dbId, dbName);
1664  dataDBPaths->push_back(curDataDBPath);
1665  }
1666  DefaultDatabasePtr db = make_shared<DefaultDatabase>(this->nodeId,
1667  dbId,
1668  dbName,
1669  this->conf,
1670  this->logger,
1671  this->shm,
1672  metaDBPath,
1673  dataDBPaths,
1674  this->cache,
1675  this->flushBuffer);
1676 
1677  pthread_mutex_lock(&this->databaseLock);
1678  this->dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
1679  this->name2id->insert(pair<string, DatabaseID>(dbName, dbId));
1680  SequenceID* seqId = new SequenceID();
1681  this->usersetSeqIds->insert(pair<string, SequenceID*>(dbName, seqId));
1682  pthread_mutex_unlock(&this->databaseLock);
1683  return true;
1684 }
1685 
1686 
1687 // add a new and empty database using only name
1688 bool PangeaStorageServer::addDatabase(std::string dbName) {
1689  if (name2id->count(dbName) != 0) {
1690  std::cout << "Database " << dbName << " exists" << std::endl;
1691  return false;
1692  }
1693  pthread_mutex_lock(&this->databaseLock);
1695  pthread_mutex_unlock(&this->databaseLock);
1696  return this->addDatabase(dbName, dbId);
1697 }
1698 
1699 
1700 // clear database data and disk files for removal
1701 void PangeaStorageServer::clearDB(DatabaseID dbId, string dbName) {
1702  unsigned int i;
1703  string path;
1704  path = this->encodeDBPath(this->metaRootPath, dbId, dbName);
1705  boost::filesystem::remove_all(path.c_str());
1706  for (i = 0; i < this->dataRootPaths.size(); i++) {
1707  path = this->encodeDBPath(this->dataRootPaths.at(i), dbId, dbName);
1708  boost::filesystem::remove_all(path.c_str());
1709  }
1710 }
1711 
1712 
1713 // remove database
1714 bool PangeaStorageServer::removeDatabase(std::string dbName) {
1715  DatabaseID dbId;
1716  if (name2id->count(dbName) != 0) {
1717  dbId = name2id->at(dbName);
1718  } else {
1719  // database doesn't exist;
1720  return false;
1721  }
1722  // TODO we need to delete files on disk
1723  map<DatabaseID, DefaultDatabasePtr>::iterator it = this->dbs->find(dbId);
1724  if (it != this->dbs->end()) {
1725  pthread_mutex_lock(&this->databaseLock);
1726  string dbName = it->second->getDatabaseName();
1727  clearDB(dbId, dbName);
1728  map<string, DatabaseID>::iterator name2idIt = this->name2id->find(dbName);
1729  name2id->erase(name2idIt);
1730  dbs->erase(it);
1731  usersetSeqIds->erase(dbName);
1732  pthread_mutex_unlock(&this->databaseLock);
1733  return true;
1734  } else {
1735  this->logger->writeLn("Database doesn't exist:");
1736  this->logger->writeInt(dbId);
1737  return false;
1738  }
1739 }
1740 
1741 // return database
1743  map<DatabaseID, DefaultDatabasePtr>::iterator it = this->dbs->find(dbId);
1744  if (it != this->dbs->end()) {
1745  return it->second;
1746  }
1747  return nullptr;
1748 }
1749 
1750 // to add a new and empty type
1751 bool PangeaStorageServer::addType(std::string typeName, UserTypeID typeId) {
1752  if (this->typename2id->count(typeName) != 0) {
1753  // the type exists!
1754  return false;
1755  } else {
1756 
1757  pthread_mutex_lock(&this->typeLock);
1758  this->typename2id->insert(std::pair<std::string, UserTypeID>(typeName, typeId));
1759  pthread_mutex_unlock(&this->typeLock);
1760  }
1761  return true;
1762 }
1763 
1764 
1765 // to remove a type from a database, and also all sets in the database having that type
1766 bool PangeaStorageServer::removeTypeFromDatabase(std::string dbName, std::string typeName) {
1767  if (this->name2id->count(dbName) == 0) {
1768  // database doesn't exist
1769  return false;
1770  } else {
1771  // database exists
1772  if (this->typename2id->count(typeName) == 0) {
1773  // type doesn't exist
1774  return false;
1775  } else {
1776  DatabaseID dbId = this->name2id->at(dbName);
1777  DefaultDatabasePtr db = this->getDatabase(dbId);
1778  UserTypeID typeId = this->typename2id->at(typeName);
1779  db->removeType(typeId);
1780  }
1781  }
1782  return true;
1783 }
1784 
1785 // to remove a type from the typeName to typeId mapping
1786 bool PangeaStorageServer::removeType(std::string typeName) {
1787  if (this->typename2id->count(typeName) == 0) {
1788  // the type doesn't exist
1789  return false;
1790  } else {
1791  pthread_mutex_lock(&this->typeLock);
1792  this->typename2id->erase(typeName);
1793  pthread_mutex_unlock(&this->typeLock);
1794  }
1795  return true;
1796 }
1797 
1798 
1799 // to add a new and empty set
1801  std::string dbName, std::string typeName, std::string setName, SetID setId, size_t pageSize) {
1802  SetPtr set = getSet(std::pair<std::string, std::string>(dbName, setName));
1803  if (set != nullptr) {
1804  // set exists
1805  std::cout << "Set exists with setName=" << setName << std::endl;
1806  return false;
1807  }
1808  if (this->name2id->count(dbName) == 0) {
1809  // database doesn't exist
1810  std::cout << "Database doesn't exist with dbName=" << dbName << std::endl;
1811  return false;
1812  } else {
1813  // database exists
1814  if (this->typename2id->count(typeName) == 0) {
1815  // type doesn't exist
1816  // now we fetch the type id through catalog
1817  int typeId = VTableMap::getIDByName(typeName, false);
1818  if ((typeId <= 0) || (typeId == 8191)) {
1819  PDB_COUT << "type doesn't exist for name=" << typeName
1820  << ", and we store it as default type" << std::endl;
1821  typeName = "UnknownUserData";
1822  this->addType(typeName, (UserTypeID)-1);
1823  } else {
1824  PDB_COUT << "Pangea add new type when add set: typeName=" << typeName << std::endl;
1825  PDB_COUT << "Pangea add new type when add set: typeId=" << typeId << std::endl;
1826  this->addType(typeName, (UserTypeID)typeId);
1827  }
1828  }
1829  }
1830  DatabaseID dbId = this->name2id->at(dbName);
1831  DefaultDatabasePtr db = this->getDatabase(dbId);
1832  UserTypeID typeId = this->typename2id->at(typeName);
1833  TypePtr type = db->getType(typeId);
1834  if (type == nullptr) {
1835  // type hasn't been added to the database, so we need to add it first for creating
1836  // hierarchical directory so that optimization like compression can be applied at type and
1837  // database level.
1838  db->addType(typeName, typeId);
1839  type = db->getType(typeId);
1840  } else {
1841  set = type->getSet(setId);
1842  if (set != nullptr) {
1843  return false;
1844  }
1845  }
1846  type->addSet(setName, setId, pageSize);
1847  std::cout << "to add set with dbName=" << dbName << ", typeName=" << typeName
1848  << ", setName=" << setName << ", setId=" << setId << ", pageSize=" << pageSize
1849  << std::endl;
1850  set = type->getSet(setId);
1851  this->getCache()->pin(set, MRU, Write);
1852  PDB_COUT << "to get usersetLock" << std::endl;
1853  pthread_mutex_lock(&this->usersetLock);
1854  this->userSets->insert(std::pair<std::pair<DatabaseID, SetID>, SetPtr>(
1855  std::pair<DatabaseID, SetID>(dbId, setId), set));
1856  this->names2ids->insert(
1857  std::pair<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>(
1858  std::pair<std::string, std::string>(dbName, setName),
1859  std::pair<DatabaseID, SetID>(dbId, setId)));
1860  pthread_mutex_unlock(&this->usersetLock);
1861  PDB_COUT << "released usersetLock" << std::endl;
1862  return true;
1863 }
1864 
1865 
1866 // to add a new and empty set using only name
1867 bool PangeaStorageServer::addSet(std::string dbName,
1868  std::string typeName,
1869  std::string setName,
1870  size_t pageSize) {
1871  pthread_mutex_lock(&this->usersetLock);
1872  if (usersetSeqIds->count(dbName) == 0) {
1873  // database doesn't exist
1874  pthread_mutex_unlock(&this->usersetLock);
1875  addDatabase(dbName);
1876  }
1877  SetID setId = usersetSeqIds->at(dbName)->getNextSequenceID();
1878  PDB_COUT << "to add set with dbName=" << dbName << ", typeName=" << typeName
1879  << ", setName=" << setName << ", setId=" << setId << std::endl;
1880  pthread_mutex_unlock(&this->usersetLock);
1881  return addSet(dbName, typeName, setName, setId, pageSize);
1882 }
1883 
1884 
1885 // to add a set using only database name and set name
1886 bool PangeaStorageServer::addSet(std::string dbName, std::string setName, size_t pageSize) {
1887  return addSet(dbName, "UnknownUserData", setName, pageSize);
1888 }
1889 
1890 bool PangeaStorageServer::removeSet(std::string dbName, std::string setName) {
1891  SetPtr set = getSet(std::pair<std::string, std::string>(dbName, setName));
1892  if (set == nullptr) {
1893  PDB_COUT << "set with dbName=" << dbName << " and setName=" << setName << " doesn't exist"
1894  << std::endl;
1895  return false;
1896  }
1897 #ifdef REMOVE_SET_WITH_EVICTION
1898  std::cout << "To evict all pages in set with dbName=" << dbName << " and setName=" << setName
1899  << " to remove the set" << std::endl;
1900  set->evictPages();
1901 #endif
1902  DatabaseID dbId = set->getDbID();
1903  UserTypeID typeId = set->getTypeID();
1904  SetID setId = set->getSetID();
1905  DefaultDatabasePtr database = dbs->at(dbId);
1906  TypePtr type = database->getType(typeId);
1907  pthread_mutex_lock(&this->usersetLock);
1908  type->removeSet(setId);
1909  int numRemoved = userSets->erase(std::pair<DatabaseID, SetID>(dbId, setId));
1910  PDB_COUT << "numItems removed from userSets:" << numRemoved << std::endl;
1911  numRemoved = names2ids->erase(std::pair<std::string, std::string>(dbName, setName));
1912  PDB_COUT << "numItems removed from names2ids:" << numRemoved << std::endl;
1913  pthread_mutex_unlock(&this->usersetLock);
1914  return true;
1915 }
1916 
1917 // to remove an existing set
1918 bool PangeaStorageServer::removeSet(std::string dbName, std::string typeName, std::string setName) {
1919  // get the type
1920  DatabaseID dbId;
1921  if (name2id->count(dbName) == 0) {
1922  // database doesn't exist
1923  return false;
1924  } else {
1925  dbId = name2id->at(dbName);
1926  DefaultDatabasePtr database = dbs->at(dbId);
1927  UserTypeID typeId;
1928  if (typename2id->count(typeName) == 0) {
1929  // type doesn't exist
1930  return false;
1931  } else {
1932  typeId = typename2id->at(typeName);
1933  TypePtr type = database->getType(typeId);
1934  if (type != nullptr) {
1935  SetPtr set = getSet(std::pair<std::string, std::string>(dbName, setName));
1936  if (set != nullptr) {
1937 #ifdef REMOVE_SET_WITH_EVICTION
1938  set->evictPages();
1939 #endif
1940  SetID setId = set->getSetID();
1941  pthread_mutex_lock(&this->usersetLock);
1942  type->removeSet(setId);
1943  userSets->erase(std::pair<DatabaseID, SetID>(dbId, setId));
1944  names2ids->erase(std::pair<std::string, std::string>(dbName, setName));
1945 
1946  pthread_mutex_unlock(&this->usersetLock);
1947  } else {
1948  return false;
1949  }
1950  }
1951  }
1952  }
1953  return true;
1954 }
1955 
1956 
1957 bool PangeaStorageServer::addTempSet(string setName, SetID& setId, size_t pageSize) {
1958  this->logger->writeLn("To add temp set with setName=");
1959  this->logger->writeLn(setName);
1960  if (this->name2tempSetId->find(setName) != this->name2tempSetId->end()) {
1961  cout << "TempSet exists!\n";
1962  this->logger->writeLn("TempSet exists for setName=");
1963  this->logger->writeLn(setName);
1964  return false;
1965  }
1966  setId = this->tempsetSeqId.getNextSequenceID();
1967  this->logger->writeLn("SetId=");
1968  this->logger->writeInt(setId);
1969  TempSetPtr tempSet = make_shared<TempSet>(setId,
1970  setName,
1971  this->metaTempPath,
1972  this->dataTempPaths,
1973  this->shm,
1974  this->cache,
1975  this->logger);
1976  this->getCache()->pin(tempSet, MRU, Write);
1977  this->logger->writeLn("temp set created!");
1978  pthread_mutex_lock(&this->tempsetLock);
1979  this->tempSets->insert(pair<SetID, TempSetPtr>(setId, tempSet));
1980  this->name2tempSetId->insert(pair<string, SetID>(setName, setId));
1981  pthread_mutex_unlock(&this->tempsetLock);
1982  return true;
1983 }
1984 
1986  map<SetID, TempSetPtr>::iterator it = this->tempSets->find(setId);
1987  if (it != this->tempSets->end()) {
1988  string setName = it->second->getSetName();
1989  it->second->clear();
1990  pthread_mutex_lock(&this->tempsetLock);
1991  this->name2tempSetId->erase(setName);
1992  this->tempSets->erase(it);
1993  pthread_mutex_unlock(&this->tempsetLock);
1994  return true;
1995  } else {
1996  return false;
1997  }
1998 }
1999 
2000 // returns specified temp set
2002  this->logger->writeLn("PDBStorage: Searching for temp set:");
2003  this->logger->writeInt(setId);
2004  map<SetID, TempSetPtr>::iterator it = this->tempSets->find(setId);
2005  if (it != this->tempSets->end()) {
2006  return it->second;
2007  }
2008  this->logger->writeLn("PDBStorage: TempSet doesn't exist:");
2009  this->logger->writeInt(setId);
2010  return nullptr;
2011 }
2012 
2013 // returns a specified set
2015  if ((dbId == 0) && (typeId == 0)) {
2016  SetPtr set = this->getTempSet(setId);
2017  return set;
2018  }
2019  DefaultDatabasePtr db = this->getDatabase(dbId);
2020  if (db == nullptr) {
2021  this->logger->writeLn("PDBStorage: Database doesn't exist.");
2022  return nullptr;
2023  }
2024 
2025  TypePtr type = db->getType(typeId);
2026  if (type == nullptr) {
2027  this->logger->writeLn("PDBStorage: Type doesn't exist.");
2028  return nullptr;
2029  }
2030 
2031  SetPtr set = type->getSet(setId);
2032  return set;
2033 }
2034 
2040  // get the number of threads to start
2041  int numThreads = this->dataRootPaths.size();
2042  PDB_COUT << "number of partitions:" << numThreads << "\n";
2043  int i;
2044  PDBFlushConsumerWorkPtr flusher;
2045  PDBWorkerPtr worker;
2046  for (i = 0; i < numThreads; i++) {
2047  // create a flush worker
2048  flusher = make_shared<PDBFlushConsumerWork>(i, this);
2049  flushers.push_back(flusher);
2050  // find a thread in thread pool, if we can not find a thread, we block.
2051  while ((worker = this->getWorker()) == nullptr) {
2052  sched_yield();
2053  }
2054  worker->execute(flusher, flusher->getLinkedBuzzer());
2055  PDB_COUT << "flushing thread started for partition: " << i << "\n";
2056  }
2057 }
2058 
2063 
2064  unsigned int i;
2065  for (i = 0; i < this->flushers.size(); i++) {
2066  dynamic_pointer_cast<PDBFlushConsumerWork>(flushers.at(i))->stop();
2067  }
2068  this->flushBuffer->close();
2069 }
2070 
2075  return this->workers->getWorker();
2076 }
2077 
2082  return this->flushBuffer;
2083 }
2084 
2085 using namespace boost::filesystem;
2086 
2092 bool PangeaStorageServer::initializeFromRootDirs(string metaRootPath, vector<string> dataRootPath) {
2093  FileType curFileType;
2094  path root;
2095  if (metaRootPath.compare("") == 0) {
2096  // This is a SequenceFile instance
2097  curFileType = FileType::SequenceFileType;
2098  // Then there is only one root directory,
2099  // and we only check dataRootPath.at(0), all other data paths will be ignored!
2100  root = path(dataRootPath.at(0));
2101  } else {
2102  // This is a PartitionedFile instance
2103  curFileType = FileType::PartitionedFileType;
2104  // Then there is only one root directory,
2105  // and we only check dataRootPath.at(0), all other data paths will be ignored!
2106  root = path(metaRootPath);
2107  }
2108  if (exists(root)) {
2109  if (is_directory(root)) {
2110  vector<path> dbDirs;
2111  copy(directory_iterator(root), directory_iterator(), back_inserter(dbDirs));
2112  vector<path>::iterator iter;
2113  std::string path;
2114  std::string dirName;
2115  std::string name;
2116  std::string strId;
2117  DatabaseID dbId;
2118  DatabaseID maxDbId = 0;
2119  for (iter = dbDirs.begin(); iter != dbDirs.end(); iter++) {
2120  if (is_directory(*iter)) {
2121  // find a database
2122  path = std::string(iter->c_str());
2123 
2124  // get the directory name
2125  dirName = path.substr(path.find_last_of('/') + 1, path.length() - 1);
2126 
2127  // parse database id from directory name
2128  strId = dirName.substr(0, dirName.find('_'));
2129  dbId = stoul(strId);
2130  if (maxDbId < dbId) {
2131  maxDbId = dbId;
2132  }
2133  // parse database name from directory name
2134  name = dirName.substr(dirName.find('_') + 1, dirName.length() - 1);
2135 
2136 
2137  // initialize the database instance based on existing data stored in this
2138  // directory.
2139  if (curFileType == FileType::SequenceFileType) {
2140  this->addDatabaseBySequenceFiles(name, dbId, path);
2141  } else {
2142  this->addDatabaseByPartitionedFiles(name, dbId, path);
2143  }
2144  this->databaseSeqId.initialize(maxDbId);
2145  } else {
2146  // Meet a problem when trying to recover database instance from existing data.
2147  // Because database directory doesn't exist.
2148  return false;
2149  }
2150  }
2151  } else {
2152  // we can't recover database instances from existing data, because root directory
2153  // doesn't exist.
2154  return false;
2155  }
2156  } else {
2157  // we can't recover database instances from existing data, because root directory doesn't
2158  // exist.
2159  return false;
2160  }
2161 
2162  return true;
2163 }
2164 
2165 
2166 // add database based on sequence file
2167 void PangeaStorageServer::addDatabaseBySequenceFiles(string dbName, DatabaseID dbId, path dbPath) {
2168  if (this->dbs->find(dbId) != this->dbs->end()) {
2169  this->logger->writeLn("PDBStorage: database exists.");
2170  return;
2171  }
2172  // create a database instance
2173  vector<string>* dataDBPaths = new vector<string>();
2174  dataDBPaths->push_back(std::string(dbPath.c_str()));
2175  DefaultDatabasePtr db = make_shared<DefaultDatabase>(this->nodeId,
2176  dbId,
2177  dbName,
2178  this->conf,
2179  this->logger,
2180  this->shm,
2181  "",
2182  dataDBPaths,
2183  this->cache,
2184  this->flushBuffer);
2185  if (db == nullptr) {
2186  this->logger->writeLn("PDBStorage: Out of Memory.");
2187  std::cout << "FATAL ERROR: PDBStorage Out of Memory" << std::endl;
2188  exit(1);
2189  }
2190  // initialize it
2191  db->initializeFromDBDir(dbPath);
2192  // add it to map
2193  pthread_mutex_lock(&this->databaseLock);
2194  this->dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
2195  this->name2id->insert(pair<string, DatabaseID>(dbName, dbId));
2196  pthread_mutex_unlock(&this->databaseLock);
2197 }
2198 
2203  DatabaseID dbId,
2204  path metaDBPath) {
2205  if (this->dbs->find(dbId) != this->dbs->end()) {
2206  this->logger->writeLn("PDBStorage: database exists.");
2207  return;
2208  }
2209  // create a database instance
2210  vector<string>* dataDBPaths = new vector<string>();
2211  string dataDBPath;
2212  unsigned int i;
2213  for (i = 0; i < dataRootPaths.size(); i++) {
2214  dataDBPath = this->encodeDBPath(this->dataRootPaths.at(i), dbId, dbName);
2215  dataDBPaths->push_back(dataDBPath);
2216  }
2217  DefaultDatabasePtr db = make_shared<DefaultDatabase>(this->nodeId,
2218  dbId,
2219  dbName,
2220  this->conf,
2221  this->logger,
2222  this->shm,
2223  string(metaDBPath.c_str()),
2224  dataDBPaths,
2225  this->cache,
2226  this->flushBuffer);
2227  if (db == nullptr) {
2228  this->logger->writeLn("PDBStorage: Out of Memory.");
2229  std::cout << "FATAL ERROR: PDBStorage Out of Memory" << std::endl;
2230  exit(-1);
2231  }
2232  // initialize it
2233  db->initializeFromMetaDBDir(metaDBPath);
2234  // add it to map
2235  pthread_mutex_lock(&this->databaseLock);
2236  this->dbs->insert(pair<DatabaseID, DefaultDatabasePtr>(dbId, db));
2237  this->name2id->insert(pair<string, DatabaseID>(dbName, dbId));
2238  pthread_mutex_unlock(&this->databaseLock);
2239 
2240  std::map<UserTypeID, TypePtr>* types = db->getTypes();
2241  std::map<UserTypeID, TypePtr>::iterator typeIter;
2242 
2243  // to update the sequence generator
2244  SetID maxSetId = 0;
2245  for (typeIter = types->begin(); typeIter != types->end(); typeIter++) {
2246  UserTypeID typeId = typeIter->first;
2247  TypePtr type = typeIter->second;
2248  std::string typeName = type->getName();
2249  pthread_mutex_lock(&this->typeLock);
2250  this->typename2id->insert(std::pair<std::string, UserTypeID>(typeName, typeId));
2251  pthread_mutex_unlock(&this->typeLock);
2252  std::map<SetID, SetPtr>* sets = type->getSets();
2253  std::map<SetID, SetPtr>::iterator setIter;
2254  for (setIter = sets->begin(); setIter != sets->end(); setIter++) {
2255  SetID setId = setIter->first;
2256  if (maxSetId <= setId) {
2257  maxSetId = setId + 1;
2258  }
2259  SetPtr set = setIter->second;
2260  PDB_COUT << "Loaded existing set with database: " << dbName << ", type: " << typeName
2261  << ", set: " << set->getSetName() << std::endl;
2262  pthread_mutex_lock(&this->usersetLock);
2263  this->userSets->insert(std::pair<std::pair<DatabaseID, SetID>, SetPtr>(
2264  std::pair<DatabaseID, SetID>(dbId, setId), set));
2265  this->names2ids->insert(
2266  std::pair<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>(
2267  std::pair<std::string, std::string>(dbName, set->getSetName()),
2268  std::pair<DatabaseID, SetID>(dbId, setId)));
2269  pthread_mutex_unlock(&this->usersetLock);
2270  }
2271  }
2272  SequenceID* seqId = new SequenceID();
2273  seqId->initialize(maxSetId);
2274  this->usersetSeqIds->insert(std::pair<std::string, SequenceID*>(dbName, seqId));
2275 }
2276 
2278  return this->logger;
2279 }
2280 
2282  return this->conf;
2283 }
2284 
2286  return this->shm;
2287 }
2288 
2290  return this->cache;
2291 }
2292 
2293 // return whether the PangeaStorageServer instance is running standalone or in cluster mode.
2295  return this->standalone;
2296 }
2297 }
2298 
2299 
2300 #endif
std::map< std::string, UserTypeID > * typename2id
std::map< SetID, TempSetPtr > * tempSets
DefaultDatabasePtr getDatabase(DatabaseID dbId)
shared_ptr< TempSet > TempSetPtr
Definition: TempSet.h:30
SetID setId
Definition: DataTypes.h:87
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
std::map< pair< std::string, std::string >, std::vector< Record< Vector< Handle< Object > > > * > > allRecords
unsigned int pageSeqInPartition
Definition: DataTypes.h:99
bool addSet(std::string dbName, std::string typeName, std::string setName, SetID setId, size_t pageSize=DEFAULT_PAGE_SIZE)
shared_ptr< UserType > TypePtr
Definition: UserType.h:41
Handle< ObjType > getRootObject()
Definition: Record.cc:46
FileType
Definition: DataTypes.h:72
bool removeType(std::string typeName)
std::vector< std::string > dataTempPaths
shared_ptr< PageCache > PageCachePtr
Definition: PageCache.h:39
bool exportToFile(std::string dbName, std::string setName, std::string path, std::string format, std::string &errMsg)
void addDatabaseBySequenceFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbPath)
bool removeTypeFromDatabase(std::string dbName, std::string typeName)
shared_ptr< DefaultDatabase > DefaultDatabasePtr
DatabaseID dbId
Definition: DataTypes.h:85
TempSetPtr getTempSet(SetID setId)
shared_ptr< PageIteratorInterface > PageIteratorPtr
Definition: PageIterator.h:33
std::map< std::string, SequenceID * > * usersetSeqIds
shared_ptr< PDBScanWork > PDBScanWorkPtr
Definition: PDBScanWork.h:28
unsigned int NodeID
Definition: DataTypes.h:27
void writeBackRecords(pair< std::string, std::string > databaseAndSet, bool flushOrNot=true, bool directPutOrNot=false)
void clearDB(DatabaseID dbId, string dbName)
shared_ptr< PDBFlushConsumerWork > PDBFlushConsumerWorkPtr
#define FLUSH_BUFFER_SIZE
PDBPagePtr getNewPage(pair< std::string, std::string > databaseAndSet)
std::map< DatabaseID, DefaultDatabasePtr > * dbs
bool initializeFromRootDirs(string metaRootPath, vector< string > dataRootPath)
bool removeDatabase(std::string dbName)
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
std::map< std::string, DatabaseID > * name2id
Definition: DataTypes.h:52
std::map< std::pair< DatabaseID, SetID >, SetPtr > * userSets
bool addDatabase(std::string dbName, DatabaseID dbId)
unsigned int DatabaseID
Definition: DataTypes.h:29
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
unsigned int PageID
Definition: DataTypes.h:26
FilePartitionID partitionId
Definition: DataTypes.h:98
string encodeDBPath(string rootPath, DatabaseID dbId, string dbName)
void initialize(unsigned int currentID)
Definition: SequenceID.h:39
PageCircularBufferPtr flushBuffer
PageID pageId
Definition: DataTypes.h:88
std::map< std::string, SetID > * name2tempSetId
std::vector< std::string > dataRootPaths
size_t bufferRecord(pair< std::string, std::string > databaseAndSet, Record< Vector< Handle< Object >>> *addMe)
size_t numBytes()
Definition: Record.cc:36
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
#define PDB_COUT
Definition: PDBDebug.h:31
bool addTempSet(std::string setName, SetID &setId, size_t pageSize=DEFAULT_PAGE_SIZE)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
void addDatabaseByPartitionedFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbMetaPath)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
SetPtr getSet(std::pair< std::string, std::string > databaseAndSet)
bool exportToHDFSFile(std::string dbName, std::string setName, std::string hdfsNameNodeIp, int hdfsNameNodePort, std::string path, std::string format, std::string &errMsg)
static int16_t getIDByName(std::string objectName, bool withLock=true)
Definition: VTableMap.cc:90
std::map< std::pair< std::string, std::string >, std::pair< DatabaseID, SetID > > * names2ids
unsigned int getNextSequenceID()
Definition: SequenceID.h:43
PDBAlarm
Definition: PDBAlarm.h:28
void registerHandlers(PDBServer &forMe) override
bool removeSet(std::string dbName, std::string typeName, std::string setName)
shared_ptr< PartitionedFileMetaData > PartitionedFileMetaDataPtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
PageCircularBufferPtr getFlushBuffer()
shared_ptr< UserSet > SetPtr
Definition: UserSet.h:36
std::map< pair< std::string, std::string >, size_t > sizes
bool addType(std::string typeName, UserTypeID typeId)
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
Definition: DataTypes.h:57
UserTypeID typeId
Definition: DataTypes.h:86
PangeaStorageServer(SharedMemPtr shm, PDBWorkerQueuePtr workers, PDBLoggerPtr logger, ConfigurationPtr conf, bool standalone=true)
std::vector< PDBWorkPtr > flushers
unsigned int UserTypeID
Definition: DataTypes.h:25