A platform for high-performance distributed tool and library development, written in C++. It can be deployed in either of two cluster modes: standalone or distributed. API documentation for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PangeaStorageServer.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PANGEA_STORAGE_SERVER_H
20 #define PANGEA_STORAGE_SERVER_H
21 
22 
23 
24 #include "ServerFunctionality.h"
25 #include "PDBServer.h"
26 #include "Record.h"
27 #include "PDBVector.h"
28 #include "Configuration.h"
29 #include "DataTypes.h"
30 #include "PDBLogger.h"
31 #include "DefaultDatabase.h"
32 #include "SharedMem.h"
33 #include "TempSet.h"
34 #include "PDBWork.h"
35 #include <vector>
36 #include <string>
37 #include <map>
38 #include <boost/filesystem.hpp>
39 #include <pthread.h>
40 #include <memory>
41 
42 
43 namespace pdb {
44 
45 
47 typedef std::shared_ptr<PangeaStorageServer> PangeaStorageServerPtr;
48 
49 //This class encapsulates the PangeaStorageServer functionality.
50 //It should be installed on each Worker node, and managed by DistributedStorageManager and Dispatcher.
51 //It includes a PageCache as a buffer pool, and a group of PartitionedFile instances that serve as a user-level file system.
52 //The organization of user data has two levels: Database and UserSet, and one database can have multiple user sets.
53 
54 //As a local server, it handles user requests for the following storage services:
55 //-- StorageCleanup: to force a flush of all pages
56 //-- StorageAddDatabase: to add a database
57 //-- StorageAddSet: to add a user set
58 //-- StorageAddTempSet: to add a temp set, which is transient and does not need to be registered with the catalog
59 //-- StorageRemoveDatabase: to remove a database
60 //-- StorageRemoveUserSet: to remove a user set
61 //-- StorageClearSet: to remove data from a set
62 //-- StorageRemoveTempSet: to remove a temp set
63 //-- StorageAddObjectInLoop: to add large objects in a loop, one large object at a time
64 //-- StorageAddData: to add a Vector<Object> to a set
65 //-- StorageGetData: to return data from a set
66 //-- StorageCollectStats: to collect stats for this local storage
67 //-- StoragePinPage: to pin a page in one set
68 //-- StoragePinBytes: to pin bytes of specified length in one set
69 //-- StorageUnpinPage: to unpin a page from one set
70 //-- StorageGetSetPages: to trigger a parallel scan over a set
71 
72 
73 
74 
75 
77 
78 public:
79  // creates a storage server, putting all data in the indicated directory.
80  // pages can be dynamically allocated from the specified shared memory pool.
85  bool standalone = true);
86 
87 
88  /*****************To Comply with Chris' Interfaces***************************/
89 
90 
91  // takes all of the currently buffered records for the given database/set pair,
92  // creates at most one page of data with them, and then writes that page to storage
93  void writeBackRecords(pair<std::string, std::string> databaseAndSet,
94  bool flushOrNot = true,
95  bool directPutOrNot = false);
96 
97  // this allocates a new page at the end of the indicated database/set combo
98  PDBPagePtr getNewPage(pair<std::string, std::string> databaseAndSet);
99 
100  // returns a set object referencing the given database/set pair
101  SetPtr getSet(std::pair<std::string, std::string> databaseAndSet);
102 
103 
104  // from the ServerFunctionality interface... registers the StorageServer's handlers
105  void registerHandlers(PDBServer& forMe) override;
106 
107  // stores a record---we'll keep buffering records until we get enough of them that
108  // we can put them together into a page. The return value is the total size of
109  // all of the records that we are buffering for this database and set
110  size_t bufferRecord(pair<std::string, std::string> databaseAndSet,
111  Record<Vector<Handle<Object>>>* addMe);
112 
113  // destructor
115 
116 
117  /****************Pangea Interfaces******************************************/
118 
122  string getServerName();
123 
127  NodeID getNodeId();
128 
132  string getPathToBackEndServer();
133 
138 
143 
148 
153 
158 
162  bool addDatabase(std::string dbName, DatabaseID dbId);
163 
167  bool addDatabase(std::string dbName);
168 
169 
173  bool removeDatabase(std::string dbName);
174 
175 
179  bool addType(std::string typeName, UserTypeID typeId);
180 
184  bool removeTypeFromDatabase(std::string dbName, std::string typeName);
185 
189  bool removeType(std::string typeName);
190 
194  bool addSet(std::string dbName,
195  std::string typeName,
196  std::string setName,
197  SetID setId,
198  size_t pageSize = DEFAULT_PAGE_SIZE);
199 
200 
204  bool addSet(std::string dbName,
205  std::string typeName,
206  std::string setName,
207  size_t pageSize = DEFAULT_PAGE_SIZE);
208 
212  bool addSet(std::string dbName, std::string setName, size_t pageSize = DEFAULT_PAGE_SIZE);
213 
214 
218  bool removeSet(std::string dbName, std::string typeName, std::string setName);
219 
223  bool removeSet(std::string dbName, std::string setName);
224 
225 
231 
232 
236  bool addTempSet(std::string setName, SetID& setId, size_t pageSize = DEFAULT_PAGE_SIZE);
237 
238 
242  bool removeTempSet(SetID setId);
243 
244 
248  TempSetPtr getTempSet(SetID setId);
249 
250 
254  SetPtr getSet(DatabaseID dbId, UserTypeID typeId, SetID setId);
255 
256 
262 
263 
268 
269 
274 
278  bool isStandalone();
279 
280 
281  // cleanup routine, to be invoked from the destructor
282 
283  void cleanup(bool flushOrNot = true);
284 
285  // export to a local file
286  bool exportToFile(std::string dbName,
287  std::string setName,
288  std::string path,
289  std::string format,
290  std::string& errMsg);
291 
292  // export to an HDFS partition
293  bool exportToHDFSFile(std::string dbName,
294  std::string setName,
295  std::string hdfsNameNodeIp,
296  int hdfsNameNodePort,
297  std::string path,
298  std::string format,
299  std::string& errMsg);
300 
301 protected:
305  string encodeDBPath(string rootPath, DatabaseID dbId, string dbName);
306 
307 
311  void createTempDirs();
312 
313 
317  void createRootDirs();
318 
319 
323  bool initializeFromRootDirs(string metaRootPath, vector<string> dataRootPath);
324 
325 
329  void clearDB(DatabaseID dbId, string dbName);
330 
331 
335  void addDatabaseBySequenceFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbPath);
336 
337 
341  void addDatabaseByPartitionedFiles(string dbName,
342  DatabaseID dbId,
343  boost::filesystem::path dbMetaPath);
344 
345 
346 private:
347  // Mapping DatabaseID and SetID to a Set instance
348  std::map<std::pair<DatabaseID, SetID>, SetPtr>* userSets;
349 
350  // Mapping Database name and Set name to DatabaseID and SetID
351  std::map<std::pair<std::string, std::string>, std::pair<DatabaseID, SetID>>* names2ids;
352 
353  // Mapping a user type name to UserTypeID
354  std::map<std::string, UserTypeID>* typename2id;
355 
356  // Mapping SetID to a TempSet instance
357  std::map<SetID, TempSetPtr>* tempSets;
358 
359  // Mapping TempSet name to ID
360  std::map<std::string, SetID>* name2tempSetId;
361 
362  // Log instance
364 
365  // Configuration object
367 
368  // the name of the server
369  std::string serverName;
370 
371  // the id of the server
373 
374  // the instance to shared memory manager
376 
377  // the path for storing TempSet metadata
378  std::string metaTempPath;
379 
380  // the path for storing TempSet data
381  std::vector<std::string> dataTempPaths;
382 
383  // the path for storing UserSet metadata
384  std::string metaRootPath;
385 
386  // the path for storing UserSet data
387  std::vector<std::string> dataRootPaths;
388 
389  // Pointer to page cache
391 
392  // Path to backend server
394 
395  // mutex for managing database
396  pthread_mutex_t databaseLock;
397 
398  // mutex for managing type
399  pthread_mutex_t typeLock;
400 
401  // mutex for managing set
402  pthread_mutex_t usersetLock;
403 
404  // mutex for managing tempset
405  pthread_mutex_t tempsetLock;
406 
407  // SequenceID for adding temp set
409 
410  // SequenceID for adding database
412 
413  // SequenceID for adding user set
414  std::map<std::string, SequenceID*>* usersetSeqIds;
415 
416  // Thread Pool for starting flushing threads
418 
419  // the flush buffer connecting producers and consumers
421 
422  // The vector of flush threads
423  std::vector<PDBWorkPtr> flushers;
424 
425  /****** for distribution *******************************/
426 
427 private:
428  bool standalone = true;
429 
430 
431  /******* to comply with Chris' interfaces ***************/
432 
433 private:
434  // this stores the set of all records that we are buffering
435  std::map<pair<std::string, std::string>, std::vector<Record<Vector<Handle<Object>>>*>>
437 
438  // this stores the total sizes of all lists of records that we are buffering
439  std::map<pair<std::string, std::string>, size_t> sizes;
440 
441  /******* backward compatibility ************************/
442 
443 private:
444  // this stores the set of databases
445  std::map<DatabaseID, DefaultDatabasePtr>* dbs;
446 
447  // this stores the mapping from Database name to DatabaseID
448  std::map<std::string, DatabaseID>* name2id;
449 
451 
452  pthread_mutex_t workingMutex;
453  pthread_mutex_t counterMutex;
455 };
456 }
457 
458 #endif
std::map< std::string, UserTypeID > * typename2id
std::map< SetID, TempSetPtr > * tempSets
DefaultDatabasePtr getDatabase(DatabaseID dbId)
shared_ptr< TempSet > TempSetPtr
Definition: TempSet.h:30
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
std::map< pair< std::string, std::string >, std::vector< Record< Vector< Handle< Object > > > * > > allRecords
bool addSet(std::string dbName, std::string typeName, std::string setName, SetID setId, size_t pageSize=DEFAULT_PAGE_SIZE)
bool removeType(std::string typeName)
std::vector< std::string > dataTempPaths
shared_ptr< PageCache > PageCachePtr
Definition: PageCache.h:39
bool exportToFile(std::string dbName, std::string setName, std::string path, std::string format, std::string &errMsg)
std::shared_ptr< PangeaStorageServer > PangeaStorageServerPtr
void addDatabaseBySequenceFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbPath)
bool removeTypeFromDatabase(std::string dbName, std::string typeName)
shared_ptr< DefaultDatabase > DefaultDatabasePtr
TempSetPtr getTempSet(SetID setId)
std::map< std::string, SequenceID * > * usersetSeqIds
unsigned int NodeID
Definition: DataTypes.h:27
void writeBackRecords(pair< std::string, std::string > databaseAndSet, bool flushOrNot=true, bool directPutOrNot=false)
void clearDB(DatabaseID dbId, string dbName)
PDBPagePtr getNewPage(pair< std::string, std::string > databaseAndSet)
std::map< DatabaseID, DefaultDatabasePtr > * dbs
bool initializeFromRootDirs(string metaRootPath, vector< string > dataRootPath)
bool removeDatabase(std::string dbName)
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
std::map< std::string, DatabaseID > * name2id
std::map< std::pair< DatabaseID, SetID >, SetPtr > * userSets
bool addDatabase(std::string dbName, DatabaseID dbId)
unsigned int DatabaseID
Definition: DataTypes.h:29
string encodeDBPath(string rootPath, DatabaseID dbId, string dbName)
PageCircularBufferPtr flushBuffer
std::map< std::string, SetID > * name2tempSetId
std::vector< std::string > dataRootPaths
size_t bufferRecord(pair< std::string, std::string > databaseAndSet, Record< Vector< Handle< Object >>> *addMe)
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
bool addTempSet(std::string setName, SetID &setId, size_t pageSize=DEFAULT_PAGE_SIZE)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
void addDatabaseByPartitionedFiles(string dbName, DatabaseID dbId, boost::filesystem::path dbMetaPath)
SetPtr getSet(std::pair< std::string, std::string > databaseAndSet)
bool exportToHDFSFile(std::string dbName, std::string setName, std::string hdfsNameNodeIp, int hdfsNameNodePort, std::string path, std::string format, std::string &errMsg)
std::map< std::pair< std::string, std::string >, std::pair< DatabaseID, SetID > > * names2ids
void registerHandlers(PDBServer &forMe) override
bool removeSet(std::string dbName, std::string typeName, std::string setName)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
PageCircularBufferPtr getFlushBuffer()
shared_ptr< UserSet > SetPtr
Definition: UserSet.h:36
#define DEFAULT_PAGE_SIZE
Definition: Configuration.h:36
std::map< pair< std::string, std::string >, size_t > sizes
bool addType(std::string typeName, UserTypeID typeId)
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
PangeaStorageServer(SharedMemPtr shm, PDBWorkerQueuePtr workers, PDBLoggerPtr logger, ConfigurationPtr conf, bool standalone=true)
std::vector< PDBWorkPtr > flushers
unsigned int UserTypeID
Definition: DataTypes.h:25