A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBFlushConsumerWork.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #include "PDBDebug.h"
19 #include "PDBFlushConsumerWork.h"
20 #include "PageCircularBuffer.h"
21 #include <chrono>
22 #include <ctime>
23 #include <fcntl.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
28  pdb::PangeaStorageServer* server) {
29  this->partitionId = partitionId;
30  this->server = server;
31  this->isStopped = false;
32 }
33 
35  this->isStopped = true;
36 }
37 
38 
40  PageCircularBufferPtr flushBuffer = this->server->getFlushBuffer();
41  PDBPagePtr page;
42  SetPtr set = nullptr;
43  while (!isStopped) {
44  if ((page = flushBuffer->popPageFromHead()) != nullptr) {
45  // got a page from flush buffer
46  // find the set of the page
47  PDB_COUT << "Got a page with PageID " << page->getPageID()
48  << " for partition:" << this->partitionId << "\n";
49  PDB_COUT << "page dbId=" << page->getDbID() << "\n";
50  PDB_COUT << "page typeId=" << page->getTypeID() << "\n";
51  PDB_COUT << "page setId=" << page->getSetID() << "\n";
52  bool isTempSet = false;
53  if ((page->getDbID() == 0) && (page->getTypeID() == 0)) {
54  set = this->server->getTempSet(page->getSetID());
55  isTempSet = true;
56  } else {
57  set = this->server->getSet(page->getDbID(), page->getTypeID(), page->getSetID());
58  isTempSet = false;
59  }
60  CacheKey key;
61  key.dbId = page->getDbID();
62  key.typeId = page->getTypeID();
63  key.setId = page->getSetID();
64  key.pageId = page->getPageID();
65  this->server->getCache()->flushLock();
66  if ((set != nullptr) && (page->getRawBytes() != nullptr)) {
67 
68  // append the page to the partition
69  int ret = set->getFile()->appendPage(this->partitionId, page);
70  if (ret < 0) {
71  PDB_COUT << "Can't write page with below info:\n";
72  PDB_COUT << "Got a page with PageID " << page->getPageID()
73  << " for partition:" << this->partitionId << "\n";
74  PDB_COUT << "page dbId=" << page->getDbID() << "\n";
75  PDB_COUT << "page typeId=" << page->getTypeID() << "\n";
76  PDB_COUT << "page setId=" << page->getSetID() << "\n";
77  }
78  set->lockDirtyPageSet();
79  if (isTempSet == false) {
80  PDB_COUT << "to write meta" << std::endl;
81  set->getFile()->writeMeta();
82  }
83  set->removePageFromDirtyPageSet(page->getPageID(), this->partitionId, ret);
84  set->unlockDirtyPageSet();
85  PDB_COUT << "page with PageID " << page->getPageID()
86  << " appended to partition with PartitionID " << this->partitionId << "\n";
87  }
88 #ifndef UNPIN_FOR_NON_ZERO_REF_COUNT
89  if ((page->getRawBytes() != nullptr) && (page->getRefCount() == 0) &&
90  (page->isInEviction() == true)) {
91 #else
92  if ((page->getRawBytes() != nullptr) && (page->isInEviction() == true)) {
93 #endif
94 
95  // remove the page from cache!
96  PDB_COUT << "to free the page!\n";
97  this->server->getSharedMem()->free(page->getRawBytes() - page->getInternalOffset(),
98  page->getSize() + 512);
99  PDB_COUT << "internalOffset=" << page->getInternalOffset() << "\n";
100  page->setOffset(0);
101  page->setRawBytes(nullptr);
102  }
103 // remove the page from cache!
104 #ifndef UNPIN_FOR_NON_ZERO_REF_COUNT
105  if ((page->getRefCount() == 0) && (page->isInEviction() == true)) {
106 #else
107  if (page->isInEviction() == true) {
108 #endif
109  this->server->getCache()->removePage(key);
110  } else {
111  page->setInFlush(false);
112  page->setDirty(false);
113  }
114  PDB_COUT << "PDBFlushConsumerWork: page freed from cache" << std::endl;
115  this->server->getCache()->flushUnlock();
116  this->server->getLogger()->writeLn(
117  "PDBFlushConsumerWork: unlocked for flushUnlock()...");
118  }
119  }
120  PDB_COUT << "flushing thread stopped running for partition: " << partitionId << "\n";
121 }
SetID setId
Definition: DataTypes.h:87
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
FilePartitionID partitionId
PDBFlushConsumerWork(FilePartitionID partitionId, pdb::PangeaStorageServer *server)
DatabaseID dbId
Definition: DataTypes.h:85
TempSetPtr getTempSet(SetID setId)
pdb::PangeaStorageServer * server
void execute(PDBBuzzerPtr callerBuzzer) override
PageID pageId
Definition: DataTypes.h:88
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
#define PDB_COUT
Definition: PDBDebug.h:31
SetPtr getSet(std::pair< std::string, std::string > databaseAndSet)
PageCircularBufferPtr getFlushBuffer()
shared_ptr< UserSet > SetPtr
Definition: UserSet.h:36
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
UserTypeID typeId
Definition: DataTypes.h:86
unsigned int FilePartitionID
Definition: DataTypes.h:32