A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TestCopyWork.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef TESTCOPYWORK_CC
19 #define TESTCOPYWORK_CC
20 
21 #include "TestCopyWork.h"
22 #include "PDBPage.h"
23 #include "Handle.h"
24 #include "Object.h"
25 #include "DataProxy.h"
26 #include "PageHandle.h"
27 #include "InterfaceFunctions.h"
29 #include "PDBVector.h"
30 #include <string>
31 #include "SharedEmployee.h"
32 
33 using namespace std;
34 
// Constructor: stores the inputs this work item needs in order to run later.
// NOTE(review): this listing omits source lines 35 and 39 (the signature line and the
// `server` parameter). According to the member index at the bottom of this page the full
// signature is:
//   TestCopyWork(PageCircularBufferIteratorPtr iter, DatabaseID destDatabaseId,
//                UserTypeID destTypeId, SetID destSetId,
//                pdb::HermesExecutionServer *server, int &counter)
36  DatabaseID destDatabaseId,
37  UserTypeID destTypeId,
38  SetID destSetId,
40  int& counter)
41  : counter(counter) {
// `counter` is taken as int& and set in the member-initializer list; the remaining members
// are plain assignments. `this->` is needed because each parameter shadows the member of
// the same name.
42  this->iter = iter;
43  this->destDatabaseId = destDatabaseId;
44  this->destTypeId = destTypeId;
45  this->destSetId = destSetId;
// `server` is the parameter declared on the omitted source line 39.
46  this->server = server;
47 }
48 
49 // do the actual work
50 
// Copies every SharedEmployee handle found on the pages delivered by `iter` into output
// pages of the destination set (destDatabaseId / destTypeId / destSetId).
//
// callerBuzzer is signalled exactly once on each exit path shown here:
//   - PDBAlarm::QueryError  if a finished input page cannot be unpinned;
//   - PDBAlarm::WorkAllDone (together with `counter`) after all input pages were consumed.
//
// NOTE(review): this listing omits source lines 72, 75, 77, 84 and 85. They presumably
// declare blockPtr, outputVec, inputVec and temp — confirm against the repository before
// relying on the comments below.
51 void TestCopyWork::execute(PDBBuzzerPtr callerBuzzer) {
// Log file name is derived from the iterator's id.
52  char logName[100];
53  sprintf(logName, "thread%d.log", iter->getId());
54  pdb::PDBLoggerPtr logger = make_shared<pdb::PDBLogger>(logName);
55  logger->writeLn("TestCopyWork: running...");
56 
57  // create a new connection to frontend server
// NOTE(review): whatever connectToInternetServer returns is discarded and errMsg is never
// read afterwards, so a failed connection is not detected at this point.
58  string errMsg;
59  pdb::PDBCommunicatorPtr communicatorToFrontEnd = make_shared<pdb::PDBCommunicator>();
60  communicatorToFrontEnd->connectToInternetServer(
61  logger, server->getConf()->getPort(), "localhost", errMsg);
62 
// The proxy is built from this node's id, the connection above, the server's shared memory
// and the logger; all page requests below go through it.
63  NodeID nodeId = server->getNodeID();
64  DataProxyPtr proxy = make_shared<DataProxy>(
65  nodeId, communicatorToFrontEnd, this->server->getSharedMem(), logger);
66 
// Ask for the first destination page and wrap it in a PageHandle.
// NOTE(review): the result of addUserPage is not checked before destPage is used.
67  PDBPagePtr page, destPage;
68  proxy->addUserPage(destDatabaseId, destTypeId, destSetId, destPage);
69  PageHandlePtr destPageHandle = make_shared<PageHandle>(proxy, destPage);
70 
71  // load output page
// (source line 72 omitted — presumably the declaration of blockPtr)
// The allocation block is created over the destination page's writable bytes, and the
// output vector is created right after it. Presumably this places the vector inside the
// page — confirm against UseTemporaryAllocationBlock.
73  blockPtr = std::make_shared<pdb::UseTemporaryAllocationBlock>(
74  destPageHandle->getWritableBytes(), destPageHandle->getWritableSize());
// (source line 75 omitted — presumably `... outputVec =`, the left-hand side of this call)
76  pdb::makeObject<pdb::Vector<pdb::Handle<SharedEmployee>>>(300000);
// (source line 77 omitted)
// Main loop: drain the iterator; a null page is skipped.
78  while (this->iter->hasNext()) {
79  page = this->iter->next();
80  if (page != nullptr) {
81 
82  // load input page
83  std::cout << "processing page with pageId=" << page->getPageID() << std::endl;
// (source lines 84-85 omitted — presumably they obtain `temp` from the page)
86  inputVec = temp->getRootObject();
87  std::cout << "there are " << inputVec->size() << " objects on the page." << std::endl;
88 
89 
// Copy the page's objects one by one; every 10000th object is printed as a progress trace.
// NOTE(review): `i` is a signed int compared against size(); confirm size()'s return type.
90  for (int i = 0; i < inputVec->size(); i++) {
91  pdb::Handle<SharedEmployee> object = (*inputVec)[i];
92  if (i % 10000 == 0) {
93  std::cout << i << ":";
94  object->print();
95  std::cout << std::endl;
96  }
97  try {
98  outputVec->push_back(object);
99  } catch (pdb::NotEnoughSpace& n) {
// The push_back threw NotEnoughSpace: close out the current output page, unpin it, request
// a fresh destination page, rebuild the allocation block and output vector on that page,
// then retry the push_back for the same object so that it is not lost.
100  std::cout << "########################################\n";
101  std::cout << "a page is fully written with " << outputVec->size()
102  << " objects\n";
// getRecord's return value is discarded here; presumably it is called for its effect on the
// page contents — confirm.
103  getRecord(outputVec);
104  destPageHandle->unpin();
105  proxy->addUserPage(
106  this->destDatabaseId, this->destTypeId, this->destSetId, destPage);
107  logger->writeLn("TestCopyWork: proxy pinned a new temp page with pageId=");
108  logger->writeInt(destPage->getPageID());
109  destPageHandle = make_shared<PageHandle>(proxy, destPage);
110 
// The old block is released (set to nullptr) before the replacement is constructed;
// presumably the order matters for UseTemporaryAllocationBlock — confirm.
111  blockPtr = nullptr;
112  blockPtr = std::make_shared<pdb::UseTemporaryAllocationBlock>(
113  destPageHandle->getWritableBytes(), destPageHandle->getWritableSize());
114  outputVec = pdb::makeObject<pdb::Vector<pdb::Handle<SharedEmployee>>>(300000);
115  outputVec->push_back(object);
116  }
117  }
118  // clean the page;
// The input page has been fully copied, so unpin it through the proxy. On failure the
// destination page is unpinned as well and the caller is notified with QueryError.
119  if (proxy->unpinUserPage(
120  nodeId, page->getDbID(), page->getTypeID(), page->getSetID(), page) == false) {
121  logger->writeLn("TestCopyWork: can not unpin finished page.");
122  destPageHandle->unpin();
123  callerBuzzer->buzz(PDBAlarm::QueryError);
124  return;
125  }
126  logger->writeLn("TestCopyWork: send out unpinPage for source page with pageID:");
127  logger->writeInt(page->getPageID());
128  }
129  }
// All input consumed: close out the last output page in the same way as in the catch
// branch (getRecord followed by unpin), then report completion with `counter`.
130  std::cout << "########################################\n";
131  std::cout << "a page is fully written with " << outputVec->size() << " objects\n";
132  getRecord(outputVec);
133  destPageHandle->unpin();
134  callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
135  return;
136 }
137 
138 #endif
UserTypeID destTypeId
Definition: TestCopyWork.h:54
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
PageCircularBufferIteratorPtr iter
Definition: TestCopyWork.h:51
Handle< ObjType > getRootObject()
Definition: Record.cc:46
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
pdb::HermesExecutionServer * server
Definition: TestCopyWork.h:52
shared_ptr< PageHandle > PageHandlePtr
Definition: PageHandle.h:34
unsigned int NodeID
Definition: DataTypes.h:27
unsigned int DatabaseID
Definition: DataTypes.h:29
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
SetID destSetId
Definition: TestCopyWork.h:55
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
std::shared_ptr< UseTemporaryAllocationBlock > UseTemporaryAllocationBlockPtr
DatabaseID destDatabaseId
Definition: TestCopyWork.h:53
void execute(PDBBuzzerPtr callerBuzzer) override
Definition: TestCopyWork.cc:51
TestCopyWork(PageCircularBufferIteratorPtr iter, DatabaseID destDatabaseId, UserTypeID destTypeId, SetID destSetId, pdb::HermesExecutionServer *server, int &counter)
Definition: TestCopyWork.cc:35
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
unsigned int UserTypeID
Definition: DataTypes.h:25