53 this->communicator->setLongConnection(
true);
63 logger->error(std::string(
"DataProxy: addTempSet with numTries=") +
64 std::to_string(numTries));
68 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
69 logger->error(
"DataProxy: connection is closed, to reconnect");
71 std::cout << errMsg << std::endl;
72 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
76 if (needMem ==
true) {
81 pdb::makeObject<pdb::StorageAddTempSet>(setName);
86 cout <<
"Sending object failure: " << errMsg <<
"\n";
87 return addTempSet(setName, setId, needMem, numTries + 1);
93 size_t objectSize = this->
communicator->getSizeOfNextObject();
94 if (objectSize == 0) {
95 cout <<
"Receiving ack failure" << std::endl;
96 return addTempSet(setName, setId, needMem, numTries + 1);
103 if (ack ==
nullptr) {
104 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
105 return addTempSet(setName, setId, needMem, numTries + 1);
107 if (success ==
true) {
108 setId = ack->getTempSetID();
116 pdb::makeObject<pdb::StorageAddTempSet>(setName);
121 cout <<
"Sending object failure: " << errMsg <<
"\n";
122 return addTempSet(setName, setId, needMem, numTries + 1);
131 if (ack ==
nullptr) {
132 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
133 return addTempSet(setName, setId, needMem, numTries + 1);
135 if (success ==
true) {
136 setId = ack->getTempSetID();
149 logger->error(std::string(
"DataProxy: removeTempSet with numTries=") +
150 std::to_string(numTries));
154 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
155 logger->error(
"DataProxy: connection is closed, to reconnect");
157 std::cout << errMsg << std::endl;
158 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
164 if (needMem ==
true) {
168 pdb::makeObject<pdb::StorageRemoveTempSet>(setId);
172 cout <<
"Sending object failure: " << errMsg <<
"\n";
180 size_t objectSize = this->
communicator->getSizeOfNextObject();
181 if (objectSize == 0) {
182 std::cout <<
"Receiving ack failure" << std::endl;
188 if (ack ==
nullptr) {
189 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
193 return success && (ack->getRes().first);
198 pdb::makeObject<pdb::StorageRemoveTempSet>(setId);
202 cout <<
"Sending object failure: " << errMsg <<
"\n";
212 if (ack ==
nullptr) {
213 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
216 return success && (ack->getRes().first);
223 return addUserPage(0, 0, setId, page, needMem, numTries);
232 logger->error(std::string(
"DataProxy: addUserPage with numTries=") +
233 std::to_string(numTries));
237 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
238 logger->error(
"DataProxy: connection is closed, to reconnect");
240 std::cout << errMsg << std::endl;
241 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
245 if (needMem ==
true) {
251 msg->setNodeID(this->
nodeId);
252 msg->setDatabaseID(dbId);
253 msg->setUserTypeID(typeId);
254 msg->setSetID(setId);
255 msg->setWasNewPage(
true);
258 cout <<
"DataProxy.AddUserPage(): Sending object failure: " << errMsg <<
"\n";
259 return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
265 size_t objectSize = this->
communicator->getSizeOfNextObject();
266 if (objectSize == 0) {
267 std::cout <<
"Receive ack failure" << std::endl;
268 return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
270 PDB_COUT <<
"DataProxy: to allocate memory block for PagePinned object" << std::endl;
272 PDB_COUT <<
"DataProxy: memory block allocated" << std::endl;
276 if (ack ==
nullptr) {
277 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
278 return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
280 char* dataIn = (
char*)this->
shm->getPointer(ack->getSharedMemOffset());
281 page = make_shared<PDBPage>(dataIn,
283 ack->getDatabaseID(),
284 ack->getUserTypeID(),
288 ack->getSharedMemOffset());
289 page->setPinned(
true);
290 page->setDirty(
true);
299 msg->setNodeID(this->
nodeId);
300 msg->setDatabaseID(dbId);
301 msg->setUserTypeID(typeId);
302 msg->setSetID(setId);
303 msg->setWasNewPage(
true);
306 cout <<
"Sending object failure: " << errMsg <<
"\n";
307 return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
316 if (ack ==
nullptr) {
317 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
318 return addUserPage(dbId, typeId, setId, page, needMem, numTries + 1);
320 char* dataIn = (
char*)this->
shm->getPointer(ack->getSharedMemOffset());
321 page = make_shared<PDBPage>(dataIn,
323 ack->getDatabaseID(),
324 ack->getUserTypeID(),
328 ack->getSharedMemOffset());
329 page->setPinned(
true);
330 page->setDirty(
true);
347 logger->error(std::string(
"DataProxy: pinBytes with numTries=") + std::to_string(numTries));
351 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
352 logger->error(
"DataProxy: connection is closed, to reconnect");
354 std::cout << errMsg << std::endl;
355 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
359 if (needMem ==
true) {
364 msg->setNodeID(this->
nodeId);
365 msg->setDatabaseID(dbId);
366 msg->setUserTypeID(typeId);
367 msg->setSetID(setId);
368 msg->setSizeOfBytes(sizeOfBytes);
371 cout <<
"DataProxy.AddUserPage(): Sending object failure: " << errMsg <<
"\n";
373 return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
379 size_t objectSize = this->
communicator->getSizeOfNextObject();
380 if (objectSize == 0) {
381 std::cout <<
"Receive ack failure" << std::endl;
383 return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
385 PDB_COUT <<
"DataProxy: to allocate memory block for BytesPinned object" << std::endl;
387 PDB_COUT <<
"DataProxy: memory block allocated" << std::endl;
391 if (ack ==
nullptr) {
392 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
394 return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
396 void* dest = this->
shm->getPointer(ack->getSharedMemOffset());
397 memcpy(dest, bytes, sizeOfBytes);
405 msg->setNodeID(this->
nodeId);
406 msg->setDatabaseID(dbId);
407 msg->setUserTypeID(typeId);
408 msg->setSetID(setId);
409 msg->setSizeOfBytes(sizeOfBytes);
412 cout <<
"Sending object failure: " << errMsg <<
"\n";
414 return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
422 if (ack ==
nullptr) {
423 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
425 return pinBytes(dbId, typeId, setId, sizeOfBytes, bytes, needMem, numTries + 1);
427 void* dest = this->
shm->getPointer(ack->getSharedMemOffset());
428 memcpy(dest, bytes, sizeOfBytes);
451 logger->error(std::string(
"DataProxy: pinUserPage with numTries=") +
452 std::to_string(numTries));
456 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
457 logger->error(
"DataProxy: connection is closed, to reconnect");
459 std::cout << errMsg << std::endl;
460 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
465 if (nodeId != this->nodeId) {
467 "DataProxy: We do not support to load pages from "
468 "remote node for the time being.");
469 return pinUserPage(nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
472 if (needMem ==
true) {
477 msg->setNodeID(nodeId);
478 msg->setDatabaseID(dbId);
479 msg->setUserTypeID(typeId);
480 msg->setSetID(setId);
481 msg->setPageID(pageId);
482 msg->setWasNewPage(
false);
486 cout <<
"Sending object failure: " << errMsg <<
"\n";
488 nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
494 size_t objectSize = this->
communicator->getSizeOfNextObject();
495 if (objectSize == 0) {
496 std::cout <<
"Receiveing ack failure" << std::endl;
498 nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
504 if (ack ==
nullptr) {
505 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
507 nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
509 char* dataIn = (
char*)this->
shm->getPointer(ack->getSharedMemOffset());
510 page = make_shared<PDBPage>(dataIn, ack->getSharedMemOffset(), 0);
511 page->setPinned(
true);
512 page->setDirty(
false);
519 msg->setNodeID(nodeId);
520 msg->setDatabaseID(dbId);
521 msg->setUserTypeID(typeId);
522 msg->setSetID(setId);
523 msg->setPageID(pageId);
524 msg->setWasNewPage(
false);
528 cout <<
"Sending object failure: " << errMsg <<
"\n";
530 nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
539 if (ack ==
nullptr) {
540 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
542 nodeId, dbId, typeId, setId, pageId, page, needMem, numTries + 1);
544 char* dataIn = (
char*)this->
shm->getPointer(ack->getSharedMemOffset());
545 page = make_shared<PDBPage>(dataIn, ack->getSharedMemOffset(), 0);
546 page->setPinned(
true);
547 page->setDirty(
false);
569 logger->error(std::string(
"DataProxy: unpinUserPage with numTries=") +
570 std::to_string(numTries));
574 std::cout <<
"ERROR in DataProxy: connection is closed" << std::endl;
575 logger->error(
"DataProxy: connection is closed, to reconnect");
577 std::cout << errMsg << std::endl;
578 logger->error(std::string(
"DataProxy: reconnect failed with errMsg") + errMsg);
583 if (needMem ==
true) {
590 msg->setNodeID(nodeId);
591 msg->setDatabaseID(dbId);
592 msg->setUserTypeID(typeId);
593 msg->setSetID(setId);
594 msg->setPageID(page->getPageID());
596 if (page->isDirty() ==
true) {
597 msg->setWasDirty(
true);
599 msg->setWasDirty(
false);
604 std::cout <<
"Sending StorageUnpinPage object failure: " << errMsg <<
"\n";
605 logger->error(std::string(
"Sending StorageUnpinPage object failure:") + errMsg);
606 return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
612 size_t objectSize = this->
communicator->getSizeOfNextObject();
613 if (objectSize == 0) {
614 std::cout <<
"receive ack failure" << std::endl;
615 return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
621 if (ack ==
nullptr) {
622 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
623 return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
625 return success && (ack->getRes().first);
632 msg->setNodeID(nodeId);
633 msg->setDatabaseID(dbId);
634 msg->setUserTypeID(typeId);
635 msg->setSetID(setId);
636 msg->setPageID(page->getPageID());
638 if (page->isDirty() ==
true) {
639 msg->setWasDirty(
true);
641 msg->setWasDirty(
false);
646 std::cout <<
"Sending StorageUnpinPage object failure: " << errMsg <<
"\n";
647 return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
656 if (ack ==
nullptr) {
657 cout <<
"Receiving ack failure:" << errMsg <<
"\n";
658 return unpinUserPage(nodeId, dbId, typeId, setId, page, needMem, numTries + 1);
660 return success && (ack->getRes().first);
668 std::cout <<
"ERROR in DataProxy.getScanner: connection is closed" << std::endl;
670 std::cout << errMsg << std::endl;
674 if (numThreads <= 0) {
677 int scannerBufferSize;
679 scannerBufferSize = 3;
681 scannerBufferSize = 1;
shared_ptr< PageScanner > PageScannerPtr
shared_ptr< PDBPage > PDBPagePtr
DataProxy(NodeID nodeId, pdb::PDBCommunicatorPtr communicator, SharedMemPtr shm, pdb::PDBLoggerPtr logger)
bool addUserPage(DatabaseID dbId, UserTypeID typeId, SetID setId, PDBPagePtr &page, bool needMem=true, int numTries=0)
bool unpinTempPage(SetID setId, PDBPagePtr page, bool needMem=true, int numTries=0)
bool pinBytes(DatabaseID dbId, UserTypeID typeId, SetID setId, size_t sizeOfBytes, void *bytes, bool needMem=true, int numTries=0)
bool addTempSet(string setName, SetID &setId, bool needMem=true, int numTries=0)
bool pinTempPage(SetID setId, PageID pageId, PDBPagePtr &page, bool needMem=true, int numTries=0)
bool unpinUserPage(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PDBPagePtr page, bool needMem=true, int numTries=0)
shared_ptr< SharedMem > SharedMemPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
bool removeTempSet(SetID setId, bool needMem=true, int numTries=0)
bool addTempPage(SetID setId, PDBPagePtr &page, bool needMem=true, int numTries=0)
pdb::PDBCommunicatorPtr communicator
PageScannerPtr getScanner(int numThreads)
std::shared_ptr< PDBLogger > PDBLoggerPtr
bool pinUserPage(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, PageID pageId, PDBPagePtr &page, bool needMem=true, int numTries=0)
#define DEFAULT_PAGE_SIZE