A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
UserSet.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef USER_SET_CC
20 #define USER_SET_CC
21 
22 #include "PDBDebug.h"
23 #include "UserSet.h"
24 #include "PartitionPageIterator.h"
25 #include "SetCachePageIterator.h"
26 #include <string>
27 #include <string.h>
32  SharedMemPtr shm,
33  NodeID nodeId,
34  DatabaseID dbId,
35  UserTypeID typeId,
36  SetID setId,
37  string setName,
38  PageCachePtr pageCache,
39  LocalityType localityType,
41  OperationType operation,
42  DurabilityType durability,
43  PersistenceType persistence,
44  size_t pageSize)
45  : LocalitySet(localityType, policy, operation, durability, persistence) {
46  this->pageSize = pageSize;
47  this->logger = logger;
48  this->shm = shm;
49  this->nodeId = nodeId;
50  this->dbId = dbId;
51  this->typeId = typeId;
52  this->setId = setId;
53  this->setName = setName;
54  this->pageCache = pageCache;
55  this->inputBufferPage = nullptr;
56  this->lastFlushedPageId = (unsigned int)(-1);
57  this->dirtyPagesInPageCache = new unordered_map<PageID, FileSearchKey>();
58  pthread_mutex_init(&this->dirtyPageSetMutex, nullptr);
59  pthread_mutex_init(&this->addBytesMutex, nullptr);
60  this->isPinned = false;
61  this->numPages = 0;
62 }
63 
64 
// Constructs a UserSet that is backed by an existing PartitionedFile.
// Stores the identifying ids, logger, shared memory and page cache. The
// last-flushed page id and the page-id sequence are recovered from `file`.
// Opens the file (openAll), creates the dirty-page map and both mutexes, and
// starts numPages at the number of pages already flushed to the file.
// NOTE(review): `policy`, forwarded to LocalitySet below, is declared on a
// parameter line that is missing from this listing (between `localityType`
// and `operation`) -- confirm its declaration against UserSet.h.
68 UserSet::UserSet(size_t pageSize,
69  pdb::PDBLoggerPtr logger,
70  SharedMemPtr shm,
71  NodeID nodeId,
72  DatabaseID dbId,
73  UserTypeID typeId,
74  SetID setId,
75  string setName,
76  PartitionedFilePtr file,
77  PageCachePtr pageCache,
78  LocalityType localityType,
80  OperationType operation,
81  DurabilityType durability,
82  PersistenceType persistence)
83  : LocalitySet(localityType, policy, operation, durability, persistence) {
84  this->pageSize = pageSize;
85  this->logger = logger;
86  this->shm = shm;
87  this->nodeId = nodeId;
88  this->dbId = dbId;
89  this->typeId = typeId;
90  this->setId = setId;
91  this->setName = setName;
92  this->file = file;
// With no flushed pages, lastFlushedPageId is set to (unsigned int)(-1)
// (presumably a "nothing flushed yet" marker) and seqId.initialize is not
// called. Otherwise the page-id sequence resumes just past the last flushed page.
93  if (this->file->getNumFlushedPages() == 0) {
94  this->lastFlushedPageId = (unsigned int)(-1);
95  } else {
96  this->lastFlushedPageId = file->getLastFlushedPageID();
97  this->seqId.initialize(this->lastFlushedPageId + 1);
98  }
99  this->pageCache = pageCache;
100  this->file->openAll();
101  this->inputBufferPage = nullptr;
// Heap-allocated map of dirty pages; the destructor deletes it.
102  this->dirtyPagesInPageCache = new unordered_map<PageID, FileSearchKey>();
103  pthread_mutex_init(&this->dirtyPageSetMutex, nullptr);
104  pthread_mutex_init(&this->addBytesMutex, nullptr);
105  this->isPinned = false;
106  this->numPages = this->file->getNumFlushedPages();
// NOTE(review): writes to stdout unconditionally; consider PDB_COUT or `logger`.
107  cout << "Number of existing pages = " << this->numPages << endl;
108 }
109 
110 
115  delete this->dirtyPagesInPageCache;
116  pthread_mutex_destroy(&this->dirtyPageSetMutex);
117  pthread_mutex_destroy(&this->addBytesMutex);
118 }
119 
120 
127  unsigned int pageSeqInPartition,
128  PageID pageId) {
129 
135  return this->pageCache->getPage(
136  this->file, partitionId, pageSeqInPartition, pageId, false, this);
137 }
138 
140  PageID pageId = seqId.getNextSequenceID();
141  CacheKey key;
142  key.dbId = this->dbId;
143  key.typeId = this->typeId;
144  key.setId = this->setId;
145  key.pageId = pageId;
146  PDBPagePtr page = this->pageCache->getNewPage(this->nodeId, key, this, this->pageSize);
147  if (page == nullptr) {
148  return nullptr;
149  }
150  page->preparePage();
151  this->addPageToDirtyPageSet(page->getPageID());
152  numPages++;
153  return page;
154 }
155 
156 PDBPagePtr UserSet::addPageByRawBytes(size_t sharedMemOffset) {
157  return nullptr;
158 }
159 
164  return numPages;
165 }
166 
173 vector<PageIteratorPtr>* UserSet::getIterators() {
174 
175  this->cleanDirtyPageSet();
176  this->lockDirtyPageSet();
177  vector<PageIteratorPtr>* retVec = new vector<PageIteratorPtr>();
178  PageIteratorPtr iterator = nullptr;
179  if (dirtyPagesInPageCache->size() > 0) {
180  PDB_COUT << "dirtyPages size=" << dirtyPagesInPageCache->size() << std::endl;
181  iterator = make_shared<SetCachePageIterator>(this->pageCache, this);
182  if (iterator != nullptr) {
183  retVec->push_back(iterator);
184  }
185  }
186  if (this->file->getFileType() == FileType::SequenceFileType) {
187  SequenceFilePtr seqFile = dynamic_pointer_cast<SequenceFile>(this->file);
188  iterator =
189  make_shared<PartitionPageIterator>(this->pageCache, file, (FilePartitionID)0, this);
190  retVec->push_back(iterator);
191  } else {
192  PartitionedFilePtr partitionedFile = dynamic_pointer_cast<PartitionedFile>(this->file);
193  int numPartitions = partitionedFile->getNumPartitions();
194  int i = 0;
195  for (i = 0; i < numPartitions; i++) {
196  if (partitionedFile->getMetaData()->getPartition(i)->getNumPages() > 0) {
197  PDB_COUT << "numpages in partition:" << i << " ="
198  << partitionedFile->getMetaData()->getPartition(i)->getNumPages()
199  << std::endl;
200  iterator = make_shared<PartitionPageIterator>(
201  this->pageCache, file, (FilePartitionID)i, this);
202  retVec->push_back(iterator);
203  }
204  }
205  }
206  this->unlockDirtyPageSet();
207  return retVec;
208 }
209 
210 // user MUST guarantee that the size of buffer is large enough for dumping all data in the set.
211 void UserSet::dump(char* buffer) {
212  setPinned(true);
213  vector<PageIteratorPtr>* iterators = this->getIterators();
214  int numIterators = iterators->size();
215  int i;
216  char* cur = buffer;
217  for (i = 0; i < numIterators; i++) {
218  PageIteratorPtr curIter = iterators->at(i);
219  while (curIter->hasNext()) {
220  PDBPagePtr curPage = curIter->next();
221  if (curPage != nullptr) {
222  memcpy(cur, curPage->getRawBytes(), curPage->getRawSize());
223  cur = cur + curPage->getRawSize();
224  curPage->decRefCount();
225  }
226  }
227  }
228  setPinned(false);
229  delete iterators;
230 }
231 
233  setPinned(true);
234  vector<PageIteratorPtr>* iterators = this->getIterators();
235  int numIterators = iterators->size();
236  int i;
237  for (i = 0; i < numIterators; i++) {
238  PageIteratorPtr curIter = iterators->at(i);
239  while (curIter->hasNext()) {
240  PDBPagePtr curPage = curIter->next();
241  if (curPage != nullptr) {
242  CacheKey key;
243  key.dbId = this->getDbID();
244  key.typeId = this->getTypeID();
245  key.setId = this->getSetID();
246  key.pageId = curPage->getPageID();
247  curPage->resetRefCount();
248  this->pageCache->evictPage(key, false);
249  }
250  }
251  }
252  setPinned(false);
253  delete iterators;
254 }
255 
256 // thread-safe
258  this->lockDirtyPageSet();
259  auto iter = this->getDirtyPageSet()->begin();
260  while (iter != this->getDirtyPageSet()->end()) {
261  if (iter->second.inCache == false) {
262  iter = this->getDirtyPageSet()->erase(iter);
263  } else {
264  iter++;
265  }
266  }
267  this->unlockDirtyPageSet();
268 }
269 
270 
272  auto iter = this->getDirtyPageSet()->begin();
273  while (iter != this->getDirtyPageSet()->end()) {
274  if (iter->second.inCache == true) {
275  CacheKey key;
276  key.dbId = this->getDbID();
277  key.typeId = this->getTypeID();
278  key.setId = this->getSetID();
279  key.pageId = iter->first;
280  this->pageCache->flushPageWithoutEviction(key);
281  } else {
282  iter++;
283  }
284  }
285 }
286 
287 
288 #endif
SetID setId
Definition: DataTypes.h:87
PDBPagePtr addPageByRawBytes(size_t sharedMemOffset)
Definition: UserSet.cc:156
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
virtual vector< PageIteratorPtr > * getIterators()
Definition: UserSet.cc:173
int numPages
Definition: UserSet.h:405
OperationType
Definition: DataTypes.h:57
UserSet(pdb::PDBLoggerPtr logger, SharedMemPtr shm, NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, string setName, PageCachePtr pageCache, LocalityType localityType=JobData, LocalitySetReplacementPolicy policy=MRU, OperationType operation=Read, DurabilityType durability=TryCache, PersistenceType persistence=Persistent, size_t pageSize=DEFAULT_PAGE_SIZE)
Definition: UserSet.cc:31
bool isPinned
Definition: UserSet.h:404
shared_ptr< PageCache > PageCachePtr
Definition: PageCache.h:39
DatabaseID dbId
Definition: UserSet.h:394
void evictPages()
Definition: UserSet.cc:232
DatabaseID dbId
Definition: DataTypes.h:85
SharedMemPtr shm
Definition: UserSet.h:391
void dump(char *buffer)
Definition: UserSet.cc:211
shared_ptr< PageIteratorInterface > PageIteratorPtr
Definition: PageIterator.h:33
unsigned int NodeID
Definition: DataTypes.h:27
int getNumPages()
Definition: UserSet.cc:163
PDBPagePtr getPage(FilePartitionID partitionId, unsigned int pageSeqInPartition, PageID pageId)
Definition: UserSet.cc:126
LocalityType
Definition: DataTypes.h:50
PageID lastFlushedPageId
Definition: UserSet.h:398
UserTypeID getTypeID()
Definition: UserSet.h:225
NodeID nodeId
Definition: UserSet.h:393
SetID getSetID()
Definition: UserSet.h:233
DurabilityType
Definition: DataTypes.h:59
unsigned int getNumPartitions()
LocalitySetReplacementPolicy
Definition: DataTypes.h:52
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
unsigned int DatabaseID
Definition: DataTypes.h:29
unsigned int PageID
Definition: DataTypes.h:26
PDBPagePtr inputBufferPage
Definition: UserSet.h:400
void initialize(unsigned int currentID)
Definition: SequenceID.h:39
unordered_map< PageID, FileSearchKey > * dirtyPagesInPageCache
Definition: UserSet.h:402
SequenceID seqId
Definition: UserSet.h:401
size_t pageSize
Definition: UserSet.h:407
PageID pageId
Definition: DataTypes.h:88
PDBPagePtr addPage()
Definition: UserSet.cc:139
pthread_mutex_t addBytesMutex
Definition: UserSet.h:406
string setName
Definition: UserSet.h:397
void addPageToDirtyPageSet(PageID pageId)
Definition: UserSet.h:276
DatabaseID getDbID()
Definition: UserSet.h:217
PersistenceType
Definition: DataTypes.h:68
#define PDB_COUT
Definition: PDBDebug.h:31
void lockDirtyPageSet()
Definition: UserSet.h:322
UserTypeID typeId
Definition: UserSet.h:395
void setPinned(bool isPinned)
Definition: UserSet.h:348
PartitionedFilePtr file
Definition: UserSet.h:389
unsigned int getNextSequenceID()
Definition: SequenceID.h:43
pthread_mutex_t dirtyPageSetMutex
Definition: UserSet.h:403
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
~UserSet()
Definition: UserSet.cc:114
void unlockDirtyPageSet()
Definition: UserSet.h:326
void cleanDirtyPageSet()
Definition: UserSet.cc:257
SetID setId
Definition: UserSet.h:396
UserTypeID typeId
Definition: DataTypes.h:86
pdb::PDBLoggerPtr logger
Definition: UserSet.h:392
PageCachePtr pageCache
Definition: UserSet.h:390
unordered_map< PageID, FileSearchKey > * getDirtyPageSet()
Definition: UserSet.h:318
unsigned int FilePartitionID
Definition: DataTypes.h:32
void flushDirtyPages()
Definition: UserSet.cc:271
shared_ptr< SequenceFile > SequenceFilePtr
Definition: SequenceFile.h:29
unsigned int UserTypeID
Definition: DataTypes.h:25