A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PageScanner.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #ifndef PAGESCANNER_CC
21 #define PAGESCANNER_CC
22 
23 #include "PageScanner.h"
24 #include "PDBCommunicator.h"
26 #include "PDBPage.h"
27 #include "SharedMem.h"
28 #include "StorageGetSetPages.h"
29 #include "StoragePagePinned.h"
30 #include "SimpleRequestResult.h"
32 #include "Handle.h"
33 #include "InterfaceFunctions.h"
34 #include <vector>
35 #include <iostream>
36 using namespace std;
37 
39  SharedMemPtr shm,
40  pdb::PDBLoggerPtr logger,
41  int numThreads,
42  int recvBufSize,
43  NodeID nodeId) {
44  this->communicator = communicator;
45  this->shm = shm;
46  this->logger = logger;
47  this->numThreads = numThreads;
48  this->buffer = make_shared<PageCircularBuffer>(recvBufSize, logger);
49  this->nodeId = nodeId;
50 }
51 
53 
58  string& errMsg,
59  bool& morePagesToLoad,
60  NodeID& dataNodeId,
61  DatabaseID& dataDbId,
62  UserTypeID& dataTypeId,
63  SetID& dataSetId,
64  PageID& dataPageId,
65  size_t& pageSize,
66  size_t& offset) {
67 
68  if (myCommunicator == nullptr) {
69  return false;
70  }
71 
72  // receive the PinPage object
73  size_t receivedSize = myCommunicator->getSizeOfNextObject();
74  if (receivedSize == 0) {
75  std::cout << "ERROR in PageScanner: received size is 0" << std::endl;
76  return false;
77  }
78  const pdb::UseTemporaryAllocationBlock myBlock{receivedSize};
79  bool success;
81  myCommunicator->getNextObject<pdb::StoragePagePinned>(success, errMsg);
82  if (success == true) {
83  morePagesToLoad = msg->getMorePagesToLoad();
84  dataNodeId = msg->getNodeID();
85  dataDbId = msg->getDatabaseID();
86  dataTypeId = msg->getUserTypeID();
87  dataSetId = msg->getSetID();
88  dataPageId = msg->getPageID();
89  pageSize = msg->getPageSize();
90  offset = msg->getSharedMemOffset();
91  }
92  return success;
93 }
94 
99  bool wasError,
100  string info,
101  string& errMsg) {
102  const pdb::UseTemporaryAllocationBlock myBlock{1024};
104  pdb::makeObject<pdb::SimpleRequestResult>(!wasError, errMsg);
105  if (!myCommunicator->sendObject<pdb::SimpleRequestResult>(ack, errMsg)) {
106  cout << "Sending object failure: " << errMsg << "\n";
107  return false;
108  }
109  return true;
110 }
111 
112 
113 vector<PageCircularBufferIteratorPtr> PageScanner::getSetIterators(NodeID nodeId,
114  DatabaseID dbId,
115  UserTypeID typeId,
116  SetID setId) {
117  // create an GetSetPages object
118  string errMsg;
119  const pdb::UseTemporaryAllocationBlock myBlock{1024};
120  pdb::Handle<pdb::StorageGetSetPages> getSetPagesRequest =
121  pdb::makeObject<pdb::StorageGetSetPages>();
122  getSetPagesRequest->setDatabaseID(dbId);
123  getSetPagesRequest->setUserTypeID(typeId);
124  getSetPagesRequest->setSetID(setId);
125 
126  vector<PageCircularBufferIteratorPtr> vec;
127  // send request to storage
128  if (!this->communicator->sendObject<pdb::StorageGetSetPages>(getSetPagesRequest, errMsg)) {
129  errMsg = "Could not send data to server.";
130  logger->error(std::string("PageScanner: ") + errMsg);
131  return vec;
132  }
133 
134  // initialize iterators;
135  unsigned int i;
137  for (i = 0; i < this->numThreads; i++) {
138  iter = make_shared<PageCircularBufferIterator>(i, this->buffer, this->logger);
139  vec.push_back(iter);
140  }
141  return vec;
142 }
143 
145  pdb::PDBCommunicatorPtr myCommunicator) {
146  PDB_COUT << "PageScanner: recvPagesLoop processing.\n";
147 
148  string errMsg;
149  bool morePagesToLoad = pinnedPage->getMorePagesToLoad();
150  NodeID dataNodeId = pinnedPage->getNodeID();
151  DatabaseID dataDbId = pinnedPage->getDatabaseID();
152  UserTypeID dataTypeId = pinnedPage->getUserTypeID();
153  SetID dataSetId = pinnedPage->getSetID();
154  PageID dataPageId = pinnedPage->getPageID();
155  size_t pageSize = pinnedPage->getPageSize();
156  size_t offset = pinnedPage->getSharedMemOffset();
157  PDBPagePtr page;
158  bool ret;
159 
160  // Due to the new handling mechanism, we need to process the first message then accept the next
161  // message;
162  do {
163  PDB_COUT << "dataPageId:" << dataPageId << "\n";
164  PDB_COUT << "morePagesToLoad:" << morePagesToLoad << "\n";
165  logger->debug(string("got a page with pageId=") + to_string(dataPageId));
166  // if there are no more pages to send at the frontend side, send ACK and return.
167  if (morePagesToLoad == false) {
168  PDB_COUT << "BackEndServer: sending Ack to frontEnd to end loop...\n";
169  logger->debug(string("BackEndServer: sending Ack to frontEnd to end loop...\n"));
170  this->sendPagePinnedAck(myCommunicator, false, "", errMsg);
171  PDB_COUT << "BackEndServer: sent Ack to frontend to end loop...\n";
172  logger->debug(string("BackEndServer: sent Ack to frontend to end loop...\n"));
173  return true;
174  }
175  // if there are more pages to send at the frontend side,
176  // we wrap the page object, add it to buffer, and send back ack.
177  else {
178  char* rawData = (char*)this->shm->getPointer(offset);
179  page = make_shared<PDBPage>(rawData, offset, 0);
180  logger->debug(string("BackEndServer: add page scanner page to circular buffer...\n"));
181  if (this->buffer != nullptr) {
182  this->buffer->addPageToTail(page);
183  } else {
184  std::cout << "Fatal Error: this is bad, the circular buffer is null!" << std::endl;
185  logger->error("Fatal Error: this is bad, the circular buffer is null!");
186  return true;
187  }
188  PDB_COUT << "BackEndServer: sending PagePinnedAck to frontEnd...\n";
189  logger->debug("BackEndServer: sending PagePinnedAck to frontEnd...\n");
190  this->sendPagePinnedAck(myCommunicator, false, "", errMsg);
191  PDB_COUT << "BackEndServer: sent PagePinnedAck to frontEnd...\n";
192  logger->debug("BackEndServer: sent PagePinnedAck to frontEnd...\n");
193  }
194  } while ((ret = this->acceptPagePinned(myCommunicator,
195  errMsg,
196  morePagesToLoad,
197  dataNodeId,
198  dataDbId,
199  dataTypeId,
200  dataSetId,
201  dataPageId,
202  pageSize,
203  offset)) == true);
204  PDB_COUT << "PageScanner Work is done" << endl;
205  logger->debug("PageScanner Work is done");
206  return false;
207 }
208 
213  this->buffer->close();
214 }
219  this->buffer->open();
220 }
221 #endif
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
PageScanner(pdb::PDBCommunicatorPtr communicator, SharedMemPtr shm, pdb::PDBLoggerPtr logger, int numThreads, int recvBufSize, NodeID nodeId)
Definition: PageScanner.cc:38
vector< PageCircularBufferIteratorPtr > getSetIterators(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId)
Definition: PageScanner.cc:113
void closeBuffer()
Definition: PageScanner.cc:212
unsigned int NodeID
Definition: DataTypes.h:27
bool sendPagePinnedAck(pdb::PDBCommunicatorPtr myCommunicator, bool wasError, string info, string &errMsg)
Definition: PageScanner.cc:98
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
unsigned int DatabaseID
Definition: DataTypes.h:29
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
unsigned int PageID
Definition: DataTypes.h:26
void openBuffer()
Definition: PageScanner.cc:218
#define PDB_COUT
Definition: PDBDebug.h:31
bool recvPagesLoop(pdb::Handle< pdb::StoragePagePinned > pinnedPage, pdb::PDBCommunicatorPtr myCommunicator)
Definition: PageScanner.cc:144
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
bool acceptPagePinned(pdb::PDBCommunicatorPtr myCommunicator, string &errMsg, bool &morePagesToLoad, NodeID &dataNodeId, DatabaseID &dataDbId, UserTypeID &dataTypeId, SetID &dataSetId, PageID &dataPageId, size_t &pageSize, size_t &offset)
Definition: PageScanner.cc:57
unsigned int UserTypeID
Definition: DataTypes.h:25