31 #define WARN_THRESHOLD 0.9
32 #ifndef EVICT_STOP_THRESHOLD
33 #define EVICT_STOP_THRESHOLD 0.9
42 this->cache =
new unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>();
44 this->workers = workers;
45 pthread_mutex_init(&this->countLock,
nullptr);
46 pthread_mutex_init(&this->cacheMutex,
nullptr);
47 pthread_mutex_init(&this->evictionMutex,
nullptr);
48 pthread_rwlock_init(&this->evictionAndFlushLock,
nullptr);
49 this->accessCount = 0;
50 this->inEviction =
false;
51 this->maxSize = conf->getShmSize();
55 std::cout <<
"maxSize=" << maxSize << std::endl;
59 std::cout <<
"evictStopSize=" << evictStopSize << std::endl;
61 if (this->evictStopSize >= (shm->getShmSize() - 6 * conf->getMaxPageSize())) {
62 this->evictStopSize = shm->getShmSize() - 6 * conf->getMaxPageSize();
63 std::cout <<
"evictStopSize=" << evictStopSize << std::endl;
64 if (this->evictStopSize <= conf->getMaxPageSize()) {
65 this->evictStopSize = conf->getMaxPageSize();
66 std::cout <<
"evictStopSize=" << evictStopSize << std::endl;
69 std::cout <<
"PageCache: EVICT_STOP_SIZE is automatically tuned to be " << this->evictStopSize
71 this->flushBuffer = flushBuffer;
72 this->logger = logger;
74 this->strategy = strategy;
75 this->priorityList =
new vector<list<LocalitySetPtr>*>();
77 for (i = 0; i < 6; i++) {
78 list<LocalitySetPtr>* curList =
new list<LocalitySetPtr>();
79 this->priorityList->push_back(curList);
81 logger->writeLn(
"LRUPageCache: warn size:");
82 logger->writeInt(this->warnSize);
83 logger->writeLn(
"LRUPageCache: stop size:");
84 logger->writeInt(this->evictStopSize);
89 pthread_mutex_destroy(&this->countLock);
90 pthread_mutex_destroy(&this->cacheMutex);
91 pthread_mutex_destroy(&this->evictionMutex);
92 pthread_rwlock_destroy(&this->evictionAndFlushLock);
97 if (page ==
nullptr) {
98 logger->writeLn(
"LRUPageCache: null page.");
101 key.
dbId = page->getDbID();
102 key.
typeId = page->getTypeID();
103 key.
setId = page->getSetID();
104 key.
pageId = page->getPageID();
105 pthread_mutex_lock(&this->cacheMutex);
106 if (this->cache->find(key) == this->cache->end()) {
107 pair<CacheKey, PDBPagePtr> pair = make_pair(key, page);
108 this->cache->insert(pair);
109 this->size += page->getRawSize() + 512;
111 logger->writeLn(
"LRUPageCache: page was there already.");
113 pthread_mutex_unlock(&this->cacheMutex);
114 if (set !=
nullptr) {
124 char* data = (
char*)this->shm->mallocAlign(size, 512, alignOffset);
126 while (data ==
nullptr) {
127 this->logger->info(
"LRUPageCache: out of memory in off-heap pool, start eviction.");
128 #ifdef PROFILING_CACHE
129 std::cout <<
"Out of memory in shared memory pool, trying to allocate " << size <<
" data"
132 if (this->inEviction ==
false) {
135 this->logger->info(
"waiting for eviction work to evict at least one page.");
138 data = (
char*)this->shm->mallocAlign(size, 512, alignOffset);
147 char* data = (
char*)this->shm->mallocAlign(size, 512, alignOffset);
148 if (data ==
nullptr) {
149 this->logger->writeLn(
"LRUPageCache: out of memory in off-heap pool, start eviction.");
151 data = (
char*)this->shm->mallocAlign(size, 512, alignOffset);
158 pthread_rwlock_wrlock(&this->evictionAndFlushLock);
163 pthread_rwlock_unlock(&this->evictionAndFlushLock);
168 pthread_rwlock_rdlock(&this->evictionAndFlushLock);
173 pthread_rwlock_unlock(&this->evictionAndFlushLock);
184 char* buffer = allocateBufferFromSharedMemoryBlocking(size, offset);
185 ssize_t readSize = read(handle, buffer, size);
187 std::cout <<
"PageCache: Read failed" << std::endl;
191 buffer, nodeId, dbId, typeId, setId, pageId, size, shm->computeOffset(buffer), offset);
199 unsigned int pageSeqInPartition,
202 if (pageData ==
nullptr) {
207 cur = cur +
sizeof(
PageID);
208 int numObjects = (int)(*(
int*)cur);
210 PDBPagePtr page = make_shared<PDBPage>(pageData,
217 shm->computeOffset(pageData),
220 if (page ==
nullptr) {
221 this->logger->error(
"Fatal Error: PageCache: out of memory in heap.");
222 std::cout <<
"FATAL ERROR: PageCache out of memory" << std::endl;
225 page->setNumObjects(numObjects);
226 page->setPartitionId(partitionId);
227 page->setPageSeqInPartition(pageSeqInPartition);
235 unsigned int pageSeqInPartition,
239 return this->loadPage(dynamic_pointer_cast<SequenceFile>(file), (
PageID)pageSeqInPartition);
244 unsigned int numPagesInPartition =
245 (
unsigned int)curFile->
getMetaData()->getPartition(partitionId)->getNumPages();
247 if (pageSeqInPartition >= numPagesInPartition) {
250 int internalOffset = 0;
251 size_t pageSize = file->getPageSize();
254 char* pageData = this->allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
256 if (sequential ==
true) {
257 curFile->loadPageFromCurPos(partitionId, pageSeqInPartition, pageData, pageSize);
259 curFile->loadPage(partitionId, pageSeqInPartition, pageData, pageSize);
262 return this->buildPageFromSharedMemoryData(
263 file, pageData, partitionId, pageSeqInPartition, internalOffset, pageSize);
270 if (pageId > file->getLastFlushedPageID()) {
271 this->logger->writeLn(
272 "LRUPageCache: page is still in input buffer, and hasn't been flushed yet.");
275 int internalOffset = 0;
278 size_t pageSize = file->getPageSize();
279 char* pageData = this->allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
281 file->loadPage(pageId, pageData, pageSize);
283 return this->buildPageFromSharedMemoryData(file, pageData, 0, pageId, internalOffset, pageSize);
290 pthread_mutex_lock(&this->cacheMutex);
291 if (this->containsPage(key) ==
false) {
292 pthread_mutex_unlock(&this->cacheMutex);
295 size_t pageSizeAllocated = this->cache->at(key)->getRawSize() + 512;
296 this->cache->erase(key);
297 this->size -= pageSizeAllocated;
298 pthread_mutex_unlock(&this->cacheMutex);
306 key.
dbId = curPage->getDbID();
307 key.
typeId = curPage->getTypeID();
308 key.
setId = curPage->getSetID();
309 key.
pageId = curPage->getPageID();
311 pthread_mutex_lock(&this->cacheMutex);
312 if (this->containsPage(key) ==
false) {
313 pthread_mutex_unlock(&this->cacheMutex);
316 size_t pageSizeAllocated = this->cache->at(key)->getRawSize() + 512;
318 this->size -= pageSizeAllocated;
319 pthread_mutex_unlock(&this->cacheMutex);
320 this->shm->free(curPage->getRawBytes() - curPage->getInternalOffset(),
321 curPage->getRawSize() + 512);
322 curPage->setOffset(0);
323 curPage->setRawBytes(
nullptr);
332 unsigned int pageSeqInPartition,
337 key.
dbId = file->getDbId();
338 key.
typeId = file->getTypeId();
339 key.
setId = file->getSetId();
343 if ((partitionId == (
unsigned int)(-1)) || (pageSeqInPartition == (
unsigned int)(-1))) {
344 PageIndex pageIndex = file->getMetaData()->getPageIndex(pageId);
350 pthread_mutex_lock(&this->evictionMutex);
351 this->evictionLock();
352 pthread_mutex_lock(&this->cacheMutex);
353 if (this->containsPage(key) !=
true) {
354 pthread_mutex_unlock(&this->cacheMutex);
355 this->evictionUnlock();
356 pthread_mutex_unlock(&this->evictionMutex);
357 page = this->loadPage(file, partitionId, pageSeqInPartition, sequential);
358 if (page ==
nullptr) {
361 pthread_mutex_lock(&this->countLock);
362 page->setAccessSequenceId(this->accessCount);
364 pthread_mutex_unlock(&this->countLock);
366 pthread_mutex_lock(&this->evictionMutex);
367 this->cachePage(page, set);
368 page->setPinned(
true);
369 page->setDirty(
false);
371 pthread_mutex_unlock(&this->evictionMutex);
373 page = this->cache->at(key);
374 pthread_mutex_unlock(&this->cacheMutex);
375 if (page ==
nullptr) {
376 std::cout <<
"WARNING: PartitionPageIterator get nullptr in cache.\n" << std::endl;
377 logger->warn(
"PartitionPageIterator get nullptr in cache.");
378 this->evictionUnlock();
379 pthread_mutex_unlock(&this->evictionMutex);
382 page->setPinned(
true);
384 this->evictionUnlock();
385 pthread_mutex_unlock(&this->evictionMutex);
386 pthread_mutex_lock(&this->countLock);
387 page->setAccessSequenceId(this->accessCount);
389 pthread_mutex_unlock(&this->countLock);
390 if (set !=
nullptr) {
408 pthread_mutex_lock(&this->cacheMutex);
409 if (this->containsPage(key) !=
true) {
410 pthread_mutex_unlock(&this->cacheMutex);
411 std::cout <<
"WARNING: SetCachePageIterator get nullptr in cache.\n" << std::endl;
412 logger->warn(
"SetCachePageIterator get nullptr in cache.");
416 pthread_mutex_unlock(&this->cacheMutex);
417 if (page ==
nullptr) {
418 std::cout <<
"WARNING: SetCachePageIterator get nullptr in cache.\n" << std::endl;
419 logger->warn(
"SetCachePageIterator get nullptr in cache.");
423 pthread_mutex_lock(&this->countLock);
424 page->setAccessSequenceId(this->accessCount);
426 pthread_mutex_unlock(&this->countLock);
427 if (set !=
nullptr) {
438 if (this->containsPage(key) ==
true) {
441 int internalOffset = 0;
443 pageData = tryAllocateBufferFromSharedMemory(pageSize, internalOffset);
444 if (pageData !=
nullptr) {
448 PDBPagePtr page = make_shared<PDBPage>(pageData,
455 shm->computeOffset(pageData),
458 pthread_mutex_lock(&this->countLock);
459 page->setAccessSequenceId(this->accessCount);
461 pthread_mutex_unlock(&this->countLock);
462 page->setPinned(
true);
463 page->setDirty(
true);
464 pthread_mutex_lock(&evictionMutex);
465 this->cachePage(page, set);
467 pthread_mutex_unlock(&evictionMutex);
475 pthread_mutex_lock(&evictionMutex);
476 if (this->containsPage(key) ==
true) {
477 pthread_mutex_unlock(&evictionMutex);
480 pthread_mutex_unlock(&evictionMutex);
481 int internalOffset = 0;
483 pageData = allocateBufferFromSharedMemoryBlocking(pageSize, internalOffset);
484 if (pageData !=
nullptr) {
485 PDB_COUT <<
"PageCache: getNewPage: Page created for typeId=" << key.
typeId
486 <<
",setId=" << key.
setId <<
",pageId=" << key.
pageId <<
"\n";
491 PDBPagePtr page = make_shared<PDBPage>(pageData,
498 shm->computeOffset(pageData),
501 pthread_mutex_lock(&this->countLock);
502 page->setAccessSequenceId(this->accessCount);
504 pthread_mutex_unlock(&this->countLock);
505 page->setPinned(
true);
506 page->setDirty(
true);
507 pthread_mutex_lock(&evictionMutex);
508 this->cachePage(page, set);
510 pthread_mutex_unlock(&evictionMutex);
517 if (this->containsPage(key) ==
false) {
527 return (this->cache->find(key) != this->cache->end());
534 if (this->inEviction ==
true) {
538 this->logger->writeLn(
"Storage server: start cache eviction for all dirty pages...");
539 pthread_mutex_lock(&this->evictionMutex);
540 this->inEviction =
true;
543 unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
544 vector<PDBPagePtr>* evictableDirtyPages =
new vector<PDBPagePtr>();
545 this->evictionLock();
546 for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
547 page = cacheIter->second;
548 if (page ==
nullptr) {
549 this->inEviction =
false;
550 pthread_mutex_unlock(&this->evictionMutex);
552 }
else if ((page->isDirty() ==
true) && (page->isInFlush() ==
false)) {
553 while (page->getRefCount() > 0) {
556 evictableDirtyPages->push_back(page);
561 this->evictionUnlock();
563 for (i = 0; i < evictableDirtyPages->size(); i++) {
564 page = evictableDirtyPages->at(i);
565 if (evictPage(page) ==
true) {
570 this->inEviction =
false;
571 pthread_mutex_unlock(&this->evictionMutex);
572 delete evictableDirtyPages;
579 if (this->inEviction ==
true) {
583 this->logger->writeLn(
"Storage server: start cache eviction for all dirty pages...");
584 pthread_mutex_lock(&this->evictionMutex);
585 this->inEviction =
true;
588 unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
589 vector<PDBPagePtr>* evictableDirtyPages =
new vector<PDBPagePtr>();
590 this->evictionLock();
591 for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
592 page = cacheIter->second;
593 if (page ==
nullptr) {
594 this->inEviction =
false;
595 pthread_mutex_unlock(&this->evictionMutex);
597 }
else if ((page->isDirty() ==
true) && (page->getRefCount() == 0) &&
598 (page->isInFlush() ==
false)) {
599 evictableDirtyPages->push_back(page);
604 this->evictionUnlock();
606 for (i = 0; i < evictableDirtyPages->size(); i++) {
607 page = evictableDirtyPages->at(i);
608 if (evictPage(page) ==
true) {
613 this->inEviction =
false;
614 pthread_mutex_unlock(&this->evictionMutex);
615 delete evictableDirtyPages;
622 if (this->containsPage(key) ==
true) {
624 if ((page->isDirty() ==
true) && (page->isInFlush() ==
false)) {
625 page->setInFlush(
true);
626 page->setInEviction(
false);
627 this->flushBuffer->addPageToTail(page);
643 if (this->containsPage(key) ==
true) {
645 #ifndef UNPIN_FOR_NON_ZERO_REF_COUNT
646 if (page->getRefCount() > 0) {
647 cout <<
"can't be unpinned due to non-zero reference count " << page->getRefCount()
648 <<
"with DatabaseID=" << page->getDbID() <<
", TypeID=" << page->getTypeID()
649 <<
", SetID=" << page->getSetID() <<
", PageID=" << page->getPageID() <<
"\n";
650 this->logger->writeLn(
651 "LRUPageCache: can not evict page because it has been pinned by at least one "
653 this->logger->writeInt(page->getPageID());
659 page->setPinned(
false);
660 if ((tryFlushOrNot ==
true) && (page->isDirty() ==
true) &&
661 (page->isInFlush() ==
false) &&
662 ((page->getDbID() != 0) || (page->getTypeID() != 1)) &&
663 ((page->getDbID() != 0) || (page->getTypeID() != 2))) {
664 #ifdef PROFILING_CACHE
665 std::cout <<
"going to unpin a dirty page...\n";
668 page->setInFlush(
true);
669 page->setInEviction(
true);
672 this->flushBuffer->addPageToTail(page);
674 }
else if (page->isInFlush() ==
false) {
675 #ifdef PROFILING_CACHE
676 std::cout <<
"going to unpin a clean page...\n";
686 this->shm->free(page->getRawBytes() - page->getInternalOffset(),
687 page->getRawSize() + 512);
690 page->setRawBytes(
nullptr);
694 #ifdef PROFILING_CACHE
695 std::cout <<
"Storage server: evicting page from cache for dbId:" << page->getDbID()
696 <<
", typeID:" << page->getTypeID() <<
", setID=" << page->getSetID()
697 <<
", pageID: " << page->getPageID() <<
", tryFlushing=" << tryFlushOrNot
703 PDB_COUT <<
"can not find page in cache!\n";
704 this->logger->writeLn(
"LRUPageCache: can not evict page because it is not in cache");
713 key.
dbId = page->getDbID();
714 key.
typeId = page->getTypeID();
715 key.
setId = page->getSetID();
716 key.
pageId = page->getPageID();
717 bool ret = evictPage(key);
719 if (set !=
nullptr) {
720 set->removeCachedPage(page);
728 while ((worker = this->workers->getWorker()) ==
nullptr) {
733 worker->execute(evictWork, buzzer);
737 if (this->inEviction ==
true) {
740 #ifdef PROFILING_CACHE
741 std::cout <<
"Storage server: starting cache eviction to get more room with used size= "
742 << this->size <<
"!\n";
744 pthread_mutex_lock(&this->evictionMutex);
745 this->inEviction =
true;
747 this->evictionLock();
748 vector<PDBPagePtr>* pagesToEvict =
nullptr;
751 list<LocalitySetPtr>* curList;
752 for (i = 0; i < 6; i++) {
753 curList = this->priorityList->at(i);
754 for (list<LocalitySetPtr>::reverse_iterator it = curList->rbegin();
755 it != curList->rend();
758 pagesToEvict = set->selectPagesForReplacement();
759 if (pagesToEvict !=
nullptr) {
760 this->evictionUnlock();
761 for (j = 0; j < pagesToEvict->size(); j++) {
762 if (this->evictPage(pagesToEvict->at(j), set)) {
766 this->evictionLock();
768 pagesToEvict =
nullptr;
771 if (numEvicted > 0) {
775 this->evictionUnlock();
778 this->evictionLock();
779 this->logger->debug(
"PageCache::evict(): got the lock for evictionLock()...");
782 unordered_map<CacheKey, PDBPagePtr, CacheKeyHash, CacheKeyEqual>::iterator cacheIter;
784 for (cacheIter = this->cache->begin(); cacheIter != this->cache->end(); cacheIter++) {
785 curPage = cacheIter->second;
786 if (curPage ==
nullptr) {
787 this->logger->error(
"PageCache::evict(): got a null page, return!");
789 this->inEviction =
false;
790 this->evictionUnlock();
791 pthread_mutex_unlock(&this->evictionMutex);
795 "PageCache::evict(): got a page, check whether it can be evicted...");
796 if ((curPage->getRefCount() == 0) &&
797 ((curPage->isDirty() ==
false) ||
798 ((curPage->isDirty() ==
true) && (curPage->isInFlush() ==
false)))) {
799 cachedPages->push(curPage);
800 #ifdef PROFILING_CACHE
801 std::cout <<
"Add to eviction queue: curPage->getRefCount()="
802 << curPage->getRefCount() <<
", curPage->isDirty()=" << curPage->isDirty()
803 <<
", curPage->isInFlush)=" << curPage->isInFlush()
804 <<
", curPage->dbId=" << curPage->getDbID()
805 <<
", curPage->setId=" << curPage->getSetID() << std::endl;
811 this->evictionUnlock();
813 while ((this->size > this->evictStopSize) && (cachedPages->size() > 0)) {
814 page = cachedPages->top();
815 if (page ==
nullptr) {
816 PDB_COUT <<
"PageCache: nothing to evict, return!\n";
817 this->logger->debug(
"PageCache: nothing to evict, return!\n");
820 if (this->evictPage(page) ==
true) {
822 std::cout <<
"Storage server: evicted page from cache passively for dbId:"
823 << page->getDbID() <<
", typeID:" << page->getTypeID()
824 <<
", setID=" << page->getSetID() <<
", pageID: " << page->getPageID()
828 std::string(
"Storage server: evicting page from cache for pageID:") +
829 std::to_string(page->getPageID()));
837 this->inEviction =
false;
838 pthread_mutex_unlock(&this->evictionMutex);
840 std::cout <<
"Storage server: finished cache eviction!\n";
842 logger->debug(
"Storage server: finished cache eviction!\n");
846 this->warnSize = (this->maxSize) * warnThreshold;
847 this->logger->writeLn(
"LRUPageCache: warnSize was set to:");
848 this->logger->writeInt(this->warnSize);
852 this->evictStopSize = (this->maxSize) * evictThreshold;
853 this->logger->writeLn(
"LRUPageCache: evictSize was set to:");
854 this->logger->writeInt(this->evictStopSize);
858 this->priorityList->at(level)->push_back(set);
862 this->priorityList->at(level)->remove(set);
868 set->pin(policy, operationType);
869 if (set->getPersistenceType() ==
Transient) {
884 if (set->getPersistenceType() ==
Transient) {
895 if (set->getPersistenceType() ==
Transient) {
PDBPagePtr getNewPage(NodeID nodeId, CacheKey key, LocalitySet *set=nullptr, size_t pageSize=DEFAULT_PAGE_SIZE)
void pin(LocalitySetPtr set, LocalitySetReplacementPolicy policy, OperationType operationType)
shared_ptr< PDBPage > PDBPagePtr
char * allocateBufferFromSharedMemoryBlocking(size_t size, int &alignOffset)
unsigned int pageSeqInPartition
bool freePage(PDBPagePtr page)
PDBPagePtr getPage(SequenceFilePtr file, PageID pageId)
void getAndSetWarnSize(unsigned int numSets, double warnThreshold)
bool removePage(CacheKey key)
PDBPagePtr buildAndCachePageFromFileHandle(int handle, size_t size, NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PageID pageId)
void addCachedPage(PDBPagePtr page)
void removeLocalitySetFromPriorityList(LocalitySetPtr set, PriorityLevel level)
LocalitySetReplacementPolicy
void updateCachedPage(PDBPagePtr page)
shared_ptr< PartitionedFile > PartitionedFilePtr
shared_ptr< SharedMem > SharedMemPtr
#define EVICT_STOP_THRESHOLD
void addLocalitySetToPriorityList(LocalitySetPtr set, PriorityLevel level)
FilePartitionID partitionId
void cachePage(PDBPagePtr page, LocalitySet *set=nullptr)
shared_ptr< PDBEvictWork > PDBEvictWorkPtr
bool flushPageWithoutEviction(CacheKey key)
PDBPagePtr buildPageFromSharedMemoryData(PDBFilePtr file, char *pageData, FilePartitionID partitionId, unsigned int pageSeqInPartition, int internalOffset, size_t pageSize=DEFAULT_PAGE_SIZE)
void unpin(LocalitySetPtr set)
PartitionedFileMetaDataPtr getMetaData()
bool containsPage(CacheKey key)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
shared_ptr< PDBWorkerQueue > PDBWorkerQueuePtr
bool decPageRefCount(CacheKey key)
shared_ptr< PDBFileInterface > PDBFilePtr
int unpinAndEvictAllDirtyPages()
shared_ptr< Configuration > ConfigurationPtr
shared_ptr< PDBWorker > PDBWorkerPtr
PageCache(ConfigurationPtr conf, pdb::PDBWorkerQueuePtr workers, PageCircularBufferPtr flushBuffer, pdb::PDBLoggerPtr logger, SharedMemPtr shm, CacheStrategy strategy=UnifiedMRU)
shared_ptr< LocalitySet > LocalitySetPtr
PDBPagePtr getNewPageNonBlocking(NodeID nodeId, CacheKey key, LocalitySet *set=nullptr, size_t pageSize=DEFAULT_PAGE_SIZE)
std::shared_ptr< PDBLogger > PDBLoggerPtr
bool evictPage(CacheKey key, bool tryFlushOrNot=true)
PDBPagePtr loadPage(SequenceFilePtr file, PageID pageId)
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
void getAndSetEvictStopSize(unsigned int numSets, double evictThreshold)
char * tryAllocateBufferFromSharedMemory(size_t size, int &alignOffset)
unsigned int FilePartitionID
shared_ptr< SequenceFile > SequenceFilePtr