A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
UserType.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #include "UserType.h"
19 #include "PartitionedFile.h"
20 
25  DatabaseID dbId,
26  UserTypeID id,
27  string name,
28  ConfigurationPtr conf,
29  pdb::PDBLoggerPtr logger,
30  SharedMemPtr shm,
31  string metaTypePath,
32  vector<string>* dataTypePaths,
33  PageCachePtr cache,
34  PageCircularBufferPtr flushBuffer) {
35  this->name = name;
36  this->nodeId = nodeId;
37  this->dbId = dbId;
38  this->id = id;
39  this->conf = conf;
40  this->logger = logger;
41  this->shm = shm;
42  sets = new map<SetID, SetPtr>();
43  pthread_mutex_init(&setLock, nullptr);
44  this->dataPaths = dataTypePaths;
45  unsigned int i;
46 
47  this->metaPath = metaTypePath;
48  if (this->metaPath.compare("") != 0) {
49  this->conf->createDir(this->metaPath);
50  }
51  for (i = 0; i < this->dataPaths->size(); i++) {
52  string dataPath = this->dataPaths->at(i);
53  this->conf->createDir(dataPath);
54  }
55  numSets = 0;
56  this->cache = cache;
57  this->flushBuffer = flushBuffer;
58 }
59 
65  if (sets != nullptr) {
66  delete sets;
67  }
68  if (dataPaths != nullptr) {
69  delete dataPaths;
70  }
71 }
72 
76 // All flushing is now managed by the PageCache class, so the flush logic has been removed from here.
77 
78 
80 }
84 string UserType::encodePath(string typePath, SetID setId, string setName) {
85  char buffer[500];
86  sprintf(buffer, "%s/%d_%s", typePath.c_str(), setId, setName.c_str());
87  return string(buffer);
88 }
89 
90 // add new set
91 // Not thread-safe
92 int UserType::addSet(string setName, SetID setId, size_t pageSize) {
93  if (this->sets->find(setId) != this->sets->end()) {
94  this->logger->writeLn("UserType: set exists.");
95  return -1;
96  }
97  string typePath;
98  PartitionedFilePtr file;
99  string metaFilePath = this->encodePath(this->metaPath, setId, setName);
100  vector<string> dataFilePaths;
101  unsigned int i;
102  for (i = 0; i < this->dataPaths->size(); i++) {
103  dataFilePaths.push_back(this->encodePath(this->dataPaths->at(i), setId, setName));
104  }
105  file = make_shared<PartitionedFile>(this->nodeId,
106  this->dbId,
107  this->id,
108  setId,
109  metaFilePath,
110  dataFilePaths,
111  this->logger,
112  pageSize);
113 
114  SetPtr set = make_shared<UserSet>(
115  pageSize, logger, shm, nodeId, dbId, id, setId, setName, file, this->cache);
116  if (set == 0) {
117  this->logger->writeLn("UserType: Out of Memory.");
118  return -1;
119  }
120  this->logger->writeLn("UserType: set added.");
121  pthread_mutex_lock(&setLock);
122  this->sets->insert(pair<SetID, SetPtr>(setId, set));
123  pthread_mutex_unlock(&setLock);
124  this->numSets++;
125  return 0;
126 }
127 
128 // Remove an existing set.
129 // If successful, return 0.
130 // Otherwise, e.g. the set doesn't exist, return -1.
132  map<SetID, SetPtr>::iterator setIter;
133  if ((setIter = this->sets->find(setId)) != this->sets->end()) {
134  this->logger->writeLn("UserType: removing input buffer for set:");
135  this->logger->writeInt(setId);
136  this->logger->writeLn("\n");
137  setIter->second->getFile()->clear();
138  pthread_mutex_lock(&setLock);
139  this->sets->erase(setIter);
140  pthread_mutex_unlock(&setLock);
141  this->logger->writeLn("UserType: set is removed.");
142  this->numSets--;
143  return 0;
144  }
145  this->logger->writeLn("UserType: set doesn't exist.");
146  return -1;
147 }
148 
149 // Return the specified set belonging to this type instance, or nullptr if no such set exists.
151  map<SetID, SetPtr>::iterator setIter;
152  if ((setIter = sets->find(setId)) != sets->end()) {
153  return setIter->second;
154  } else {
155  return nullptr;
156  }
157 }
158 
159 using namespace boost::filesystem;
160 
161 // Initialize type instance based on disk dirs and files.
162 // This function is only used for PartitionedFile instances.
163 bool UserType::initializeFromMetaTypeDir(path metaTypeDir) {
164  // traverse all set files in type directory
165  // for each set file, invoke addSet to initialize PDBFile object and CircularInputBuffer
166  if (exists(metaTypeDir)) {
167  if (is_directory(metaTypeDir)) {
168  vector<path> metaSetFiles;
169  copy(
170  directory_iterator(metaTypeDir), directory_iterator(), back_inserter(metaSetFiles));
171  vector<path>::iterator iter;
172  std::string path;
173  std::string dirName;
174  std::string name;
175  SetID setId;
176  for (iter = metaSetFiles.begin(); iter != metaSetFiles.end(); iter++) {
177  if (is_regular_file(*iter)) {
178  // find a set
179  path = std::string(iter->c_str());
180  dirName = path.substr(path.find_last_of('/') + 1, path.length() - 1);
181  // parse set name
182  name = dirName.substr(dirName.find('_') + 1, dirName.length() - 1);
183  // parse set id
184  setId = stoul(dirName.substr(0, dirName.find('_')));
185 
186  // check whether set exists
187  if (this->sets->find(setId) != this->sets->end()) {
188  this->logger->writeLn("UserType: set exists.");
189  return false;
190  }
191 
192  // create PartitionedFile instance
193  PartitionedFilePtr partitionedFile = make_shared<PartitionedFile>(
194  this->nodeId, this->dbId, this->id, setId, path, this->logger);
195 
196  partitionedFile->buildMetaDataFromMetaPartition(nullptr);
197  partitionedFile->initializeDataFiles();
198  partitionedFile->openData();
199  // create a Set instance from file
200  SetPtr set = make_shared<UserSet>(partitionedFile->getPageSize(),
201  logger,
202  this->shm,
203  nodeId,
204  dbId,
205  id,
206  setId,
207  name,
208  partitionedFile,
209  this->cache);
210  // add buffer to map
211  if (set == 0) {
212  this->logger->error("Fatal Error: UserType: out of memory.");
213  exit(1);
214  }
215  this->sets->insert(pair<SetID, SetPtr>(setId, set));
216  this->numSets++;
217  this->logger->writeLn("UserType: set added.");
218  }
219  }
220  } else {
221  return false;
222  }
223  } else {
224  return false;
225  }
226  return true;
227 }
228 
229 // Initialize type instance based on disk dirs and files.
230 // This function is used for importing sets from SequenceFile instances.
231 // This function is mainly to provide backward compatibility for SequenceFile instances.
233 
234  return false;
235 }
void setId(UserTypeID id)
Definition: UserType.h:111
int addSet(string setName, SetID setId, size_t pageSize=DEFAULT_PAGE_SIZE)
Definition: UserType.cc:92
unsigned int SetID
Definition: DataTypes.h:31
UserType(NodeID nodeId, DatabaseID dbId, UserTypeID id, string name, ConfigurationPtr conf, pdb::PDBLoggerPtr logger, SharedMemPtr shm, string metaTypePath, vector< string > *dataTypePaths, PageCachePtr cache, PageCircularBufferPtr flushBuffer)
Definition: UserType.cc:24
NodeID nodeId
Definition: UserType.h:139
string name
Definition: UserType.h:137
ConfigurationPtr conf
Definition: UserType.h:143
shared_ptr< PageCache > PageCachePtr
Definition: PageCache.h:39
PageCachePtr cache
Definition: UserType.h:148
~UserType()
Definition: UserType.cc:64
unsigned int NodeID
Definition: DataTypes.h:27
map< SetID, SetPtr > * sets
Definition: UserType.h:142
string metaPath
Definition: UserType.h:146
SetPtr getSet(SetID setId)
Definition: UserType.cc:150
vector< string > * dataPaths
Definition: UserType.h:147
bool initializeFromTypeDir(path typeDir)
Definition: UserType.cc:232
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
bool initializeFromMetaTypeDir(path metaTypeDir)
Definition: UserType.cc:163
int numSets
Definition: UserType.h:141
unsigned int DatabaseID
Definition: DataTypes.h:29
void setName(string name)
Definition: UserType.h:116
UserTypeID id
Definition: UserType.h:138
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
SharedMemPtr shm
Definition: UserType.h:145
DatabaseID dbId
Definition: UserType.h:140
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
void flush()
Definition: UserType.cc:79
int removeSet(SetID setId)
Definition: UserType.cc:131
string encodePath(string typePath, SetID setId, string setName)
Definition: UserType.cc:84
shared_ptr< UserSet > SetPtr
Definition: UserSet.h:36
PageCircularBufferPtr flushBuffer
Definition: UserType.h:149
pthread_mutex_t setLock
Definition: UserType.h:150
pdb::PDBLoggerPtr logger
Definition: UserType.h:144
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
unsigned int UserTypeID
Definition: DataTypes.h:25