A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataProxy.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #ifndef DATA_PROXY_H
21 #define DATA_PROXY_H
22 
23 #include "PDBDebug.h"
24 #include "DataProxy.h"
25 #include "InterfaceFunctions.h"
27 #include "Object.h"
28 #include "Handle.h"
29 #include "StorageAddTempSet.h"
31 #include "StoragePinPage.h"
32 #include "StoragePinBytes.h"
33 #include "StorageUnpinPage.h"
34 #include "SimpleRequestResult.h"
35 #include "StoragePagePinned.h"
36 #include "StorageBytesPinned.h"
37 #include "StorageRemoveTempSet.h"
38 #include "CloseConnection.h"
39 #include "Configuration.h"
40 
41 #ifndef MAX_RETRIES
42 #define MAX_RETRIES 5
43 #endif
44 
46  pdb::PDBCommunicatorPtr communicator,
47  SharedMemPtr shm,
48  pdb::PDBLoggerPtr logger) {
49  this->nodeId = nodeId;
50  this->communicator = communicator;
51  this->shm = shm;
52  this->logger = logger;
53  this->communicator->setLongConnection(true);
54 }
55 
57 
58 bool DataProxy::addTempSet(string setName, SetID& setId, bool needMem, int numTries) {
59  if (numTries == MAX_RETRIES) {
60  return false;
61  }
62  if (numTries > 0) {
63  logger->error(std::string("DataProxy: addTempSet with numTries=") +
64  std::to_string(numTries));
65  }
66  string errMsg;
67  if (this->communicator->isSocketClosed() == true) {
68  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
69  logger->error("DataProxy: connection is closed, to reconnect");
70  if (communicator->reconnect(errMsg)) {
71  std::cout << errMsg << std::endl;
72  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
73  return false;
74  }
75  }
76  if (needMem == true) {
77  // create an AddSet object
78  {
79  const pdb::UseTemporaryAllocationBlock myBlock{1024};
81  pdb::makeObject<pdb::StorageAddTempSet>(setName);
82  // we don't know the SetID to be added, the frontend will assign one.
83  // send the message out
84  if (!this->communicator->sendObject<pdb::StorageAddTempSet>(msg, errMsg)) {
85  // We reserve Database 0 and Type 0 as temp data
86  cout << "Sending object failure: " << errMsg << "\n";
87  return addTempSet(setName, setId, needMem, numTries + 1);
88  }
89  }
90 
91  // receive the StorageAddSetResult message
92  {
93  size_t objectSize = this->communicator->getSizeOfNextObject();
94  if (objectSize == 0) {
95  cout << "Receiving ack failure" << std::endl;
96  return addTempSet(setName, setId, needMem, numTries + 1);
97  }
98  const pdb::UseTemporaryAllocationBlock myBlock{objectSize};
99  bool success;
101  this->communicator->getNextObject<pdb::StorageAddTempSetResult>(success, errMsg);
102 
103  if (ack == nullptr) {
104  cout << "Receiving ack failure:" << errMsg << "\n";
105  return addTempSet(setName, setId, needMem, numTries + 1);
106  }
107  if (success == true) {
108  setId = ack->getTempSetID();
109  }
110  return success;
111  }
112  } else {
113 
114  {
116  pdb::makeObject<pdb::StorageAddTempSet>(setName);
117  // we don't know the SetID to be added, the frontend will assign one.
118  // send the message out
119  if (!this->communicator->sendObject<pdb::StorageAddTempSet>(msg, errMsg)) {
120  // We reserve Database 0 and Type 0 as temp data
121  cout << "Sending object failure: " << errMsg << "\n";
122  return addTempSet(setName, setId, needMem, numTries + 1);
123  }
124  }
125 
126  // receive the StorageAddSetResult message
127  {
128  bool success;
130  this->communicator->getNextObject<pdb::StorageAddTempSetResult>(success, errMsg);
131  if (ack == nullptr) {
132  cout << "Receiving ack failure:" << errMsg << "\n";
133  return addTempSet(setName, setId, needMem, numTries + 1);
134  }
135  if (success == true) {
136  setId = ack->getTempSetID();
137  }
138  return success;
139  }
140  }
141 }
142 
143 
144 bool DataProxy::removeTempSet(SetID setId, bool needMem, int numTries) {
145  if (numTries == MAX_RETRIES) {
146  return false;
147  }
148  if (numTries > 0) {
149  logger->error(std::string("DataProxy: removeTempSet with numTries=") +
150  std::to_string(numTries));
151  }
152  string errMsg;
153  if (this->communicator->isSocketClosed() == true) {
154  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
155  logger->error("DataProxy: connection is closed, to reconnect");
156  if (communicator->reconnect(errMsg)) {
157  std::cout << errMsg << std::endl;
158  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
159  return false;
160  }
161  }
162  // create a RemoveSet object
163 
164  if (needMem == true) {
165  {
166  const pdb::UseTemporaryAllocationBlock myBlock{1024};
168  pdb::makeObject<pdb::StorageRemoveTempSet>(setId);
169 
170  // send the message out
171  if (!this->communicator->sendObject<pdb::StorageRemoveTempSet>(msg, errMsg)) {
172  cout << "Sending object failure: " << errMsg << "\n";
173  return removeTempSet(setId, needMem, numTries + 1);
174  }
175  }
176 
177  // receive the SimpleRequestResult message
178  {
179  bool success;
180  size_t objectSize = this->communicator->getSizeOfNextObject();
181  if (objectSize == 0) {
182  std::cout << "Receiving ack failure" << std::endl;
183  return removeTempSet(setId, needMem, numTries + 1);
184  }
185  const pdb::UseTemporaryAllocationBlock myBlock{objectSize};
187  this->communicator->getNextObject<pdb::SimpleRequestResult>(success, errMsg);
188  if (ack == nullptr) {
189  cout << "Receiving ack failure:" << errMsg << "\n";
190  return removeTempSet(setId, needMem, numTries + 1);
191  }
192 
193  return success && (ack->getRes().first);
194  }
195  } else {
196  {
198  pdb::makeObject<pdb::StorageRemoveTempSet>(setId);
199 
200  // send the message out
201  if (!this->communicator->sendObject<pdb::StorageRemoveTempSet>(msg, errMsg)) {
202  cout << "Sending object failure: " << errMsg << "\n";
203  return removeTempSet(setId, needMem, numTries + 1);
204  }
205  }
206 
207  // receive the SimpleRequestResult message
208  {
209  bool success;
211  this->communicator->getNextObject<pdb::SimpleRequestResult>(success, errMsg);
212  if (ack == nullptr) {
213  cout << "Receiving ack failure:" << errMsg << "\n";
214  return removeTempSet(setId, needMem, numTries + 1);
215  }
216  return success && (ack->getRes().first);
217  }
218  }
219 }
220 
221 // page will be pinned at Storage Server
222 bool DataProxy::addTempPage(SetID setId, PDBPagePtr& page, bool needMem, int numTries) {
223  return addUserPage(0, 0, setId, page, needMem, numTries);
224 }
225 
227  DatabaseID dbId, UserTypeID typeId, SetID setId, PDBPagePtr& page, bool needMem, int numTries) {
228  if (numTries == MAX_RETRIES) {
229  return false;
230  }
231  if (numTries > 0) {
232  logger->error(std::string("DataProxy: addUserPage with numTries=") +
233  std::to_string(numTries));
234  }
235  string errMsg;
236  if (this->communicator->isSocketClosed() == true) {
237  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
238  logger->error("DataProxy: connection is closed, to reconnect");
239  if (communicator->reconnect(errMsg)) {
240  std::cout << errMsg << std::endl;
241  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
242  return false;
243  }
244  }
245  if (needMem == true) {
246  // create a PinPage object
247  {
248  const pdb::UseTemporaryAllocationBlock myBlock{2048};
249  pdb::Handle<pdb::StoragePinPage> msg = pdb::makeObject<pdb::StoragePinPage>();
250  // We reserve Database 0 and Type 0 as temp data
251  msg->setNodeID(this->nodeId);
252  msg->setDatabaseID(dbId);
253  msg->setUserTypeID(typeId);
254  msg->setSetID(setId);
255  msg->setWasNewPage(true);
256  // send the message out
257  if (!this->communicator->sendObject<pdb::StoragePinPage>(msg, errMsg)) {
258  cout << "DataProxy.AddUserPage(): Sending object failure: " << errMsg << "\n";
259  return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
260  }
261  }
262 
263  // receive the PagePinned object
264  {
265  size_t objectSize = this->communicator->getSizeOfNextObject();
266  if (objectSize == 0) {
267  std::cout << "Receive ack failure" << std::endl;
268  return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
269  }
270  PDB_COUT << "DataProxy: to allocate memory block for PagePinned object" << std::endl;
271  const pdb::UseTemporaryAllocationBlock myBlock{objectSize};
272  PDB_COUT << "DataProxy: memory block allocated" << std::endl;
273  bool success;
275  this->communicator->getNextObject<pdb::StoragePagePinned>(success, errMsg);
276  if (ack == nullptr) {
277  cout << "Receiving ack failure:" << errMsg << "\n";
278  return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
279  }
280  char* dataIn = (char*)this->shm->getPointer(ack->getSharedMemOffset());
281  page = make_shared<PDBPage>(dataIn,
282  ack->getNodeID(),
283  ack->getDatabaseID(),
284  ack->getUserTypeID(),
285  ack->getSetID(),
286  ack->getPageID(),
287  ack->getPageSize(),
288  ack->getSharedMemOffset());
289  page->setPinned(true);
290  page->setDirty(true);
291  return success;
292  }
293  } else {
294 
295  // create a PinPage object
296  {
297  pdb::Handle<pdb::StoragePinPage> msg = pdb::makeObject<pdb::StoragePinPage>();
298  // We reserve Database 0 and Type 0 as temp data
299  msg->setNodeID(this->nodeId);
300  msg->setDatabaseID(dbId);
301  msg->setUserTypeID(typeId);
302  msg->setSetID(setId);
303  msg->setWasNewPage(true);
304  // send the message out
305  if (!this->communicator->sendObject<pdb::StoragePinPage>(msg, errMsg)) {
306  cout << "Sending object failure: " << errMsg << "\n";
307  return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
308  }
309  }
310 
311  // receive the PagePinned object
312  {
313  bool success;
315  this->communicator->getNextObject<pdb::StoragePagePinned>(success, errMsg);
316  if (ack == nullptr) {
317  cout << "Receiving ack failure:" << errMsg << "\n";
318  return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
319  }
320  char* dataIn = (char*)this->shm->getPointer(ack->getSharedMemOffset());
321  page = make_shared<PDBPage>(dataIn,
322  ack->getNodeID(),
323  ack->getDatabaseID(),
324  ack->getUserTypeID(),
325  ack->getSetID(),
326  ack->getPageID(),
327  ack->getPageSize(),
328  ack->getSharedMemOffset());
329  page->setPinned(true);
330  page->setDirty(true);
331  return success;
332  }
333  }
334 }
335 
337  UserTypeID typeId,
338  SetID setId,
339  size_t sizeOfBytes,
340  void* bytes,
341  bool needMem,
342  int numTries) {
343  if (numTries == MAX_RETRIES) {
344  return false;
345  }
346  if (numTries > 0) {
347  logger->error(std::string("DataProxy: pinBytes with numTries=") + std::to_string(numTries));
348  }
349  string errMsg;
350  if (this->communicator->isSocketClosed() == true) {
351  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
352  logger->error("DataProxy: connection is closed, to reconnect");
353  if (communicator->reconnect(errMsg)) {
354  std::cout << errMsg << std::endl;
355  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
356  return false;
357  }
358  }
359  if (needMem == true) {
360  // create a PinPage object
361  {
362  const pdb::UseTemporaryAllocationBlock myBlock{2048};
363  pdb::Handle<pdb::StoragePinBytes> msg = pdb::makeObject<pdb::StoragePinBytes>();
364  msg->setNodeID(this->nodeId);
365  msg->setDatabaseID(dbId);
366  msg->setUserTypeID(typeId);
367  msg->setSetID(setId);
368  msg->setSizeOfBytes(sizeOfBytes);
369  // send the message out
370  if (!this->communicator->sendObject<pdb::StoragePinBytes>(msg, errMsg)) {
371  cout << "DataProxy.AddUserPage(): Sending object failure: " << errMsg << "\n";
372  sleep(numTries + 1);
373  return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
374  }
375  }
376 
377  // receive the PagePinned object
378  {
379  size_t objectSize = this->communicator->getSizeOfNextObject();
380  if (objectSize == 0) {
381  std::cout << "Receive ack failure" << std::endl;
382  sleep(numTries + 1);
383  return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
384  }
385  PDB_COUT << "DataProxy: to allocate memory block for BytesPinned object" << std::endl;
386  const pdb::UseTemporaryAllocationBlock myBlock{objectSize};
387  PDB_COUT << "DataProxy: memory block allocated" << std::endl;
388  bool success;
390  this->communicator->getNextObject<pdb::StorageBytesPinned>(success, errMsg);
391  if (ack == nullptr) {
392  cout << "Receiving ack failure:" << errMsg << "\n";
393  sleep(numTries + 1);
394  return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
395  }
396  void* dest = this->shm->getPointer(ack->getSharedMemOffset());
397  memcpy(dest, bytes, sizeOfBytes);
398  return success;
399  }
400  } else {
401 
402  // create a PinBytes object
403  {
404  pdb::Handle<pdb::StoragePinBytes> msg = pdb::makeObject<pdb::StoragePinBytes>();
405  msg->setNodeID(this->nodeId);
406  msg->setDatabaseID(dbId);
407  msg->setUserTypeID(typeId);
408  msg->setSetID(setId);
409  msg->setSizeOfBytes(sizeOfBytes);
410  // send the message out
411  if (!this->communicator->sendObject<pdb::StoragePinBytes>(msg, errMsg)) {
412  cout << "Sending object failure: " << errMsg << "\n";
413  sleep(numTries + 1);
414  return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
415  }
416  }
417  // receive the PageBytes object
418  {
419  bool success;
421  this->communicator->getNextObject<pdb::StorageBytesPinned>(success, errMsg);
422  if (ack == nullptr) {
423  cout << "Receiving ack failure:" << errMsg << "\n";
424  sleep(numTries + 1);
425  return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
426  }
427  void* dest = this->shm->getPointer(ack->getSharedMemOffset());
428  memcpy(dest, bytes, sizeOfBytes);
429  return success;
430  }
431  }
432 }
433 
435  SetID setId, PageID pageId, PDBPagePtr& page, bool needMem, int numTries) {
436  return pinUserPage(this->nodeId, 0, 0, setId, pageId, page, needMem, numTries);
437 }
438 
440  DatabaseID dbId,
441  UserTypeID typeId,
442  SetID setId,
443  PageID pageId,
444  PDBPagePtr& page,
445  bool needMem,
446  int numTries) {
447  if (numTries == MAX_RETRIES) {
448  return false;
449  }
450  if (numTries > 0) {
451  logger->error(std::string("DataProxy: pinUserPage with numTries=") +
452  std::to_string(numTries));
453  }
454  std::string errMsg;
455  if (this->communicator->isSocketClosed() == true) {
456  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
457  logger->error("DataProxy: connection is closed, to reconnect");
458  if (communicator->reconnect(errMsg)) {
459  std::cout << errMsg << std::endl;
460  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
461  return false;
462  }
463  }
464 
465  if (nodeId != this->nodeId) {
466  this->logger->writeLn(
467  "DataProxy: We do not support to load pages from "
468  "remote node for the time being.");
469  return pinUserPage(nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
470  }
471 
472  if (needMem == true) {
473  // create a PinPage object
474  {
475  const pdb::UseTemporaryAllocationBlock myBlock{1024};
476  pdb::Handle<pdb::StoragePinPage> msg = pdb::makeObject<pdb::StoragePinPage>();
477  msg->setNodeID(nodeId);
478  msg->setDatabaseID(dbId);
479  msg->setUserTypeID(typeId);
480  msg->setSetID(setId);
481  msg->setPageID(pageId);
482  msg->setWasNewPage(false);
483 
484  // send the message out
485  if (!this->communicator->sendObject<pdb::StoragePinPage>(msg, errMsg)) {
486  cout << "Sending object failure: " << errMsg << "\n";
487  return pinUserPage(
488  nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
489  }
490  }
491 
492  // receive the PagePinned object
493  {
494  size_t objectSize = this->communicator->getSizeOfNextObject();
495  if (objectSize == 0) {
496  std::cout << "Receiveing ack failure" << std::endl;
497  return pinUserPage(
498  nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
499  }
500  const pdb::UseTemporaryAllocationBlock myBlock{objectSize};
501  bool success;
503  this->communicator->getNextObject<pdb::StoragePagePinned>(success, errMsg);
504  if (ack == nullptr) {
505  cout << "Receiving ack failure:" << errMsg << "\n";
506  return pinUserPage(
507  nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
508  }
509  char* dataIn = (char*)this->shm->getPointer(ack->getSharedMemOffset());
510  page = make_shared<PDBPage>(dataIn, ack->getSharedMemOffset(), 0);
511  page->setPinned(true);
512  page->setDirty(false);
513  return success;
514  }
515  } else {
516  // create a PinPage object
517  {
518  pdb::Handle<pdb::StoragePinPage> msg = pdb::makeObject<pdb::StoragePinPage>();
519  msg->setNodeID(nodeId);
520  msg->setDatabaseID(dbId);
521  msg->setUserTypeID(typeId);
522  msg->setSetID(setId);
523  msg->setPageID(pageId);
524  msg->setWasNewPage(false);
525 
526  // send the message out
527  if (!this->communicator->sendObject<pdb::StoragePinPage>(msg, errMsg)) {
528  cout << "Sending object failure: " << errMsg << "\n";
529  return pinUserPage(
530  nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
531  }
532  }
533 
534  // receive the PagePinned object
535  {
536  bool success;
538  this->communicator->getNextObject<pdb::StoragePagePinned>(success, errMsg);
539  if (ack == nullptr) {
540  cout << "Receiving ack failure:" << errMsg << "\n";
541  return pinUserPage(
542  nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
543  }
544  char* dataIn = (char*)this->shm->getPointer(ack->getSharedMemOffset());
545  page = make_shared<PDBPage>(dataIn, ack->getSharedMemOffset(), 0);
546  page->setPinned(true);
547  page->setDirty(false);
548  return success;
549  }
550  }
551 }
552 
553 
554 bool DataProxy::unpinTempPage(SetID setId, PDBPagePtr page, bool needMem, int numTries) {
555  return unpinUserPage(this->nodeId, 0, 0, setId, page, needMem, numTries);
556 }
557 
559  DatabaseID dbId,
560  UserTypeID typeId,
561  SetID setId,
562  PDBPagePtr page,
563  bool needMem,
564  int numTries) {
565  if (numTries == MAX_RETRIES) {
566  return false;
567  }
568  if (numTries > 0) {
569  logger->error(std::string("DataProxy: unpinUserPage with numTries=") +
570  std::to_string(numTries));
571  }
572  std::string errMsg;
573  if (this->communicator->isSocketClosed() == true) {
574  std::cout << "ERROR in DataProxy: connection is closed" << std::endl;
575  logger->error("DataProxy: connection is closed, to reconnect");
576  if (communicator->reconnect(errMsg)) {
577  std::cout << errMsg << std::endl;
578  logger->error(std::string("DataProxy: reconnect failed with errMsg") + errMsg);
579  return false;
580  }
581  }
582 
583  if (needMem == true) {
584  // create a UnpinPage object
585  {
586  pdb::UseTemporaryAllocationBlock myBlock{2048};
587 
588  pdb::Handle<pdb::StorageUnpinPage> msg = pdb::makeObject<pdb::StorageUnpinPage>();
589 
590  msg->setNodeID(nodeId);
591  msg->setDatabaseID(dbId);
592  msg->setUserTypeID(typeId);
593  msg->setSetID(setId);
594  msg->setPageID(page->getPageID());
595 
596  if (page->isDirty() == true) {
597  msg->setWasDirty(true);
598  } else {
599  msg->setWasDirty(false);
600  }
601 
602  // send the message out
603  if (!this->communicator->sendObject<pdb::StorageUnpinPage>(msg, errMsg)) {
604  std::cout << "Sending StorageUnpinPage object failure: " << errMsg << "\n";
605  logger->error(std::string("Sending StorageUnpinPage object failure:") + errMsg);
606  return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
607  }
608  }
609 
610  // receive the Ack object
611  {
612  size_t objectSize = this->communicator->getSizeOfNextObject();
613  if (objectSize == 0) {
614  std::cout << "receive ack failure" << std::endl;
615  return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
616  }
617  pdb::UseTemporaryAllocationBlock myBlock{objectSize};
618  bool success;
620  this->communicator->getNextObject<pdb::SimpleRequestResult>(success, errMsg);
621  if (ack == nullptr) {
622  cout << "Receiving ack failure:" << errMsg << "\n";
623  return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
624  }
625  return success && (ack->getRes().first);
626  }
627  } else {
628  // create a UnpinPage object
629  {
630  pdb::Handle<pdb::StorageUnpinPage> msg = pdb::makeObject<pdb::StorageUnpinPage>();
631 
632  msg->setNodeID(nodeId);
633  msg->setDatabaseID(dbId);
634  msg->setUserTypeID(typeId);
635  msg->setSetID(setId);
636  msg->setPageID(page->getPageID());
637 
638  if (page->isDirty() == true) {
639  msg->setWasDirty(true);
640  } else {
641  msg->setWasDirty(false);
642  }
643 
644  // send the message out
645  if (!this->communicator->sendObject<pdb::StorageUnpinPage>(msg, errMsg)) {
646  std::cout << "Sending StorageUnpinPage object failure: " << errMsg << "\n";
647  return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
648  }
649  }
650 
651  // receive the Ack object
652  {
653  bool success;
655  this->communicator->getNextObject<pdb::SimpleRequestResult>(success, errMsg);
656  if (ack == nullptr) {
657  cout << "Receiving ack failure:" << errMsg << "\n";
658  return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
659  }
660  return success && (ack->getRes().first);
661  }
662  }
663 }
664 
666  std::string errMsg;
667  if (this->communicator->isSocketClosed() == true) {
668  std::cout << "ERROR in DataProxy.getScanner: connection is closed" << std::endl;
669  if (communicator->reconnect(errMsg)) {
670  std::cout << errMsg << std::endl;
671  return nullptr;
672  }
673  }
674  if (numThreads <= 0) {
675  return nullptr;
676  }
677  int scannerBufferSize;
678  if (this->shm->getShmSize() / (DEFAULT_PAGE_SIZE) > 16) {
679  scannerBufferSize = 3;
680  } else {
681  scannerBufferSize = 1;
682  }
683  PageScannerPtr scanner = make_shared<PageScanner>(
684  this->communicator, this->shm, this->logger, numThreads, scannerBufferSize, this->nodeId);
685  return scanner;
686 }
687 
688 
689 #endif
shared_ptr< PageScanner > PageScannerPtr
Definition: PageScanner.h:39
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
DataProxy(NodeID nodeId, pdb::PDBCommunicatorPtr communicator, SharedMemPtr shm, pdb::PDBLoggerPtr logger)
Definition: DataProxy.cc:45
#define MAX_RETRIES
Definition: DataProxy.cc:42
bool addUserPage(DatabaseID dbId, UserTypeID typeId, SetID setId, PDBPagePtr &page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:226
bool unpinTempPage(SetID setId, PDBPagePtr page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:554
bool pinBytes(DatabaseID dbId, UserTypeID typeId, SetID setId, size_t sizeOfBytes, void *bytes, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:336
bool addTempSet(string setName, SetID &setId, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:58
unsigned int NodeID
Definition: DataTypes.h:27
bool pinTempPage(SetID setId, PageID pageId, PDBPagePtr &page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:434
pdb::PDBLoggerPtr logger
Definition: DataProxy.h:153
bool unpinUserPage(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PDBPagePtr page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:558
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
unsigned int DatabaseID
Definition: DataTypes.h:29
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
unsigned int PageID
Definition: DataTypes.h:26
bool removeTempSet(SetID setId, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:144
bool addTempPage(SetID setId, PDBPagePtr &page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:222
pdb::PDBCommunicatorPtr communicator
Definition: DataProxy.h:151
~DataProxy()
Definition: DataProxy.cc:56
#define PDB_COUT
Definition: PDBDebug.h:31
PageScannerPtr getScanner(int numThreads)
Definition: DataProxy.cc:665
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
bool pinUserPage(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PageID pageId, PDBPagePtr &page, bool needMem=true, int numTries=0)
Definition: DataProxy.cc:439
SharedMemPtr shm
Definition: DataProxy.h:152
#define DEFAULT_PAGE_SIZE
Definition: Configuration.h:36
NodeID nodeId
Definition: DataProxy.h:154
unsigned int UserTypeID
Definition: DataTypes.h:25