A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PageCache.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PAGE_CACHE_CC
20 #define PAGE_CACHE_CC
21 
22 #include "PDBDebug.h"
23 #include "PageCache.h"
24 #include "PDBEvictWork.h"
25 
26 #include <queue>
27 #include <stdlib.h>
28 #include <sched.h>
29 using namespace std;
30 
31 #define WARN_THRESHOLD 0.9
32 #ifndef EVICT_STOP_THRESHOLD
33 #define EVICT_STOP_THRESHOLD 0.9
34 #endif
35 
37  pdb::PDBWorkerQueuePtr workers,
38  PageCircularBufferPtr flushBuffer,
39  pdb::PDBLoggerPtr logger,
40  SharedMemPtr shm,
41  CacheStrategy strategy) {
42  this->cache = new unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>();
43  this->conf = conf;
44  this->workers = workers;
45  pthread_mutex_init(&this->countLock, nullptr);
46  pthread_mutex_init(&this->cacheMutex, nullptr);
47  pthread_mutex_init(&this->evictionMutex, nullptr);
48  pthread_rwlock_init(&this->evictionAndFlushLock, nullptr);
49  this->accessCount = 0;
50  this->inEviction = false;
51  this->maxSize = conf->getShmSize();
52  this->size = 0;
53  this->warnSize = (this->maxSize) * WARN_THRESHOLD;
54 
55  std::cout << "maxSize=" << maxSize << std::endl;
56 
57  this->evictStopSize = (this->maxSize) * EVICT_STOP_THRESHOLD;
58 
59  std::cout << "evictStopSize=" << evictStopSize << std::endl;
60 
61  if (this->evictStopSize >= (shm->getShmSize() - 6 * conf->getMaxPageSize())) {
62  this->evictStopSize = shm->getShmSize() - 6 * conf->getMaxPageSize();
63  std::cout << "evictStopSize=" << evictStopSize << std::endl;
64  if (this->evictStopSize <= conf->getMaxPageSize()) {
65  this->evictStopSize = conf->getMaxPageSize();
66  std::cout << "evictStopSize=" << evictStopSize << std::endl;
67  }
68  }
69  std::cout << "PageCache: EVICT_STOP_SIZE is automatically tuned to be " << this->evictStopSize
70  << std::endl;
71  this->flushBuffer = flushBuffer;
72  this->logger = logger;
73  this->shm = shm;
74  this->strategy = strategy;
75  this->priorityList = new vector<list<LocalitySetPtr>*>();
76  int i;
77  for (i = 0; i < 6; i++) {
78  list<LocalitySetPtr>* curList = new list<LocalitySetPtr>();
79  this->priorityList->push_back(curList);
80  }
81  logger->writeLn("LRUPageCache: warn size:");
82  logger->writeInt(this->warnSize);
83  logger->writeLn("LRUPageCache: stop size:");
84  logger->writeInt(this->evictStopSize);
85 }
86 
88  delete this->cache;
89  pthread_mutex_destroy(&this->countLock);
90  pthread_mutex_destroy(&this->cacheMutex);
91  pthread_mutex_destroy(&this->evictionMutex);
92  pthread_rwlock_destroy(&this->evictionAndFlushLock);
93 }
94 
95 // Cache the given page and, if a locality set is provided, register the page with that set.
97  if (page == nullptr) {
98  logger->writeLn("LRUPageCache: null page.");
99  }
100  CacheKey key;
101  key.dbId = page->getDbID();
102  key.typeId = page->getTypeID();
103  key.setId = page->getSetID();
104  key.pageId = page->getPageID();
105  pthread_mutex_lock(&this->cacheMutex);
106  if (this->cache->find(key) == this->cache->end()) {
107  pair<CacheKey, PDBPagePtr> pair = make_pair(key, page);
108  this->cache->insert(pair);
109  this->size += page->getRawSize() + 512;
110  } else {
111  logger->writeLn("LRUPageCache: page was there already.");
112  }
113  pthread_mutex_unlock(&this->cacheMutex);
114  if (set != nullptr) {
115  set->addCachedPage(page);
116  }
117 }
118 
119 // If there is sufficient room in shared memory, allocate the buffer as required
120 // Otherwise, try to evict a page from shared memory.
121 // It will block until data can be allocated.
122 char* PageCache::allocateBufferFromSharedMemoryBlocking(size_t size, int& alignOffset) {
123  // below function is thread-safe
124  char* data = (char*)this->shm->mallocAlign(size, 512, alignOffset);
125  // Dangerous: dead loop
126  while (data == nullptr) {
127  this->logger->info("LRUPageCache: out of memory in off-heap pool, start eviction.");
128 #ifdef PROFILING_CACHE
129  std::cout << "Out of memory in shared memory pool, trying to allocate " << size << " data"
130  << std::endl;
131 #endif
132  if (this->inEviction == false) {
133  this->evict();
134  } else {
135  this->logger->info("waiting for eviction work to evict at least one page.");
136  sched_yield();
137  }
138  data = (char*)this->shm->mallocAlign(size, 512, alignOffset);
139  }
140 
141  PDB_COUT << "page allocated!\n";
142  return data;
143 }
144 
145 // Try to allocate buffer of required size from shared memory, if no room, return nullptr.
146 char* PageCache::tryAllocateBufferFromSharedMemory(size_t size, int& alignOffset) {
147  char* data = (char*)this->shm->mallocAlign(size, 512, alignOffset);
148  if (data == nullptr) {
149  this->logger->writeLn("LRUPageCache: out of memory in off-heap pool, start eviction.");
150  this->evict();
151  data = (char*)this->shm->mallocAlign(size, 512, alignOffset);
152  }
153  return data;
154 }
155 
156 // Lock for eviction.
158  pthread_rwlock_wrlock(&this->evictionAndFlushLock);
159 }
160 
161 // Unlock for eviction.
163  pthread_rwlock_unlock(&this->evictionAndFlushLock);
164 }
165 
166 // Lock for flushing.
168  pthread_rwlock_rdlock(&this->evictionAndFlushLock);
169 }
170 
171 // Unlock for flushing.
173  pthread_rwlock_unlock(&this->evictionAndFlushLock);
174 }
175 
177  size_t size,
178  NodeID nodeId,
179  DatabaseID dbId,
180  UserTypeID typeId,
181  SetID setId,
182  PageID pageId) {
183  int offset;
184  char* buffer = allocateBufferFromSharedMemoryBlocking(size, offset);
185  ssize_t readSize = read(handle, buffer, size);
186  if (readSize <= 0) {
187  std::cout << "PageCache: Read failed" << std::endl;
188  return nullptr;
189  }
190  PDBPagePtr page = make_shared<PDBPage>(
191  buffer, nodeId, dbId, typeId, setId, pageId, size, shm->computeOffset(buffer), offset);
192  return page;
193 }
194 
195 // Build a PDBPage instance based on the info of file and info parsed from loaded page data.
197  char* pageData,
198  FilePartitionID partitionId,
199  unsigned int pageSeqInPartition,
200  int internalOffset,
201  size_t pageSize) {
202  if (pageData == nullptr) {
203  return nullptr;
204  }
205  char* cur = pageData + sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) + sizeof(SetID);
206  PageID pageId = (PageID)(*(PageID*)cur);
207  cur = cur + sizeof(PageID);
208  int numObjects = (int)(*(int*)cur);
209  // create a new PDBPagePtr
210  PDBPagePtr page = make_shared<PDBPage>(pageData,
211  file->getNodeId(),
212  file->getDbId(),
213  file->getTypeId(),
214  file->getSetId(),
215  pageId,
216  pageSize,
217  shm->computeOffset(pageData),
218  internalOffset,
219  numObjects);
220  if (page == nullptr) {
221  this->logger->error("Fatal Error: PageCache: out of memory in heap.");
222  std::cout << "FATAL ERROR: PageCache out of memory" << std::endl;
223  exit(-1);
224  }
225  page->setNumObjects(numObjects);
226  page->setPartitionId(partitionId);
227  page->setPageSeqInPartition(pageSeqInPartition);
228  return page;
229 }
230 
231 // Load the page in a PDBFile (SequenceFile or PartitionedFile) into memory
232 // If the file is SequenceFile, partition id will be ignored.
234  FilePartitionID partitionId,
235  unsigned int pageSeqInPartition,
236  bool sequential) {
237  // check file type
238  if (file->getFileType() == FileType::SequenceFileType) {
239  return this->loadPage(dynamic_pointer_cast<SequenceFile>(file), (PageID)pageSeqInPartition);
240  } else {
241  // It's a PartitionedFile instance.
242  PartitionedFilePtr curFile = dynamic_pointer_cast<PartitionedFile>(file);
243  // Check the partition size.
244  unsigned int numPagesInPartition =
245  (unsigned int)curFile->getMetaData()->getPartition(partitionId)->getNumPages();
246 
247  if (pageSeqInPartition >= numPagesInPartition) {
248  return nullptr;
249  }
250  int internalOffset = 0;
251  size_t pageSize = file->getPageSize();
252  // allocate memory for the page from shared memory pool, if there is no free page, it will
253  // blocking and evicting, until there is new room.
254  char* pageData = this->allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
255  // seek to page
256  if (sequential == true) {
257  curFile->loadPageFromCurPos(partitionId, pageSeqInPartition, pageData, pageSize);
258  } else {
259  curFile->loadPage(partitionId, pageSeqInPartition, pageData, pageSize);
260  }
261  // build page from loaded data
262  return this->buildPageFromSharedMemoryData(
263  file, pageData, partitionId, pageSeqInPartition, internalOffset, pageSize);
264  }
265  return nullptr;
266 }
267 
268 // Load the page in a SequenceFile into memory;
270  if (pageId > file->getLastFlushedPageID()) {
271  this->logger->writeLn(
272  "LRUPageCache: page is still in input buffer, and hasn't been flushed yet.");
273  return nullptr;
274  }
275  int internalOffset = 0;
276  // allocate a page from shared memory pool, if there is no free page, it will blocking and
277  // evicting, until there is new room.
278  size_t pageSize = file->getPageSize();
279  char* pageData = this->allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
280  // seek to the pageId
281  file->loadPage(pageId, pageData, pageSize);
282  // build page from loaded data
283  return this->buildPageFromSharedMemoryData(file, pageData, 0, pageId, internalOffset, pageSize);
284 }
285 
286 
287 // Remove page specified by Key from cache hashMap.
288 // This function will be used by the flushConsumer thread.
290  pthread_mutex_lock(&this->cacheMutex);
291  if (this->containsPage(key) == false) {
292  pthread_mutex_unlock(&this->cacheMutex);
293  return false;
294  }
295  size_t pageSizeAllocated = this->cache->at(key)->getRawSize() + 512;
296  this->cache->erase(key);
297  this->size -= pageSizeAllocated;
298  pthread_mutex_unlock(&this->cacheMutex);
299  return true;
300 }
301 
302 // Free page data and Remove page specified by Key from cache hashMap.
303 // This function will be used by the UserSet::clear() method.
305  CacheKey key;
306  key.dbId = curPage->getDbID();
307  key.typeId = curPage->getTypeID();
308  key.setId = curPage->getSetID();
309  key.pageId = curPage->getPageID();
310 
311  pthread_mutex_lock(&this->cacheMutex);
312  if (this->containsPage(key) == false) {
313  pthread_mutex_unlock(&this->cacheMutex);
314  return false;
315  }
316  size_t pageSizeAllocated = this->cache->at(key)->getRawSize() + 512;
317  cache->erase(key);
318  this->size -= pageSizeAllocated;
319  pthread_mutex_unlock(&this->cacheMutex);
320  this->shm->free(curPage->getRawBytes() - curPage->getInternalOffset(),
321  curPage->getRawSize() + 512);
322  curPage->setOffset(0);
323  curPage->setRawBytes(nullptr);
324  return true;
325 }
326 
327 
328 // The method below can be used for all PDBFile instances, including sequence files and partitioned files.
329 // Note that the method below increments the cached page's reference count.
331  FilePartitionID partitionId,
332  unsigned int pageSeqInPartition,
333  PageID pageId,
334  bool sequential,
335  LocalitySet* set) {
336  CacheKey key;
337  key.dbId = file->getDbId();
338  key.typeId = file->getTypeId();
339  key.setId = file->getSetId();
340  key.pageId = pageId;
341  PDBPagePtr page;
342 
343  if ((partitionId == (unsigned int)(-1)) || (pageSeqInPartition == (unsigned int)(-1))) {
344  PageIndex pageIndex = file->getMetaData()->getPageIndex(pageId);
345  partitionId = pageIndex.partitionId;
346  pageSeqInPartition = pageIndex.pageSeqInPartition;
347  }
348  // Assumption: At one time, for a page, only one thread will try to load it.
349  // Above assumption is guaranteed by the front-end scan model.
350  pthread_mutex_lock(&this->evictionMutex);
351  this->evictionLock();
352  pthread_mutex_lock(&this->cacheMutex);
353  if (this->containsPage(key) != true) {
354  pthread_mutex_unlock(&this->cacheMutex);
355  this->evictionUnlock();
356  pthread_mutex_unlock(&this->evictionMutex);
357  page = this->loadPage(file, partitionId, pageSeqInPartition, sequential);
358  if (page == nullptr) {
359  return nullptr;
360  }
361  pthread_mutex_lock(&this->countLock);
362  page->setAccessSequenceId(this->accessCount);
363  this->accessCount++;
364  pthread_mutex_unlock(&this->countLock);
365 
366  pthread_mutex_lock(&this->evictionMutex);
367  this->cachePage(page, set);
368  page->setPinned(true);
369  page->setDirty(false);
370  page->incRefCount();
371  pthread_mutex_unlock(&this->evictionMutex);
372  } else {
373  page = this->cache->at(key);
374  pthread_mutex_unlock(&this->cacheMutex);
375  if (page == nullptr) {
376  std::cout << "WARNING: PartitionPageIterator get nullptr in cache.\n" << std::endl;
377  logger->warn("PartitionPageIterator get nullptr in cache.");
378  this->evictionUnlock();
379  pthread_mutex_unlock(&this->evictionMutex);
380  return nullptr;
381  }
382  page->setPinned(true);
383  page->incRefCount();
384  this->evictionUnlock();
385  pthread_mutex_unlock(&this->evictionMutex);
386  pthread_mutex_lock(&this->countLock);
387  page->setAccessSequenceId(this->accessCount);
388  this->accessCount++;
389  pthread_mutex_unlock(&this->countLock);
390  if (set != nullptr) {
391  set->updateCachedPage(page);
392  }
393  }
394 
395  return page;
396 }
397 
398 // The method below mainly provides backward compatibility for sequence files.
399 // Note that the method below was meant to increment the cached page's reference count.
400 // It is no longer supported and should be removed.
402  return nullptr;
403 }
404 
405 // The method below increments the page's reference count.
406 // It is only used by the SetCachePageIterator class to get dirty pages, and is guarded there.
408  pthread_mutex_lock(&this->cacheMutex);
409  if (this->containsPage(key) != true) {
410  pthread_mutex_unlock(&this->cacheMutex);
411  std::cout << "WARNING: SetCachePageIterator get nullptr in cache.\n" << std::endl;
412  logger->warn("SetCachePageIterator get nullptr in cache.");
413  return nullptr;
414  } else {
415  PDBPagePtr page = this->cache->at(key);
416  pthread_mutex_unlock(&this->cacheMutex);
417  if (page == nullptr) {
418  std::cout << "WARNING: SetCachePageIterator get nullptr in cache.\n" << std::endl;
419  logger->warn("SetCachePageIterator get nullptr in cache.");
420  return nullptr;
421  }
422  page->incRefCount();
423  pthread_mutex_lock(&this->countLock);
424  page->setAccessSequenceId(this->accessCount);
425  this->accessCount++;
426  pthread_mutex_unlock(&this->countLock);
427  if (set != nullptr) {
428  set->updateCachedPage(page);
429  }
430  return page;
431  }
432 }
433 
435  CacheKey key,
436  LocalitySet* set,
437  size_t pageSize) {
438  if (this->containsPage(key) == true) {
439  return nullptr;
440  }
441  int internalOffset = 0;
442  char* pageData;
443  pageData = tryAllocateBufferFromSharedMemory(pageSize, internalOffset);
444  if (pageData != nullptr) {
445  } else {
446  return nullptr;
447  }
448  PDBPagePtr page = make_shared<PDBPage>(pageData,
449  nodeId,
450  key.dbId,
451  key.typeId,
452  key.setId,
453  key.pageId,
454  pageSize,
455  shm->computeOffset(pageData),
456  internalOffset);
457 
458  pthread_mutex_lock(&this->countLock);
459  page->setAccessSequenceId(this->accessCount);
460  this->accessCount++;
461  pthread_mutex_unlock(&this->countLock);
462  page->setPinned(true);
463  page->setDirty(true);
464  pthread_mutex_lock(&evictionMutex);
465  this->cachePage(page, set);
466  page->incRefCount();
467  pthread_mutex_unlock(&evictionMutex);
468  return page;
469 }
470 
471 
472 // Assumption: for a new pageId, at one time, only one thread will try to allocate a new page for it
473 // To allocate a new page, set it as pinned&dirty, add it to cache, and increment reference count
474 PDBPagePtr PageCache::getNewPage(NodeID nodeId, CacheKey key, LocalitySet* set, size_t pageSize) {
475  pthread_mutex_lock(&evictionMutex);
476  if (this->containsPage(key) == true) {
477  pthread_mutex_unlock(&evictionMutex);
478  return nullptr;
479  }
480  pthread_mutex_unlock(&evictionMutex);
481  int internalOffset = 0;
482  char* pageData;
483  pageData = allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
484  if (pageData != nullptr) {
485  PDB_COUT << "PageCache: getNewPage: Page created for typeId=" << key.typeId
486  << ",setId=" << key.setId << ",pageId=" << key.pageId << "\n";
487  } else {
488  PDB_COUT << "failed!!!\n";
489  return nullptr;
490  }
491  PDBPagePtr page = make_shared<PDBPage>(pageData,
492  nodeId,
493  key.dbId,
494  key.typeId,
495  key.setId,
496  key.pageId,
497  pageSize,
498  shm->computeOffset(pageData),
499  internalOffset);
500 
501  pthread_mutex_lock(&this->countLock);
502  page->setAccessSequenceId(this->accessCount);
503  this->accessCount++;
504  pthread_mutex_unlock(&this->countLock);
505  page->setPinned(true);
506  page->setDirty(true);
507  pthread_mutex_lock(&evictionMutex);
508  this->cachePage(page, set);
509  page->incRefCount();
510  pthread_mutex_unlock(&evictionMutex);
511  return page;
512 }
513 
514 // Please note that only the method below decrements a cached page's reference count.
515 
517  if (this->containsPage(key) == false) {
518  return false;
519  } else {
520  PDBPagePtr page = this->cache->at(key);
521  page->decRefCount();
522  return true;
523  }
524 }
525 
527  return (this->cache->find(key) != this->cache->end());
528 }
529 
530 
531 // Unpin all dirty pages
532 // IMPORTANT: This function can only be called before shutdown.
534  if (this->inEviction == true) {
535  return 0;
536  }
537 
538  this->logger->writeLn("Storage server: start cache eviction for all dirty pages...");
539  pthread_mutex_lock(&this->evictionMutex);
540  this->inEviction = true;
541  int numEvicted = 0;
542  PDBPagePtr page;
543  unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
544  vector<PDBPagePtr>* evictableDirtyPages = new vector<PDBPagePtr>();
545  this->evictionLock();
546  for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
547  page = cacheIter->second;
548  if (page == nullptr) {
549  this->inEviction = false;
550  pthread_mutex_unlock(&this->evictionMutex);
551  return 0;
552  } else if ((page->isDirty() == true) && (page->isInFlush() == false)) {
553  while (page->getRefCount() > 0) {
554  page->decRefCount();
555  }
556  evictableDirtyPages->push_back(page);
557  } else {
558  // do nothing
559  }
560  }
561  this->evictionUnlock();
562  int i;
563  for (i = 0; i < evictableDirtyPages->size(); i++) {
564  page = evictableDirtyPages->at(i);
565  if (evictPage(page) == true) {
566  numEvicted++;
567  }
568  }
569 
570  this->inEviction = false;
571  pthread_mutex_unlock(&this->evictionMutex);
572  delete evictableDirtyPages;
573  return numEvicted;
574 }
575 
576 
577 // Evict all dirty pages
579  if (this->inEviction == true) {
580  return 0;
581  }
582 
583  this->logger->writeLn("Storage server: start cache eviction for all dirty pages...");
584  pthread_mutex_lock(&this->evictionMutex);
585  this->inEviction = true;
586  int numEvicted = 0;
587  PDBPagePtr page;
588  unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
589  vector<PDBPagePtr>* evictableDirtyPages = new vector<PDBPagePtr>();
590  this->evictionLock();
591  for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
592  page = cacheIter->second;
593  if (page == nullptr) {
594  this->inEviction = false;
595  pthread_mutex_unlock(&this->evictionMutex);
596  return 0;
597  } else if ((page->isDirty() == true) && (page->getRefCount() == 0) &&
598  (page->isInFlush() == false)) {
599  evictableDirtyPages->push_back(page);
600  } else {
601  // do nothing
602  }
603  }
604  this->evictionUnlock();
605  int i;
606  for (i = 0; i < evictableDirtyPages->size(); i++) {
607  page = evictableDirtyPages->at(i);
608  if (evictPage(page) == true) {
609  numEvicted++;
610  }
611  }
612 
613  this->inEviction = false;
614  pthread_mutex_unlock(&this->evictionMutex);
615  delete evictableDirtyPages;
616  return numEvicted;
617 }
618 
619 
620 // Flush a page.
622  if (this->containsPage(key) == true) {
623  PDBPagePtr page = this->cache->at(key);
624  if ((page->isDirty() == true) && (page->isInFlush() == false)) {
625  page->setInFlush(true);
626  page->setInEviction(false);
627  this->flushBuffer->addPageToTail(page);
628  } else {
629  // can't flush
630  return false;
631  }
632  } else {
633  // can't find page
634  return false;
635  }
636  return true;
637 }
638 
639 
640 // Evict a page
641 
642 bool PageCache::evictPage(CacheKey key, bool tryFlushOrNot) {
643  if (this->containsPage(key) == true) {
644  PDBPagePtr page = this->cache->at(key);
645 #ifndef UNPIN_FOR_NON_ZERO_REF_COUNT
646  if (page->getRefCount() > 0) {
647  cout << "can't be unpinned due to non-zero reference count " << page->getRefCount()
648  << "with DatabaseID=" << page->getDbID() << ", TypeID=" << page->getTypeID()
649  << ", SetID=" << page->getSetID() << ", PageID=" << page->getPageID() << "\n";
650  this->logger->writeLn(
651  "LRUPageCache: can not evict page because it has been pinned by at least one "
652  "client");
653  this->logger->writeInt(page->getPageID());
654  return false;
655  } else
656 #endif
657  {
658 
659  page->setPinned(false);
660  if ((tryFlushOrNot == true) && (page->isDirty() == true) &&
661  (page->isInFlush() == false) &&
662  ((page->getDbID() != 0) || (page->getTypeID() != 1)) &&
663  ((page->getDbID() != 0) || (page->getTypeID() != 2))) {
664 #ifdef PROFILING_CACHE
665  std::cout << "going to unpin a dirty page...\n";
666 #endif
667  // update counter
668  page->setInFlush(true);
669  page->setInEviction(true);
670  // flush the page
671  // first we release the lock so that the flushing thread can run.
672  this->flushBuffer->addPageToTail(page);
673 
674  } else if (page->isInFlush() == false) {
675 #ifdef PROFILING_CACHE
676  std::cout << "going to unpin a clean page...\n";
677 #endif
678  // free the page
679  // We use flush lock (which is a read write lock) to synchronize with getPage() that
680  // will be invoked in PartitionPageIterator;
681  // One scenario is: PDB load old data from disk to memory through iterators while
682  // application pins new pages that requires to evict data, then an old page in
683  // checking for loading may get evicted before it is pinned.
684  // Add flush lock is to guard for similar scenarios.
685  this->flushLock();
686  this->shm->free(page->getRawBytes() - page->getInternalOffset(),
687  page->getRawSize() + 512);
688 
689  page->setOffset(0);
690  page->setRawBytes(nullptr);
691  removePage(key);
692  this->flushUnlock();
693  }
694 #ifdef PROFILING_CACHE
695  std::cout << "Storage server: evicting page from cache for dbId:" << page->getDbID()
696  << ", typeID:" << page->getTypeID() << ", setID=" << page->getSetID()
697  << ", pageID: " << page->getPageID() << ", tryFlushing=" << tryFlushOrNot
698  << ".\n";
699 #endif
700  }
701 
702  } else {
703  PDB_COUT << "can not find page in cache!\n";
704  this->logger->writeLn("LRUPageCache: can not evict page because it is not in cache");
705  return false;
706  }
707 
708  return true;
709 }
710 
712  CacheKey key;
713  key.dbId = page->getDbID();
714  key.typeId = page->getTypeID();
715  key.setId = page->getSetID();
716  key.pageId = page->getPageID();
717  bool ret = evictPage(key);
718  if (ret == true) {
719  if (set != nullptr) {
720  set->removeCachedPage(page);
721  }
722  }
723  return ret;
724 }
725 
727  pdb::PDBWorkerPtr worker;
728  while ((worker = this->workers->getWorker()) == nullptr) {
729  sleep(0);
730  }
731  PDBEvictWorkPtr evictWork = make_shared<PDBEvictWork>(this);
732  PDBBuzzerPtr buzzer = evictWork->getLinkedBuzzer();
733  worker->execute(evictWork, buzzer);
734 }
735 
737  if (this->inEviction == true) {
738  return;
739  }
740 #ifdef PROFILING_CACHE
741  std::cout << "Storage server: starting cache eviction to get more room with used size= "
742  << this->size << "!\n";
743 #endif
744  pthread_mutex_lock(&this->evictionMutex);
745  this->inEviction = true;
746  if (this->strategy == UnifiedIntelligent) {
747  this->evictionLock();
748  vector<PDBPagePtr>* pagesToEvict = nullptr;
749  int i, j;
750  int numEvicted = 0;
751  list<LocalitySetPtr>* curList;
752  for (i = 0; i < 6; i++) {
753  curList = this->priorityList->at(i);
754  for (list<LocalitySetPtr>::reverse_iterator it = curList->rbegin();
755  it != curList->rend();
756  ++it) {
757  LocalitySetPtr set = (*it);
758  pagesToEvict = set->selectPagesForReplacement();
759  if (pagesToEvict != nullptr) {
760  this->evictionUnlock();
761  for (j = 0; j < pagesToEvict->size(); j++) {
762  if (this->evictPage(pagesToEvict->at(j), set)) {
763  numEvicted++;
764  }
765  }
766  this->evictionLock();
767  delete pagesToEvict;
768  pagesToEvict = nullptr;
769  }
770  }
771  if (numEvicted > 0) {
772  break;
773  }
774  }
775  this->evictionUnlock();
776 
777  } else {
778  this->evictionLock();
779  this->logger->debug("PageCache::evict(): got the lock for evictionLock()...");
780  priority_queue<PDBPagePtr, vector<PDBPagePtr>, CompareCachedPagesMRU>* cachedPages =
781  new priority_queue<PDBPagePtr, vector<PDBPagePtr>, CompareCachedPagesMRU>();
782  unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
783  PDBPagePtr curPage;
784  for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
785  curPage = cacheIter->second;
786  if (curPage == nullptr) {
787  this->logger->error("PageCache::evict(): got a null page, return!");
788  delete cachedPages;
789  this->inEviction = false;
790  this->evictionUnlock();
791  pthread_mutex_unlock(&this->evictionMutex);
792  return;
793  }
794  this->logger->debug(
795  "PageCache::evict(): got a page, check whether it can be evicted...");
796  if ((curPage->getRefCount() == 0) &&
797  ((curPage->isDirty() == false) ||
798  ((curPage->isDirty() == true) && (curPage->isInFlush() == false)))) {
799  cachedPages->push(curPage);
800 #ifdef PROFILING_CACHE
801  std::cout << "Add to eviction queue: curPage->getRefCount()="
802  << curPage->getRefCount() << ", curPage->isDirty()=" << curPage->isDirty()
803  << ", curPage->isInFlush)=" << curPage->isInFlush()
804  << ", curPage->dbId=" << curPage->getDbID()
805  << ", curPage->setId=" << curPage->getSetID() << std::endl;
806 #endif
807  } else {
808  // do nothing
809  }
810  }
811  this->evictionUnlock();
812  PDBPagePtr page;
813  while ((this->size > this->evictStopSize) && (cachedPages->size() > 0)) {
814  page = cachedPages->top();
815  if (page == nullptr) {
816  PDB_COUT << "PageCache: nothing to evict, return!\n";
817  this->logger->debug("PageCache: nothing to evict, return!\n");
818  break;
819  }
820  if (this->evictPage(page) == true) {
821 #ifdef PROFILING
822  std::cout << "Storage server: evicted page from cache passively for dbId:"
823  << page->getDbID() << ", typeID:" << page->getTypeID()
824  << ", setID=" << page->getSetID() << ", pageID: " << page->getPageID()
825  << ".\n";
826 #endif
827  this->logger->debug(
828  std::string("Storage server: evicting page from cache for pageID:") +
829  std::to_string(page->getPageID()));
830  cachedPages->pop();
831  } else {
832  cachedPages->pop();
833  }
834  }
835  delete cachedPages;
836  }
837  this->inEviction = false;
838  pthread_mutex_unlock(&this->evictionMutex);
839 #ifdef PROFILING
840  std::cout << "Storage server: finished cache eviction!\n";
841 #endif
842  logger->debug("Storage server: finished cache eviction!\n");
843 }
844 
845 void PageCache::getAndSetWarnSize(unsigned int numSets, double warnThreshold) {
846  this->warnSize = (this->maxSize) * warnThreshold;
847  this->logger->writeLn("LRUPageCache: warnSize was set to:");
848  this->logger->writeInt(this->warnSize);
849 }
850 
851 void PageCache::getAndSetEvictStopSize(unsigned int numSets, double evictThreshold) {
852  this->evictStopSize = (this->maxSize) * evictThreshold;
853  this->logger->writeLn("LRUPageCache: evictSize was set to:");
854  this->logger->writeInt(this->evictStopSize);
855 }
856 
858  this->priorityList->at(level)->push_back(set);
859 }
860 
862  this->priorityList->at(level)->remove(set);
863 }
864 
867  OperationType operationType) {
868  set->pin(policy, operationType);
869  if (set->getPersistenceType() == Transient) {
870  if (set->getLocalityType() == ShuffleData) {
871  this->addLocalitySetToPriorityList(set, TransientLifetimeNotEndedShuffleData);
872  } else if (set->getLocalityType() == HashPartitionData) {
873  this->addLocalitySetToPriorityList(set, TransientLifetimeNotEndedHashData);
874  } else {
875  this->addLocalitySetToPriorityList(set, TransientLifetimeNotEndedPartialData);
876  }
877  } else {
878  this->addLocalitySetToPriorityList(set, PersistentLifetimeNotEnded);
879  }
880 }
881 
883  set->unpin();
884  if (set->getPersistenceType() == Transient) {
885  if (set->getLocalityType() == ShuffleData) {
886  this->removeLocalitySetFromPriorityList(set, TransientLifetimeNotEndedShuffleData);
887  } else if (set->getLocalityType() == HashPartitionData) {
888  this->removeLocalitySetFromPriorityList(set, TransientLifetimeNotEndedHashData);
889  } else {
890  this->removeLocalitySetFromPriorityList(set, TransientLifetimeNotEndedPartialData);
891  }
892  } else {
893  this->removeLocalitySetFromPriorityList(set, PersistentLifetimeNotEnded);
894  }
895  if (set->getPersistenceType() == Transient) {
896  this->addLocalitySetToPriorityList(set, TransientLifetimeEnded);
897  } else {
898  this->addLocalitySetToPriorityList(set, PersistentLifetimeEnded);
899  }
900 }
901 
902 #endif
PDBPagePtr getNewPage(NodeID nodeId, CacheKey key, LocalitySet *set=nullptr, size_t pageSize=DEFAULT_PAGE_SIZE)
Definition: PageCache.cc:474
void pin(LocalitySetPtr set, LocalitySetReplacementPolicy policy, OperationType operationType)
Definition: PageCache.cc:865
SetID setId
Definition: DataTypes.h:87
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
char * allocateBufferFromSharedMemoryBlocking(size_t size, int &alignOffset)
Definition: PageCache.cc:122
void flushLock()
Definition: PageCache.cc:167
OperationType
Definition: DataTypes.h:57
unsigned int pageSeqInPartition
Definition: DataTypes.h:99
bool freePage(PDBPagePtr page)
Definition: PageCache.cc:304
PDBPagePtr getPage(SequenceFilePtr file, PageID pageId)
Definition: PageCache.cc:401
DatabaseID dbId
Definition: DataTypes.h:85
void getAndSetWarnSize(unsigned int numSets, double warnThreshold)
Definition: PageCache.cc:845
unsigned int NodeID
Definition: DataTypes.h:27
void evict()
Definition: PageCache.cc:736
bool removePage(CacheKey key)
Definition: PageCache.cc:289
PDBPagePtr buildAndCachePageFromFileHandle(int handle, size_t size, NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PageID pageId)
Definition: PageCache.cc:176
void addCachedPage(PDBPagePtr page)
Definition: LocalitySet.cc:43
CacheStrategy
Definition: DataTypes.h:54
void removeLocalitySetFromPriorityList(LocalitySetPtr set, PriorityLevel level)
Definition: PageCache.cc:861
LocalitySetReplacementPolicy
Definition: DataTypes.h:52
void evictionUnlock()
Definition: PageCache.cc:162
void updateCachedPage(PDBPagePtr page)
Definition: LocalitySet.cc:47
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
#define EVICT_STOP_THRESHOLD
Definition: PageCache.cc:33
unsigned int DatabaseID
Definition: DataTypes.h:29
void addLocalitySetToPriorityList(LocalitySetPtr set, PriorityLevel level)
Definition: PageCache.cc:857
unsigned int PageID
Definition: DataTypes.h:26
FilePartitionID partitionId
Definition: DataTypes.h:98
void cachePage(PDBPagePtr page, LocalitySet *set=nullptr)
Definition: PageCache.cc:96
int evictAllDirtyPages()
Definition: PageCache.cc:578
shared_ptr< PDBEvictWork > PDBEvictWorkPtr
Definition: PDBEvictWork.h:24
bool flushPageWithoutEviction(CacheKey key)
Definition: PageCache.cc:621
PageID pageId
Definition: DataTypes.h:88
PDBPagePtr buildPageFromSharedMemoryData(PDBFilePtr file, char *pageData, FilePartitionID partitionId, unsigned int pageSeqInPartition, int internalOffset, size_t pageSize=DEFAULT_PAGE_SIZE)
Definition: PageCache.cc:196
void unpin(LocalitySetPtr set)
Definition: PageCache.cc:882
~PageCache()
Definition: PageCache.cc:87
PartitionedFileMetaDataPtr getMetaData()
bool containsPage(CacheKey key)
Definition: PageCache.cc:526
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
bool decPageRefCount(CacheKey key)
Definition: PageCache.cc:516
shared_ptr< PDBFileInterface > PDBFilePtr
Definition: PDBFile.h:29
#define PDB_COUT
Definition: PDBDebug.h:31
int unpinAndEvictAllDirtyPages()
Definition: PageCache.cc:533
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
void flushUnlock()
Definition: PageCache.cc:172
PageCache(ConfigurationPtr conf, pdb::PDBWorkerQueuePtr workers, PageCircularBufferPtr flushBuffer, pdb::PDBLoggerPtr logger, SharedMemPtr shm, CacheStrategy strategy=UnifiedMRU)
Definition: PageCache.cc:36
void evictionLock()
Definition: PageCache.cc:157
shared_ptr< LocalitySet > LocalitySetPtr
Definition: LocalitySet.h:29
PDBPagePtr getNewPageNonBlocking(NodeID nodeId, CacheKey key, LocalitySet *set=nullptr, size_t pageSize=DEFAULT_PAGE_SIZE)
Definition: PageCache.cc:434
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
bool evictPage(CacheKey key, bool tryFlushOrNot=true)
Definition: PageCache.cc:642
PDBPagePtr loadPage(SequenceFilePtr file, PageID pageId)
Definition: PageCache.cc:269
void runEviction()
Definition: PageCache.cc:726
PriorityLevel
Definition: DataTypes.h:41
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
void getAndSetEvictStopSize(unsigned int numSets, double evictThreshold)
Definition: PageCache.cc:851
UserTypeID typeId
Definition: DataTypes.h:86
#define WARN_THRESHOLD
Definition: PageCache.cc:31
char * tryAllocateBufferFromSharedMemory(size_t size, int &alignOffset)
Definition: PageCache.cc:146
unsigned int FilePartitionID
Definition: DataTypes.h:32
shared_ptr< SequenceFile > SequenceFilePtr
Definition: SequenceFile.h:29
unsigned int UserTypeID
Definition: DataTypes.h:25