20 #ifndef PAGESCANNER_CC
21 #define PAGESCANNER_CC
44 this->communicator = communicator;
46 this->logger = logger;
47 this->numThreads = numThreads;
48 this->buffer = make_shared<PageCircularBuffer>(recvBufSize, logger);
49 this->nodeId = nodeId;
59 bool& morePagesToLoad,
68 if (myCommunicator ==
nullptr) {
73 size_t receivedSize = myCommunicator->getSizeOfNextObject();
74 if (receivedSize == 0) {
75 std::cout <<
"ERROR in PageScanner: received size is 0" << std::endl;
82 if (success ==
true) {
83 morePagesToLoad = msg->getMorePagesToLoad();
84 dataNodeId = msg->getNodeID();
85 dataDbId = msg->getDatabaseID();
86 dataTypeId = msg->getUserTypeID();
87 dataSetId = msg->getSetID();
88 dataPageId = msg->getPageID();
89 pageSize = msg->getPageSize();
90 offset = msg->getSharedMemOffset();
104 pdb::makeObject<pdb::SimpleRequestResult>(!wasError, errMsg);
106 cout <<
"Sending object failure: " << errMsg <<
"\n";
121 pdb::makeObject<pdb::StorageGetSetPages>();
122 getSetPagesRequest->setDatabaseID(dbId);
123 getSetPagesRequest->setUserTypeID(typeId);
124 getSetPagesRequest->setSetID(setId);
126 vector<PageCircularBufferIteratorPtr> vec;
129 errMsg =
"Could not send data to server.";
130 logger->error(std::string(
"PageScanner: ") + errMsg);
137 for (i = 0; i < this->numThreads; i++) {
138 iter = make_shared<PageCircularBufferIterator>(i, this->buffer, this->logger);
146 PDB_COUT <<
"PageScanner: recvPagesLoop processing.\n";
149 bool morePagesToLoad = pinnedPage->getMorePagesToLoad();
150 NodeID dataNodeId = pinnedPage->getNodeID();
151 DatabaseID dataDbId = pinnedPage->getDatabaseID();
152 UserTypeID dataTypeId = pinnedPage->getUserTypeID();
153 SetID dataSetId = pinnedPage->getSetID();
154 PageID dataPageId = pinnedPage->getPageID();
155 size_t pageSize = pinnedPage->getPageSize();
156 size_t offset = pinnedPage->getSharedMemOffset();
163 PDB_COUT <<
"dataPageId:" << dataPageId <<
"\n";
164 PDB_COUT <<
"morePagesToLoad:" << morePagesToLoad <<
"\n";
165 logger->debug(
string(
"got a page with pageId=") + to_string(dataPageId));
167 if (morePagesToLoad ==
false) {
168 PDB_COUT <<
"BackEndServer: sending Ack to frontEnd to end loop...\n";
169 logger->debug(
string(
"BackEndServer: sending Ack to frontEnd to end loop...\n"));
170 this->sendPagePinnedAck(myCommunicator,
false,
"", errMsg);
171 PDB_COUT <<
"BackEndServer: sent Ack to frontend to end loop...\n";
172 logger->debug(
string(
"BackEndServer: sent Ack to frontend to end loop...\n"));
178 char* rawData = (
char*)this->shm->getPointer(offset);
179 page = make_shared<PDBPage>(rawData, offset, 0);
180 logger->debug(
string(
"BackEndServer: add page scanner page to circular buffer...\n"));
181 if (this->buffer !=
nullptr) {
182 this->buffer->addPageToTail(page);
184 std::cout <<
"Fatal Error: this is bad, the circular buffer is null!" << std::endl;
185 logger->error(
"Fatal Error: this is bad, the circular buffer is null!");
188 PDB_COUT <<
"BackEndServer: sending PagePinnedAck to frontEnd...\n";
189 logger->debug(
"BackEndServer: sending PagePinnedAck to frontEnd...\n");
190 this->sendPagePinnedAck(myCommunicator,
false,
"", errMsg);
191 PDB_COUT <<
"BackEndServer: sent PagePinnedAck to frontEnd...\n";
192 logger->debug(
"BackEndServer: sent PagePinnedAck to frontEnd...\n");
194 }
while ((ret = this->acceptPagePinned(myCommunicator,
204 PDB_COUT <<
"PageScanner Work is done" << endl;
205 logger->debug(
"PageScanner Work is done");
213 this->buffer->close();
219 this->buffer->open();
shared_ptr< PDBPage > PDBPagePtr
PageScanner(pdb::PDBCommunicatorPtr communicator, SharedMemPtr shm, pdb::PDBLoggerPtr logger, int numThreads, int recvBufSize, NodeID nodeId)
vector< PageCircularBufferIteratorPtr > getSetIterators(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId)
bool sendPagePinnedAck(pdb::PDBCommunicatorPtr myCommunicator, bool wasError, string info, string &errMsg)
shared_ptr< SharedMem > SharedMemPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
bool recvPagesLoop(pdb::Handle< pdb::StoragePagePinned > pinnedPage, pdb::PDBCommunicatorPtr myCommunicator)
std::shared_ptr< PDBLogger > PDBLoggerPtr
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
bool acceptPagePinned(pdb::PDBCommunicatorPtr myCommunicator, string &errMsg, bool &morePagesToLoad, NodeID &dataNodeId, DatabaseID &dataDbId, UserTypeID &dataTypeId, SetID &dataSetId, PageID &dataPageId, size_t &pageSize, size_t &offset)