A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBScanWork.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDBSCANWORK_CC
20 #define PDBSCANWORK_CC
21 
22 #include "PDBDebug.h"
23 #include "PDBScanWork.h"
24 #include "PDBPage.h"
25 #include "PDBCommunicator.h"
26 #include "StoragePagePinned.h"
27 #include "SimpleRequestResult.h"
29 #include "string.h"
30 #include <thread>
31 #include <sstream>
32 using namespace std;
33 
34 #ifndef HEADER_SIZE
35 #define HEADER_SIZE 20
36 #endif
37 
38 #ifndef MAX_RETRIES
39 #define MAX_RETRIES 5
40 #endif
41 
// PDBScanWork constructor.
// NOTE(review): the signature line (listing line 42) is missing from this
// listing; per the cross-reference index it is
//   PDBScanWork(PageIteratorPtr iter, pdb::PangeaStorageServer *storage, int &counter)
// `counter` is a reference member (int&), so it has to be bound in the
// initializer list; `iter` and `storage` are plain assignments in the body.
43  : counter(counter) {
44  this->iter = iter;
45  this->storage = storage;
// Initialise the mutex that execute() holds while connecting to the backend
// (nullptr selects the default mutex attributes).
46  pthread_mutex_init(&connection_mutex, nullptr);
47 }
48 
// NOTE(review): the signature line (listing line 49) is missing from this
// listing; presumably this is the PDBScanWork destructor -- confirm against
// PDBScanWork.h.
// Releases the mutex that the constructor set up with pthread_mutex_init.
50  pthread_mutex_destroy(&connection_mutex);
51 }
52 
// Builds a pdb::StoragePagePinned message from the arguments and sends it over
// myCommunicator.
// NOTE(review): the first signature line (listing line 53) is missing from this
// listing; per the cross-reference index the full signature is
//   bool sendPagePinned(pdb::PDBCommunicatorPtr myCommunicator, bool morePagesToPin,
//                       NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId,
//                       PageID pageId, size_t pageSize, size_t offset)
//
// morePagesToPin is forwarded to setMorePagesToLoad(); execute() passes false
// (with all other fields 0) for its final "close the loop" message.
// Returns true if sendObject() succeeded, false otherwise.
54  bool morePagesToPin,
55  NodeID nodeId,
56  DatabaseID dbId,
57  UserTypeID typeId,
58  SetID setId,
59  PageID pageId,
60  size_t pageSize,
61  size_t offset) {
62 
// presumably makeObject() below allocates from this 2048-byte temporary block
// for as long as myBlock is in scope -- TODO confirm against
// UseTemporaryAllocationBlock.
63  const pdb::UseTemporaryAllocationBlock myBlock{2048};
64  pdb::Handle<pdb::StoragePagePinned> pagePinnedMsg = pdb::makeObject<pdb::StoragePagePinned>();
65  pagePinnedMsg->setMorePagesToLoad(morePagesToPin);
66  pagePinnedMsg->setNodeID(nodeId);
67  pagePinnedMsg->setDatabaseID(dbId);
68  pagePinnedMsg->setUserTypeID(typeId);
69  pagePinnedMsg->setSetID(setId);
70  pagePinnedMsg->setPageID(pageId);
71  pagePinnedMsg->setPageSize(pageSize);
72  pagePinnedMsg->setSharedMemOffset(offset);
73 
// NOTE(review): errMsg is a local, so the prefixed text built on failure is
// discarded when the function returns; the caller only sees the bool.
74  string errMsg;
75  if (!myCommunicator->sendObject<pdb::StoragePagePinned>(pagePinnedMsg, errMsg)) {
76  errMsg = "could not scan data: " + errMsg;
77  return false;
78  }
79  return true;
80 }
81 
// Waits for the backend's acknowledgement (a pdb::SimpleRequestResult) on
// myCommunicator.
// NOTE(review): the first signature line (listing line 82) is missing from this
// listing; per the cross-reference index the full signature is
//   bool acceptPagePinnedAck(pdb::PDBCommunicatorPtr myCommunicator, bool &wasError,
//                            string &info, string &errMsg)
//
// Returns true with wasError == false when an object was read successfully;
// returns false with wasError == true otherwise.
// `info` is never written in the lines visible here, and errMsg is only
// touched by getNextObject() (it stays unchanged on the short-header path).
83  bool& wasError,
84  string& info,
85  string& errMsg) {
86 
// Reject anything smaller than HEADER_SIZE before allocating a block for it.
87  size_t sizeOfNextObject = myCommunicator->getSizeOfNextObject();
88  if (sizeOfNextObject < HEADER_SIZE) {
89  wasError = true;
90  return false;
91  }
92  PDB_COUT << "PDBScanWork: to create memory block for SimpleRequestResult" << std::endl;
// The temporary allocation block is sized to the incoming object.
93  const pdb::UseTemporaryAllocationBlock myBlock{sizeOfNextObject};
94  PDB_COUT << "PDBScanWork: memory block allocated" << std::endl;
95  bool success;
// NOTE(review): listing line 96 is missing here; presumably it declared the
// handle that receives getNextObject()'s result -- confirm against the real file.
97  myCommunicator->getNextObject<pdb::SimpleRequestResult>(success, errMsg);
98 
99  if (!success) {
100  wasError = true;
101  return false;
102  }
103 
104  wasError = false;
105  return true;
106 }
107 
108 
109 // do the actual work
110 void PDBScanWork::execute(PDBBuzzerPtr callerBuzzer) {
111  pdb::PDBLoggerPtr logger = make_shared<pdb::PDBLogger>("pdbScanWorks.log");
112  logger->debug("PDBScanWork: running...");
113  PDBPagePtr page;
114  string errMsg, info;
115  bool wasError;
116 
117  logger->debug("PDBScanWork: connect to backend...");
118  pthread_mutex_lock(&connection_mutex);
119  pdb::PDBCommunicatorPtr communicatorToBackEnd = make_shared<pdb::PDBCommunicator>();
120  int retry = 0;
121  while (communicatorToBackEnd->connectToLocalServer(
122  logger, storage->getPathToBackEndServer(), errMsg) &&
123  (retry < MAX_RETRIES)) {
124  retry++;
125  if (retry >= MAX_RETRIES) {
126  errMsg = "PDBScanWowrk: can not connect to local server.";
127  cout << errMsg << "\n";
128  callerBuzzer->buzz(PDBAlarm::GenericError);
129  pthread_mutex_unlock(&connection_mutex);
130  return;
131  }
132  sleep(0);
133  }
134  pthread_mutex_unlock(&connection_mutex);
135  if (retry > 0) {
136  }
137 
138  logger->debug("PDBScanWork: pin pages...");
139  // for each loaded page retrieved from iterator, notify backend server!
140  while (this->iter->hasNext()) {
141  page = this->iter->next();
142  if (page != nullptr) {
143  // send PagePinned object to backend
144  PDB_COUT << "PDBScanWork: pin page with pageId =" << page->getPageID() << "\n";
145  retry = 0;
146 
147  while (retry < MAX_RETRIES) {
148  logger->debug(string("PDBScanWork: pin pages with pageId = ") +
149  to_string(page->getPageID()));
150  bool ret = this->sendPagePinned(communicatorToBackEnd,
151  true,
152  page->getNodeID(),
153  page->getDbID(),
154  page->getTypeID(),
155  page->getSetID(),
156  page->getPageID(),
157  page->getSize(),
158  page->getOffset());
159  if (ret == false) {
160  communicatorToBackEnd->reconnect(errMsg);
161  retry++;
162  continue;
163  }
164  // receive ack object from backend
165  PDB_COUT << "PDBScanWork: waiting for ack..." << std::endl;
166  logger->debug("PDBScanWork: waiting for ack... ");
167  ret = this->acceptPagePinnedAck(communicatorToBackEnd, wasError, info, errMsg);
168  if (ret == false) {
169  communicatorToBackEnd->reconnect(errMsg);
170  retry++;
171  continue;
172  }
173  logger->debug("PDBScanWork: ack received ");
174  PDB_COUT << "PDBScanWork: got ack!" << std::endl;
175  break;
176  }
177  }
178  }
179  // close the connection
180  PDB_COUT << "PDBScanWork to close the loop" << std::endl;
181  logger->debug("PDBScanWork to close the loop");
182  retry = 0;
183  while (retry < MAX_RETRIES) {
184  bool ret = this->sendPagePinned(communicatorToBackEnd, false, 0, 0, 0, 0, 0, 0, 0);
185  if (ret == false) {
186  communicatorToBackEnd->reconnect(errMsg);
187  retry++;
188  continue;
189  }
190  ret = this->acceptPagePinnedAck(communicatorToBackEnd, wasError, info, errMsg);
191  if (ret == false) {
192  communicatorToBackEnd->reconnect(errMsg);
193  retry++;
194  continue;
195  }
196  // notify the caller that this scan thread has finished work.
197  PDB_COUT << "PDBScanWork finished.\n";
198  logger->debug("PDBScanWork finished.\n");
199  break;
200  }
201  callerBuzzer->buzz(PDBAlarm::WorkAllDone, this->counter);
202 }
203 
204 #endif
pdb::PangeaStorageServer * storage
Definition: PDBScanWork.h:60
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
pthread_mutex_t connection_mutex
Definition: PDBScanWork.h:62
shared_ptr< PageIteratorInterface > PageIteratorPtr
Definition: PageIterator.h:33
unsigned int NodeID
Definition: DataTypes.h:27
#define HEADER_SIZE
Definition: PDBScanWork.cc:35
#define MAX_RETRIES
Definition: PDBScanWork.cc:39
PageIteratorPtr iter
Definition: PDBScanWork.h:59
bool sendPagePinned(pdb::PDBCommunicatorPtr myCommunicator, bool morePagesToPin, NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PageID pageId, size_t pageSize, size_t offset)
Definition: PDBScanWork.cc:53
unsigned int DatabaseID
Definition: DataTypes.h:29
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
unsigned int PageID
Definition: DataTypes.h:26
int & counter
Definition: PDBScanWork.h:61
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
PDBScanWork(PageIteratorPtr iter, pdb::PangeaStorageServer *storage, int &counter)
Definition: PDBScanWork.cc:42
#define PDB_COUT
Definition: PDBDebug.h:31
void execute(PDBBuzzerPtr callerBuzzer) override
Definition: PDBScanWork.cc:110
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
bool acceptPagePinnedAck(pdb::PDBCommunicatorPtr myCommunicator, bool &wasError, string &info, string &errMsg)
Definition: PDBScanWork.cc:82
unsigned int UserTypeID
Definition: DataTypes.h:25