A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
HermesExecutionServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef HERMES_EXECUTION_SERVER_CC
20 #define HERMES_EXECUTION_SERVER_CC
21 
22 #include "PDBDebug.h"
23 #include "GenericWork.h"
24 #include "HermesExecutionServer.h"
25 #include "StoragePagePinned.h"
26 #include "StorageNoMorePage.h"
27 #include "StorageRemoveHashSet.h"
28 #include "SimpleRequestHandler.h"
29 #include "SimpleRequestResult.h"
30 #include "BackendTestSetScan.h"
31 #include "BackendTestSetCopy.h"
34 #include "TestScanWork.h"
35 #include "TestCopyWork.h"
36 #include "DataProxy.h"
37 #include "QueryBase.h"
38 #include "TupleSetJobStage.h"
39 #include "AggregationJobStage.h"
42 #include "PipelineStage.h"
43 #include "PartitionedHashSet.h"
44 #include "SharedHashSet.h"
45 #include "JoinMap.h"
46 #include "RecordIterator.h"
47 #include <vector>
48 
49 #ifndef JOIN_HASH_TABLE_SIZE_RATIO
50 #define JOIN_HASH_TABLE_SIZE_RATIO 1.5
51 #endif
52 
53 #ifndef HASH_PARTITIONED_JOIN_SIZE_RATIO
54 #define HASH_PARTITIONED_JOIN_SIZE_RATIO 1
55 #endif
56 
57 namespace pdb {
58 
60 
61 
 62  // register a handler to process StoragePagePinned messages that are responses to the same
63  // StorageGetSetPages message initiated by the current PageScanner instance.
64 
65  forMe.registerHandler(
66  StoragePagePinned_TYPEID,
68  PDBCommunicatorPtr sendUsingMe) {
69  PDB_COUT << "Start a handler to process StoragePagePinned messages\n";
70  bool res;
71  std::string errMsg;
72  PageScannerPtr scanner = getFunctionality<HermesExecutionServer>().getCurPageScanner();
73  if (scanner == nullptr) {
74  res = false;
75  errMsg = "Fatal Error: No job is running in execution server.";
76  std::cout << errMsg << std::endl;
77  } else {
78  PDB_COUT << "StoragePagePinned handler: to throw pinned pages to a circular buffer!"
79  << std::endl;
80  scanner->recvPagesLoop(request, sendUsingMe);
81  res = true;
82  }
83  return make_pair(res, errMsg);
84  }));
85 
 86  // register a handler to process the StorageNoMorePage message, which is the final response to the
87  // StorageGetSetPages message initiated by the current PageScanner instance.
88 
89  forMe.registerHandler(StorageNoMorePage_TYPEID,
91  Handle<StorageNoMorePage> request, PDBCommunicatorPtr sendUsingMe) {
92  bool res;
93  std::string errMsg;
94  PDB_COUT << "Got StorageNoMorePage object." << std::endl;
95  PageScannerPtr scanner =
96  getFunctionality<HermesExecutionServer>().getCurPageScanner();
97  PDB_COUT << "To close the scanner..." << std::endl;
98  if (scanner == nullptr) {
99  PDB_COUT << "The scanner has already been closed." << std::endl;
100  } else {
101  scanner->closeBuffer();
102  PDB_COUT << "We closed the scanner buffer." << std::endl;
103  }
104  res = true;
105  return make_pair(res, errMsg);
106 
107  }));
108 
109  // register a handler to process the BackendTestSetScan message
110  forMe.registerHandler(
111  BackendTestSetScan_TYPEID,
113  Handle<BackendTestSetScan> request, PDBCommunicatorPtr sendUsingMe) {
114  bool res;
115  std::string errMsg;
116 
117  DatabaseID dbId = request->getDatabaseID();
118  UserTypeID typeId = request->getUserTypeID();
119  SetID setId = request->getSetID();
120  PDB_COUT << "Backend received BackendTestSetScan message with dbId=" << dbId
121  << ", typeId=" << typeId << ", setId=" << setId << std::endl;
122 
123  int numThreads = getFunctionality<HermesExecutionServer>().getConf()->getNumThreads();
124  NodeID nodeId = getFunctionality<HermesExecutionServer>().getNodeID();
125  pdb::PDBLoggerPtr logger = getFunctionality<HermesExecutionServer>().getLogger();
126  SharedMemPtr shm = getFunctionality<HermesExecutionServer>().getSharedMem();
127  int backendCircularBufferSize = 3;
128 
129  PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
130  communicatorToFrontend->connectToInternetServer(
131  logger,
132  getFunctionality<HermesExecutionServer>().getConf()->getPort(),
133  "localhost",
134  errMsg);
135  PageScannerPtr scanner = make_shared<PageScanner>(
136  communicatorToFrontend, shm, logger, numThreads, backendCircularBufferSize, nodeId);
137 
138  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
139  res = false;
140  errMsg = "Error: A job is already running!";
141  std::cout << errMsg << std::endl;
142  return make_pair(res, errMsg);
143  }
144 
145  std::vector<PageCircularBufferIteratorPtr> iterators =
146  scanner->getSetIterators(nodeId, dbId, typeId, setId);
147 
148  int numIteratorsReturned = iterators.size();
149  if (numIteratorsReturned != numThreads) {
150  res = false;
151  errMsg = "Error: number of iterators doesn't match number of threads!";
152  std::cout << errMsg << std::endl;
153  return make_pair(res, errMsg);
154  }
155  PDB_COUT << "Buzzer is created in TestScanWork\n";
156  PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &counter) {
157  counter++;
158  PDB_COUT << "counter = " << counter << std::endl;
159  });
160  int counter = 0;
161  for (int i = 0; i < numThreads; i++) {
162  PDBWorkerPtr worker =
163  getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
164 
165  // starting processing threads;
166  TestScanWorkPtr testScanWork = make_shared<TestScanWork>(
167  iterators.at(i), &(getFunctionality<HermesExecutionServer>()), counter);
168  worker->execute(testScanWork, tempBuzzer);
169  }
170 
171  while (counter < numThreads) {
172  tempBuzzer->wait();
173  }
174 
175  res = true;
176  const UseTemporaryAllocationBlock block{1024};
177  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
178 
179  // return the result
180  res = sendUsingMe->sendObject(response, errMsg);
181  return make_pair(res, errMsg);
182 
183  }));
184 
185 
186  // register a handler to process the BroadcastJoinBuildHTJobStage message
187  forMe.registerHandler(
188  BroadcastJoinBuildHTJobStage_TYPEID,
191 
192  getAllocator().cleanInactiveBlocks((size_t) ((size_t) 32 * (size_t) 1024 * (size_t) 1024));
193  getAllocator().cleanInactiveBlocks((size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024));
194 
195 #ifdef ENABLE_LARGE_GRAPH
196  const UseTemporaryAllocationBlock block{
197  (size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024)};
198 #else
199  const UseTemporaryAllocationBlock block{
200  (size_t)((size_t)32 * (size_t)1024 * (size_t)1024)};
201 #endif
202  bool success;
203  std::string errMsg;
204  PDB_COUT << "Backend got Broadcast JobStage message with Id=" << request->getStageId()
205  << std::endl;
206  request->print();
207 
208  // create a SharedHashSet instance
209  size_t hashSetSize = conf->getBroadcastPageSize() * (size_t) (request->getNumPages()) *
211  std::cout << "BroadcastJoinBuildHTJobStage: hashSetSize=" << hashSetSize << std::endl;
212  SharedHashSetPtr sharedHashSet =
213  make_shared<SharedHashSet>(request->getHashSetName(), hashSetSize);
214  if (sharedHashSet->isValid() == false) {
215  hashSetSize = conf->getBroadcastPageSize() * (size_t) (request->getNumPages()) * 1.5;
216 #ifdef AUTO_TUNING
217  size_t memSize = request->getTotalMemoryOnThisNode();
218  size_t sharedMemPoolSize = conf->getShmSize();
219  if (hashSetSize > (memSize - sharedMemPoolSize) * 0.8) {
220  hashSetSize = (memSize - sharedMemPoolSize) * 0.8;
221  std::cout << "WARNING: no more memory on heap can be allocated for hash set, "
222  "we reduce hash set size."
223  << std::endl;
224  }
225 #endif
226  std::cout << "BroadcastJoinBuildHTJobStage: tuned hashSetSize to be " << hashSetSize
227  << std::endl;
228  sharedHashSet = make_shared<SharedHashSet>(request->getHashSetName(), hashSetSize);
229  }
230  if (sharedHashSet->isValid() == false) {
231  success = false;
232  errMsg = "Error: heap memory becomes insufficient";
233  std::cout << errMsg << std::endl;
234  // return result to frontend
235  PDB_COUT << "to send back reply" << std::endl;
236  const UseTemporaryAllocationBlock block{1024};
237  Handle<SimpleRequestResult> response =
238  makeObject<SimpleRequestResult>(success, errMsg);
239  // return the result
240  success = sendUsingMe->sendObject(response, errMsg);
241  return make_pair(success, errMsg);
242  }
243  this->addHashSet(request->getHashSetName(), sharedHashSet);
244  std::cout << "BroadcastJoinBuildHTJobStage: hashSetName=" << request->getHashSetName()
245  << std::endl;
246  // tune backend circular buffer size
247  int numThreads = 1;
248  int backendCircularBufferSize = 1;
249  if (conf->getShmSize() / conf->getPageSize() - 2 <
250  2 + 2 * numThreads + backendCircularBufferSize) {
251  success = false;
252  errMsg = "Error: Not enough buffer pool size to run the query!";
253  std::cout << errMsg << std::endl;
254  // return result to frontend
255  PDB_COUT << "to send back reply" << std::endl;
256  const UseTemporaryAllocationBlock block{1024};
257  Handle<SimpleRequestResult> response =
258  makeObject<SimpleRequestResult>(success, errMsg);
259  // return the result
260  success = sendUsingMe->sendObject(response, errMsg);
261  return make_pair(success, errMsg);
262  }
263  backendCircularBufferSize =
264  (conf->getShmSize() / conf->getPageSize() - 4 - 2 * numThreads);
265  if (backendCircularBufferSize > 10) {
266  backendCircularBufferSize = 10;
267  }
268  success = true;
269  PDB_COUT << "backendCircularBufferSize is tuned to be " << backendCircularBufferSize
270  << std::endl;
271 
272 
273  // get scanner and iterators
274  PDBLoggerPtr scanLogger = make_shared<PDBLogger>("agg-scanner.log");
275  PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
276  communicatorToFrontend->connectToInternetServer(
277  logger, conf->getPort(), conf->getServerAddress(), errMsg);
278  PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
279  shm,
280  scanLogger,
281  numThreads,
282  backendCircularBufferSize,
283  nodeId);
284  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
285  success = false;
286  errMsg = "Error: A job is already running!";
287  std::cout << errMsg << std::endl;
288  // return result to frontend
289  PDB_COUT << "to send back reply" << std::endl;
290  Handle<SimpleRequestResult> response =
291  makeObject<SimpleRequestResult>(success, errMsg);
292  // return the result
293  success = sendUsingMe->sendObject(response, errMsg);
294  return make_pair(success, errMsg);
295  }
296 
297  // get iterators
298  PDB_COUT << "To send GetSetPages message" << std::endl;
299  std::vector<PageCircularBufferIteratorPtr> iterators =
300  scanner->getSetIterators(nodeId,
301  request->getSourceContext()->getDatabaseId(),
302  request->getSourceContext()->getTypeId(),
303  request->getSourceContext()->getSetId());
304  PDB_COUT << "GetSetPages message is sent" << std::endl;
305 
306  // get data proxy
307  PDBCommunicatorPtr anotherCommunicatorToFrontend = make_shared<PDBCommunicator>();
308  anotherCommunicatorToFrontend->connectToInternetServer(
309  logger, conf->getPort(), conf->getServerAddress(), errMsg);
310  DataProxyPtr proxy =
311  make_shared<DataProxy>(nodeId, anotherCommunicatorToFrontend, shm, logger);
312 
313  // make allocator block and allocate the JoinMap
314  const UseTemporaryAllocationBlock tempBlock(sharedHashSet->getPage(), hashSetSize);
315 #ifdef PROFILING
316  std::string out = getAllocator().printInactiveBlocks();
317  std::cout << "BroadcastJoinBuildHTJobStage-backend: print inactive blocks:"
318  << std::endl;
319  std::cout << out << std::endl;
320 #endif
321  PDB_COUT << "hashSetSize = " << hashSetSize << std::endl;
323  // to get the sink merger
324  std::string sourceTupleSetSpecifier = request->getSourceTupleSetSpecifier();
325  std::string targetTupleSetSpecifier = request->getTargetTupleSetSpecifier();
326  std::string targetComputationSpecifier = request->getTargetComputationSpecifier();
327  Handle<ComputePlan> myComputePlan = request->getComputePlan();
328  SinkMergerPtr merger = myComputePlan->getMerger(
329  sourceTupleSetSpecifier, targetTupleSetSpecifier, targetComputationSpecifier);
330  Handle<Object> myMap = merger->createNewOutputContainer();
331 
332  // setup an output page to store intermediate results and final output
333  PageCircularBufferIteratorPtr iter = iterators.at(0);
334  PDBPagePtr page = nullptr;
335  while (iter->hasNext()) {
336  page = iter->next();
337  if (page != nullptr) {
338  // to get the map on the page
339  RecordIteratorPtr recordIter = make_shared<RecordIterator>(page);
340  while (recordIter->hasNext() == true) {
341  Record<Object> *record = recordIter->next();
342  if (record != nullptr) {
343  Handle<Object> theOtherMap = record->getRootObject();
344  // to merge the two maps
345  merger->writeOut(theOtherMap, myMap);
346  }
347  }
348 
349  proxy->unpinUserPage(
350  nodeId, page->getDbID(), page->getTypeID(), page->getSetID(), page);
351 
352  }
353 
354  }
355  getRecord(myMap);
356 
358 
359  if (this->setCurPageScanner(nullptr) == false) {
360  success = false;
361  errMsg = "Error: No job is running!";
362  std::cout << errMsg << std::endl;
363  }
364  // return result to frontend
365  PDB_COUT << "to send back reply" << std::endl;
366  const UseTemporaryAllocationBlock block1{1024};
367  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
368  // return the result
369  success = sendUsingMe->sendObject(response, errMsg);
370  return make_pair(success, errMsg);
371 
372  }));
373 
 374  // register a handler to process the AggregationJobStage message
375  forMe.registerHandler(
376  AggregationJobStage_TYPEID,
378  Handle<AggregationJobStage> request, PDBCommunicatorPtr sendUsingMe) {
379  getAllocator().cleanInactiveBlocks((size_t) ((size_t) 32 * (size_t) 1024 * (size_t) 1024));
380  getAllocator().cleanInactiveBlocks((size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024));
381  const UseTemporaryAllocationBlock block{32 * 1024 * 1024};
382  bool success;
383  std::string errMsg;
384 
385  std::cout << "Backend got Aggregation JobStage message with Id="
386  << request->getStageId() << std::endl;
387  request->print();
388 
389 #ifdef PROFILING
390  std::string out = getAllocator().printInactiveBlocks();
391  std::cout << "AggregationJobStage-backend: print inactive blocks:" << std::endl;
392  std::cout << out << std::endl;
393 #endif
394 
395  // get number of partitions
396  int numPartitions = request->getNumNodePartitions();
397 #ifdef USE_VALGRIND
398  double ratio = 0.05;
399 #else
400  double ratio = 0.8;
401 #endif
402 
403 #ifdef AUTO_TUNING
404  size_t memSize = request->getTotalMemoryOnThisNode();
405  size_t sharedMemPoolSize = conf->getShmSize();
406 
407 #ifdef ENABLE_LARGE_GRAPH
408  size_t tunedHashPageSize =
409  (double) (memSize * ((size_t) (1024)) - sharedMemPoolSize -
410  ((size_t) (conf->getNumThreads()) * (size_t) (256) * (size_t) (1024) *
411  (size_t) (1024)) -
412  getFunctionality<HermesExecutionServer>().getHashSetsSize()) *
413  (ratio) / (double) (numPartitions);
414 #else
415  size_t tunedHashPageSize =
416  (double)(memSize * ((size_t)(1024)) - sharedMemPoolSize -
417  getFunctionality<HermesExecutionServer>().getHashSetsSize()) *
418  (ratio) / (double)(numPartitions);
419 #endif
420  if (memSize * ((size_t) (1024)) <
421  sharedMemPoolSize + (size_t) 512 * (size_t) 1024 * (size_t) 1024) {
422  std::cout << "WARNING: Auto tuning can not work, use default values" << std::endl;
423  tunedHashPageSize = conf->getHashPageSize();
424  }
425  std::cout << "Tuned hash page size is " << tunedHashPageSize << std::endl;
426  conf->setHashPageSize(tunedHashPageSize);
427 #endif
428 
429 
430  // create multiple page circular queues
431  int aggregationBufferSize = 2;
432  std::vector<PageCircularBufferPtr> hashBuffers;
433  std::vector<PageCircularBufferIteratorPtr> hashIters;
434 
435  pthread_mutex_t connection_mutex;
436  pthread_mutex_init(&connection_mutex, nullptr);
437 
438  // create data proxy
439  pthread_mutex_lock(&connection_mutex);
440  PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
441  communicatorToFrontend->connectToInternetServer(
442  logger, conf->getPort(), conf->getServerAddress(), errMsg);
443  pthread_mutex_unlock(&connection_mutex);
444 
445 
446  // create a buzzer and counter
447  PDBBuzzerPtr hashBuzzer =
448  make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &hashCounter) {
449  hashCounter++;
450  PDB_COUT << "hashCounter = " << hashCounter << std::endl;
451  });
452  std::cout << "to run aggregation with " << numPartitions << " threads." << std::endl;
453  int hashCounter = 0;
454 
455  std::string hashSetName = "";
456  PartitionedHashSetPtr aggregationSet = nullptr;
457  if (request->needsToMaterializeAggOut() == false) {
458  Handle<SetIdentifier> sinkSetIdentifier = request->getSinkContext();
459  std::string dbName = sinkSetIdentifier->getDatabase();
460  std::string setName = sinkSetIdentifier->getSetName();
461  hashSetName = dbName + ":" + setName;
462  aggregationSet =
463  make_shared<PartitionedHashSet>(hashSetName, this->conf->getHashPageSize());
464  this->addHashSet(hashSetName, aggregationSet);
465  }
466 
467 
468  // start multiple threads
469  // each thread creates a hash set as temp set, and put key-value pairs to the hash set
470  int i;
471  for (i = 0; i < numPartitions; i++) {
472  PDBLoggerPtr myLogger =
473  make_shared<PDBLogger>(std::string("aggregation-") + std::to_string(i));
474  PageCircularBufferPtr buffer =
475  make_shared<PageCircularBuffer>(aggregationBufferSize, myLogger);
476  hashBuffers.push_back(buffer);
478  make_shared<PageCircularBufferIterator>(i, buffer, myLogger);
479  hashIters.push_back(iter);
480  PDBWorkerPtr worker =
481  getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
482  PDB_COUT << "to run the " << i << "-th work..." << std::endl;
483  // start threads
484  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
485 
486  std::string out = getAllocator().printInactiveBlocks();
487  logger->warn(out);
488  PDB_COUT << out << std::endl;
490  (size_t) ((size_t) 32 * (size_t) 1024 * (size_t) 1024));
492  (size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024));
494  pthread_mutex_lock(&connection_mutex);
495  PDBCommunicatorPtr anotherCommunicatorToFrontend =
496  make_shared<PDBCommunicator>();
497  anotherCommunicatorToFrontend->connectToInternetServer(
498  logger, conf->getPort(), conf->getServerAddress(), errMsg);
499  pthread_mutex_unlock(&connection_mutex);
500  DataProxyPtr proxy =
501  make_shared<DataProxy>(nodeId, anotherCommunicatorToFrontend, shm, logger);
502 #ifdef ENABLE_LARGE_GRAPH
503  const UseTemporaryAllocationBlock block{256 * 1024 * 1024};
504 #else
505  const UseTemporaryAllocationBlock block{32 * 1024 * 1024};
506 #endif
507  std::string errMsg;
508 
509  // get aggregate computation
510  Handle<AbstractAggregateComp> aggComputation = request->getAggComputation();
512  deepCopyToCurrentAllocationBlock<AbstractAggregateComp>(aggComputation);
513 
514  // get aggregate processor
515  SimpleSingleTableQueryProcessorPtr aggregateProcessor =
516  newAgg->getAggregationProcessor((HashPartitionID) (i));
517  aggregateProcessor->initialize();
518  PageCircularBufferIteratorPtr myIter = hashIters[i];
519  if (request->needsToMaterializeAggOut() == false) {
520 
521  void *outBytes = nullptr;
522  while (myIter->hasNext()) {
523  PDBPagePtr page = myIter->next();
524  if (page != nullptr) {
526  (Record<Vector<Handle<Object>>> *) page->getBytes();
527  Handle<Vector<Handle<Object>>> inputData = myRec->getRootObject();
528  int inputSize = 0;
529  if (inputData != nullptr) {
530  inputSize = inputData->size();
531  }
532  for (int j = 0; j < inputSize; j++) {
533  aggregateProcessor->loadInputObject((*inputData)[j]);
534  if (aggregateProcessor->needsProcessInput() == false) {
535  continue;
536  }
537  if (outBytes == nullptr) {
538  // create a new partition
539  outBytes = aggregationSet->addPage();
540  if (outBytes == nullptr) {
541  std::cout << "insufficient memory in heap" << std::endl;
542  exit(-1);
543  }
544  aggregateProcessor->loadOutputPage(
545  outBytes, aggregationSet->getPageSize());
546  }
547  if (aggregateProcessor->fillNextOutputPage()) {
548  aggregateProcessor->clearOutputPage();
549  std::cout
550  << "WARNING: aggregation for partition-" << i
551  << " can't finish in one aggregation page with size="
552  << aggregationSet->getPageSize() << std::endl;
553  std::cout << "WARNING: results may not be fully aggregated "
554  "for partition-"
555  << i << ", please increase hash page size!!"
556  << std::endl;
557  logger->error(std::string(
558  "Hash page size is too small or memory is "
559  "insufficient, results are not fully aggregated!"));
560  break;
561  }
562  }
563  // unpin user page
564  // aggregateProcessor->clearInputPage();
565  page->decRefCount();
566  if (page->getRefCount() == 0) {
567  proxy->unpinUserPage(nodeId,
568  page->getDbID(),
569  page->getTypeID(),
570  page->getSetID(),
571  page);
572  }
573  }
574  }
575  if (outBytes != nullptr) {
576  aggregateProcessor->finalize();
577  aggregateProcessor->fillNextOutputPage();
578  aggregateProcessor->clearOutputPage();
579  }
580 
581  } else {
582  // get output set
583  SetSpecifierPtr outputSet =
584  make_shared<SetSpecifier>(request->getSinkContext()->getDatabase(),
585  request->getSinkContext()->getSetName(),
586  request->getSinkContext()->getDatabaseId(),
587  request->getSinkContext()->getTypeId(),
588  request->getSinkContext()->getSetId());
589  PDBPagePtr output = nullptr;
590 
591  // aggregation page size
592  size_t aggregationPageSize = conf->getHashPageSize();
593  // allocate one output page
594  void *aggregationPage = nullptr;
595 
596  // get aggOut processor
597  SimpleSingleTableQueryProcessorPtr aggOutProcessor =
598  newAgg->getAggOutProcessor();
599  aggOutProcessor->initialize();
600  PageCircularBufferIteratorPtr myIter = hashIters[i];
601  while (myIter->hasNext()) {
602  PDBPagePtr page = myIter->next();
603  if (page != nullptr) {
605  (Record<Vector<Handle<Object>>> *) page->getBytes();
606  Handle<Vector<Handle<Object>>> inputData = myRec->getRootObject();
607  // to make valgrind happy
608  int inputSize = 0;
609  if (inputData != nullptr) {
610  inputSize = inputData->size();
611  }
612  for (int j = 0; j < inputSize; j++) {
613  aggregateProcessor->loadInputObject((*inputData)[j]);
614  if (aggregateProcessor->needsProcessInput() == false) {
615  continue;
616  }
617  if (aggregationPage == nullptr) {
618  aggregationPage =
619  (void *) malloc(aggregationPageSize * sizeof(char));
620  aggregateProcessor->loadOutputPage(aggregationPage,
621  aggregationPageSize);
622  }
623  if (aggregateProcessor->fillNextOutputPage()) {
624  std::cout
625  << "WARNING: aggregation for partition-" << i
626  << " can't finish in one aggregation page with size="
627  << aggregationPageSize << std::endl;
628  std::cout << "WARNING: results may not be fully aggregated "
629  "for partition-"
630  << i
631  << ", please ask PDB admin to tune memory size!!"
632  << std::endl;
633  logger->error(std::string(
634  "Hash page size is too small or memory is "
635  "insufficient, results are not fully aggregated!"));
636  // write to output set
637  // load input page
638  aggOutProcessor->loadInputPage(aggregationPage);
639  // get output page
640  if (output == nullptr) {
641  proxy->addUserPage(outputSet->getDatabaseId(),
642  outputSet->getTypeId(),
643  outputSet->getSetId(),
644  output);
645  aggOutProcessor->loadOutputPage(output->getBytes(),
646  output->getSize());
647  }
648  while (aggOutProcessor->fillNextOutputPage()) {
649  aggOutProcessor->clearOutputPage();
650  PDB_COUT << i << ": AggOutProcessor: we now filled an "
651  "output page and unpin it"
652  << std::endl;
653  // unpin the output page
654  proxy->unpinUserPage(nodeId,
655  outputSet->getDatabaseId(),
656  outputSet->getTypeId(),
657  outputSet->getSetId(),
658  output);
659  // pin a new output page
660  proxy->addUserPage(outputSet->getDatabaseId(),
661  outputSet->getTypeId(),
662  outputSet->getSetId(),
663  output);
664  // load output
665  aggOutProcessor->loadOutputPage(output->getBytes(),
666  output->getSize());
667  }
668  aggregateProcessor->clearOutputPage();
669  free(aggregationPage);
670  break;
671  }
672  }
673  // aggregateProcessor->clearInputPage();
674  // unpin the input page
675  page->decRefCount();
676  if (page->getRefCount() == 0) {
677  proxy->unpinUserPage(nodeId,
678  page->getDbID(),
679  page->getTypeID(),
680  page->getSetID(),
681  page);
682  }
683  }
684  }
685  if (aggregationPage != nullptr) {
686  // finalize()
687  aggregateProcessor->finalize();
688  aggregateProcessor->fillNextOutputPage();
689  // load input page
690  aggOutProcessor->loadInputPage(aggregationPage);
691  // get output page
692  if (output == nullptr) {
693  proxy->addUserPage(outputSet->getDatabaseId(),
694  outputSet->getTypeId(),
695  outputSet->getSetId(),
696  output);
697  aggOutProcessor->loadOutputPage(output->getBytes(),
698  output->getSize());
699  }
700  while (aggOutProcessor->fillNextOutputPage()) {
701  aggOutProcessor->clearOutputPage();
702  // unpin the output page
703  proxy->unpinUserPage(nodeId,
704  outputSet->getDatabaseId(),
705  outputSet->getTypeId(),
706  outputSet->getSetId(),
707  output);
708  // pin a new output page
709  proxy->addUserPage(outputSet->getDatabaseId(),
710  outputSet->getTypeId(),
711  outputSet->getSetId(),
712  output);
713  // load output
714  aggOutProcessor->loadOutputPage(output->getBytes(),
715  output->getSize());
716  }
717 
718  // finalize() and unpin last output page
719  aggOutProcessor->finalize();
720  aggOutProcessor->fillNextOutputPage();
721  aggOutProcessor->clearOutputPage();
722  proxy->unpinUserPage(nodeId,
723  outputSet->getDatabaseId(),
724  outputSet->getTypeId(),
725  outputSet->getSetId(),
726  output);
727  // free aggregation page
728  aggregateProcessor->clearOutputPage();
729  free(aggregationPage);
730  } // aggregationPage != nullptr
731 
732  } // request->needsToMaterializeAggOut() == true
734 #ifdef PROFILING
736  std::cout << "AggregationJobStage-backend-thread: print inactive blocks:"
737  << std::endl;
738  std::cout << out << std::endl;
739 #endif
740  callerBuzzer->buzz(PDBAlarm::WorkAllDone, hashCounter);
741 
742  });
743  worker->execute(myWork, hashBuzzer);
744 
745  } // for
746 
747  // start single-thread scanner
748  // the thread iterates page, and put each page to all queues, in the end close all
749  // buffers
750 
751  int backendCircularBufferSize = numPartitions;
752  int numThreads = 1;
753  PDBLoggerPtr scanLogger = make_shared<PDBLogger>("agg-scanner.log");
754  PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
755  shm,
756  scanLogger,
757  numThreads,
758  backendCircularBufferSize,
759  nodeId);
760  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
761  success = false;
762  errMsg = "Error: A job is already running!";
763  std::cout << errMsg << std::endl;
764  // return result to frontend
765  PDB_COUT << "to send back reply" << std::endl;
766  const UseTemporaryAllocationBlock block{1024};
767  Handle<SimpleRequestResult> response =
768  makeObject<SimpleRequestResult>(success, errMsg);
769  // return the result
770  success = sendUsingMe->sendObject(response, errMsg);
771  return make_pair(success, errMsg);
772  }
773 
774  // get iterators
775  std::cout << "To send GetSetPages message" << std::endl;
776  std::vector<PageCircularBufferIteratorPtr> iterators =
777  scanner->getSetIterators(nodeId,
778  request->getSourceContext()->getDatabaseId(),
779  request->getSourceContext()->getTypeId(),
780  request->getSourceContext()->getSetId());
781  std::cout << "GetSetPages message is sent" << std::endl;
782  int numIteratorsReturned = iterators.size();
783  if (numIteratorsReturned != numThreads) {
784  int k;
785  for (k = 0; k < numPartitions; k++) {
786  PageCircularBufferPtr buffer = hashBuffers[k];
787  buffer->close();
788  }
789 
790  while (hashCounter < numPartitions) {
791  hashBuzzer->wait();
792  }
793  pthread_mutex_destroy(&connection_mutex);
794  success = false;
795  errMsg = "Error: number of iterators doesn't match number of threads!";
796  std::cout << errMsg << std::endl;
797  // return result to frontend
798  PDB_COUT << "to send back reply" << std::endl;
799  const UseTemporaryAllocationBlock block{1024};
800  Handle<SimpleRequestResult> response =
801  makeObject<SimpleRequestResult>(success, errMsg);
802  // return the result
803  success = sendUsingMe->sendObject(response, errMsg);
804  return make_pair(success, errMsg);
805  }
806 
807  // create a buzzer and counter
808  PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &counter) {
809  counter++;
810  PDB_COUT << "scan counter = " << counter << std::endl;
811  });
812  int counter = 0;
813 
814  for (int j = 0; j < numThreads; j++) {
815  PDBWorkerPtr worker =
816  getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
817  std::cout << "to run the " << j << "-th scan work..." << std::endl;
818  // start threads
819  PDBWorkPtr myWork = make_shared<GenericWork>([&, j](PDBBuzzerPtr callerBuzzer) {
820  // setup an output page to store intermediate results and final output
821  const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};
822  PageCircularBufferIteratorPtr iter = iterators.at(j);
823  PDBPagePtr page = nullptr;
824  while (iter->hasNext()) {
825  page = iter->next();
826  if (page != nullptr) {
827  int k;
828  for (k = 0; k < numPartitions; k++) {
829  page->incRefCount();
830  }
831  for (k = 0; k < numPartitions; k++) {
832  hashBuffers[k]->addPageToTail(page);
833  }
834  }
835  }
836  callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
837  });
838 
839  worker->execute(myWork, tempBuzzer);
840  }
841 
842  while (counter < numThreads) {
843  tempBuzzer->wait();
844  }
845 
846  int k;
847  for (k = 0; k < numPartitions; k++) {
848  PageCircularBufferPtr buffer = hashBuffers[k];
849  buffer->close();
850  }
851 
852 
853  // wait for multiple threads to return
854  while (hashCounter < numPartitions) {
855  hashBuzzer->wait();
856  }
857 
858  // reset scanner
859  pthread_mutex_destroy(&connection_mutex);
860 
861  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(nullptr) == false) {
862  success = false;
863  errMsg = "Error: No job is running!";
864  std::cout << errMsg << std::endl;
865  }
866 
867 
868  // return result to frontend
869  PDB_COUT << "to send back reply" << std::endl;
870  const UseTemporaryAllocationBlock block1{1024};
871  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
872  // return the result
873  success = sendUsingMe->sendObject(response, errMsg);
874  return make_pair(success, errMsg);
875 
876  }
877 
878  ));
879 
880  // register a handler to process the HashPartitionedJoinBuildHTJobStage message
881  forMe.registerHandler(
882  HashPartitionedJoinBuildHTJobStage_TYPEID,
885  getAllocator().cleanInactiveBlocks((size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024));
886  const UseTemporaryAllocationBlock block{32 * 1024 * 1024};
887  bool success;
888  std::string errMsg;
889 
890  std::cout << "Backend got HashPartitionedJoinBuildHTJobStage message with Id="
891  << request->getStageId() << std::endl;
892  request->print();
893 
894 #ifdef PROFILING
895  std::string out = getAllocator().printInactiveBlocks();
896  std::cout << "HashPartitionedJoinBuildHTJobStage-backend: print inactive blocks:"
897  << std::endl;
898  std::cout << out << std::endl;
899 #endif
900 
901  // estimate memory for creating the partitioned hash set;
902  // get number of partitions
903  int numPartitions = request->getNumNodePartitions();
904  int numPages = request->getNumPages();
905  if (numPages == 0) {
906  numPages = 1;
907  }
908  double sizeRatio = HASH_PARTITIONED_JOIN_SIZE_RATIO * numPartitions;
909  if (sizeRatio > numPartitions) {
910  sizeRatio = numPartitions;
911  }
912  size_t hashSetSize = (double) (conf->getShufflePageSize()) *
913  (double) (numPages) * sizeRatio / (double) (numPartitions);
914  // create hash set
915  std::string hashSetName = request->getHashSetName();
916  PartitionedHashSetPtr partitionedSet = make_shared<PartitionedHashSet>(hashSetName, hashSetSize);
917  this->addHashSet(hashSetName, partitionedSet);
918  std::cout << "Added hash set for HashPartitionedJoin to probe" << std::endl;
919  for (int i = 0; i < numPartitions; i++) {
920  void *bytes = partitionedSet->addPage();
921  if (bytes == nullptr) {
922  std::cout << "Insufficient memory in heap" << std::endl;
923  exit(1);
924  }
925  }
926  // create multiple page circular queues
927  int buildingHTBufferSize = 2;
928  std::vector<PageCircularBufferPtr> hashBuffers;
929  std::vector<PageCircularBufferIteratorPtr> hashIters;
930 
931  pthread_mutex_t connection_mutex;
932  pthread_mutex_init(&connection_mutex, nullptr);
933 
934  // create data proxy
935  pthread_mutex_lock(&connection_mutex);
936  PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
937  communicatorToFrontend->connectToInternetServer(
938  logger, conf->getPort(), conf->getServerAddress(), errMsg);
939  pthread_mutex_unlock(&connection_mutex);
940 
941 
942  // create a buzzer and counter
943  PDBBuzzerPtr hashBuzzer =
944  make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &hashCounter) {
945  hashCounter++;
946  PDB_COUT << "hashCounter = " << hashCounter << std::endl;
947  });
948  std::cout << "to build hashtables with " << numPartitions << " threads." << std::endl;
949  int hashCounter = 0;
950 
951  // to get the sink merger
952  std::string sourceTupleSetSpecifier = request->getSourceTupleSetSpecifier();
953  std::string targetTupleSetSpecifier = request->getTargetTupleSetSpecifier();
954  std::string targetComputationSpecifier = request->getTargetComputationSpecifier();
955  Handle<ComputePlan> myComputePlan = request->getComputePlan();
956  SinkMergerPtr merger = myComputePlan->getMerger(sourceTupleSetSpecifier,
957  targetTupleSetSpecifier,
958  targetComputationSpecifier);
959 
960  // start multiple threads, with each thread have a queue and check pages in the queue
961  // each page has a vector of JoinMap
962  // each thread has a partition id and check whether each JoinMap has the same partition
963  // id
964  // if it finds a JoinMap in the same partition, the thread merge it
965 
966  // start multiple threads
967  // each thread creates a hash set as temp set, and put key-value pairs to the hash set
968  for (int i = 0; i < numPartitions; i++) {
969  PDBLoggerPtr myLogger = make_shared<PDBLogger>(std::string("buildHT-") + std::to_string(i));
970  PageCircularBufferPtr buffer = make_shared<PageCircularBuffer>(buildingHTBufferSize, myLogger);
971  hashBuffers.push_back(buffer);
972  PageCircularBufferIteratorPtr iter = make_shared<PageCircularBufferIterator>(i, buffer, myLogger);
973  hashIters.push_back(iter);
974  PDBWorkerPtr worker = getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
975  PDB_COUT << "to run the " << i << "-th work..." << std::endl;
976  // start threads
977  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
978 
979  pthread_mutex_lock(&connection_mutex);
980  PDBCommunicatorPtr anotherCommunicatorToFrontend = make_shared<PDBCommunicator>();
981  anotherCommunicatorToFrontend->connectToInternetServer(logger,
982  conf->getPort(),
983  conf->getServerAddress(),
984  errMsg);
985  pthread_mutex_unlock(&connection_mutex);
986  DataProxyPtr proxy = make_shared<DataProxy>(nodeId, anotherCommunicatorToFrontend, shm, logger);
987 
988  std::string errMsg;
989 
990  // make allocator block and allocate the JoinMap
991  const UseTemporaryAllocationBlock tempBlock(partitionedSet->getPage(i),
992  hashSetSize);
993 #ifdef PROFILING
994  std::string out = getAllocator().printInactiveBlocks();
995  logger->warn(out);
996  std::cout
997  << "HashPartitionedJoinBuildHTJobStage-backend: print inactive blocks:"
998  << std::endl;
999  std::cout << out << std::endl;
1000 #endif
1001  PDB_COUT << "hashSetSize = " << hashSetSize << std::endl;
1003  Handle<Object> myMap = merger->createNewOutputContainer();
1004 
1005  // setup an output page to store intermediate results and final output
1006  PageCircularBufferIteratorPtr myIter = hashIters[i];
1007  PDBPagePtr page = nullptr;
1008  while (myIter->hasNext()) {
1009  page = myIter->next();
1010  if (page != nullptr) {
1011  // to get the map on the page
1012  RecordIteratorPtr recordIter = make_shared<RecordIterator>(page);
1013  while (recordIter->hasNext()) {
1014  Record<Object> *record = recordIter->next();
1015  if (record != nullptr) {
1016  Handle<Object> mapsToMerge = record->getRootObject();
1017  merger->writeVectorOut(mapsToMerge, myMap);
1018  }
1019  }
1020  // unpin the input page
1021  page->decRefCount();
1022  if (page->getRefCount() == 0) {
1023  proxy->unpinUserPage(nodeId,
1024  page->getDbID(),
1025  page->getTypeID(),
1026  page->getSetID(),
1027  page);
1028  }
1029 
1030  } else {
1031  PDB_COUT << "####Scanner got a null page" << std::endl;
1032  }
1033  }
1034  PDB_COUT << "To get record" << std::endl;
1035  getRecord(myMap);
1036 
1038 #ifdef PROFILING
1040  std::cout << "HashPartitionedJoinBuildHTJobStage-backend-thread: print "
1041  "inactive blocks:"
1042  << std::endl;
1043  std::cout << out << std::endl;
1044 #endif
1045  callerBuzzer->buzz(PDBAlarm::WorkAllDone, hashCounter);
1046 
1047  });
1048  worker->execute(myWork, hashBuzzer);
1049 
1050  } // for
1051 
1052  // get input set and start a one thread scanner to scan that input set, and put the
1053  // pointer to pages to each of the queues
1054  // start single-thread scanner
1055  // the thread iterates page, and put each page to all queues, in the end close all
1056  // buffers
1057 
1058  int backendCircularBufferSize = numPartitions;
1059  int numThreads = 1;
1060  PDBLoggerPtr scanLogger = make_shared<PDBLogger>("buildHTs-scanner.log");
1061  PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
1062  shm,
1063  scanLogger,
1064  numThreads,
1065  backendCircularBufferSize,
1066  nodeId);
1067  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
1068  success = false;
1069  errMsg = "Error: A job is already running!";
1070  std::cout << errMsg << std::endl;
1071  // return result to frontend
1072  PDB_COUT << "to send back reply" << std::endl;
1073  const UseTemporaryAllocationBlock block{1024};
1074  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
1075  // return the result
1076  success = sendUsingMe->sendObject(response, errMsg);
1077  return make_pair(success, errMsg);
1078  }
1079  // get iterators
1080  PDB_COUT << "To send GetSetPages message" << std::endl;
1081  std::vector<PageCircularBufferIteratorPtr> iterators = scanner->getSetIterators(nodeId,
1082  request->getSourceContext()->getDatabaseId(),
1083  request->getSourceContext()->getTypeId(),
1084  request->getSourceContext()->getSetId());
1085  PDB_COUT << "GetSetPages message is sent" << std::endl;
1086  unsigned long numIteratorsReturned = iterators.size();
1087  if (numIteratorsReturned != numThreads) {
1088  int k;
1089  for (k = 0; k < numPartitions; k++) {
1090  PageCircularBufferPtr buffer = hashBuffers[k];
1091  buffer->close();
1092  }
1093 
1094  while (hashCounter < numPartitions) {
1095  hashBuzzer->wait();
1096  }
1097  pthread_mutex_destroy(&connection_mutex);
1098  success = false;
1099  errMsg = "Error: number of iterators doesn't match number of threads!";
1100  std::cout << errMsg << std::endl;
1101  // return result to frontend
1102  PDB_COUT << "to send back reply" << std::endl;
1103  const UseTemporaryAllocationBlock block{1024};
1104  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
1105  // return the result
1106  success = sendUsingMe->sendObject(response, errMsg);
1107  return make_pair(success, errMsg);
1108  }
1109 
1110  // create a buzzer and counter
1111  PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &counter) {
1112  counter++;
1113  PDB_COUT << "scan counter = " << counter << std::endl;
1114  });
1115  int counter = 0;
1116 
1117  for (int j = 0; j < numThreads; j++) {
1118  PDBWorkerPtr worker = getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
1119  PDB_COUT << "to run the " << j << "-th scan work..." << std::endl;
1120  // start threads
1121  PDBWorkPtr myWork = make_shared<GenericWork>([&, j](PDBBuzzerPtr callerBuzzer) {
1122  // setup an output page to store intermediate results and final output
1123  const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};
1124  PageCircularBufferIteratorPtr iter = iterators.at(j);
1125  PDBPagePtr page = nullptr;
1126  while (iter->hasNext()) {
1127  page = iter->next();
1128  if (page != nullptr) {
1129  int k;
1130  for (k = 0; k < numPartitions; k++) {
1131  page->incRefCount();
1132  }
1133  for (k = 0; k < numPartitions; k++) {
1134  hashBuffers[k]->addPageToTail(page);
1135  }
1136  }
1137  }
1138  callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
1139  });
1140 
1141  worker->execute(myWork, tempBuzzer);
1142  }
1143 
1144  while (counter < numThreads) {
1145  tempBuzzer->wait();
1146  }
1147 
1148  int k;
1149  for (k = 0; k < numPartitions; k++) {
1150  PageCircularBufferPtr buffer = hashBuffers[k];
1151  buffer->close();
1152  }
1153 
1154 
1155  // wait for multiple threads to return
1156  while (hashCounter < numPartitions) {
1157  hashBuzzer->wait();
1158  }
1159 
1160  // reset scanner
1161  pthread_mutex_destroy(&connection_mutex);
1162 
1163  if (getFunctionality<HermesExecutionServer>().setCurPageScanner(nullptr) == false) {
1164  success = false;
1165  errMsg = "Error: No job is running!";
1166  std::cout << errMsg << std::endl;
1167  }
1168 
1169 
1170  // return result to frontend
1171  PDB_COUT << "to send back reply" << std::endl;
1172  const UseTemporaryAllocationBlock block1{1024};
1173  Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
1174  // return the result
1175  success = sendUsingMe->sendObject(response, errMsg);
1176  return make_pair(success, errMsg);
1177 
1178  }));
1179 
1180 
    // register a handler to process the TupleSetJobStage message: runs one pipeline
    // stage (plain / broadcast / shuffle-with-combiner / hash-partition sink) and
    // then frees any hash sets the stage has fully consumed
    forMe.registerHandler(
        TupleSetJobStage_TYPEID,
        // NOTE(review): the SimpleRequestHandler<TupleSetJobStage> lambda header
        // line is absent from this listing — confirm against the repository source.
            PDBCommunicatorPtr sendUsingMe) {
            // ask the allocator to reclaim inactive blocks (32MB and 256MB
            // arguments; exact semantics in Allocator::cleanInactiveBlocks)
            getAllocator().cleanInactiveBlocks((size_t) ((size_t) 32 * (size_t) 1024 * (size_t) 1024));

            getAllocator().cleanInactiveBlocks((size_t) ((size_t) 256 * (size_t) 1024 * (size_t) 1024));
            PDB_COUT << "Backend got Tuple JobStage message with Id=" << request->getStageId()
                     << std::endl;
            request->print();
            bool res = true;
            std::string errMsg;
            // larger working block when large-graph support is compiled in
#ifdef ENABLE_LARGE_GRAPH
            const UseTemporaryAllocationBlock block1{256 * 1024 * 1024};
#else
            const UseTemporaryAllocationBlock block1{32 * 1024 * 1024};
#endif
#ifdef PROFILING
            std::string out = getAllocator().printInactiveBlocks();
            std::cout << "TupleSetJobStage-backend: print inactive blocks:" << std::endl;
            std::cout << out << std::endl;
#endif
            Handle<SetIdentifier> sourceContext = request->getSourceContext();
            // only run when no other scan job is active on this backend
            if (getCurPageScanner() == nullptr) {
                NodeID nodeId = getFunctionality<HermesExecutionServer>().getNodeID();
                pdb::PDBLoggerPtr logger = getFunctionality<HermesExecutionServer>().getLogger();
                SharedMemPtr shm = getFunctionality<HermesExecutionServer>().getSharedMem();
                ConfigurationPtr conf = getFunctionality<HermesExecutionServer>().getConf();
                Handle<PipelineStage> pipeline = makeObject<PipelineStage>(request,
                                                                           shm,
                                                                           logger,
                                                                           conf,
                                                                           nodeId,
                                                                           conf->getBatchSize(),
                                                                           conf->getNumThreads());
                // dispatch on the stage's sink type
                if (request->isRepartitionJoin() == true) {
                    PDB_COUT << "run pipeline for hash partitioned join" << std::endl;
                    pipeline->runPipelineWithHashPartitionSink(this);
                } else if (((request->isRepartition() == false) ||
                            (request->isCombining() == false)) &&
                           (request->isBroadcasting() == false)) {
                    // NOTE(review): this branch also catches repartition-without-
                    // combining stages and runs them as a plain pipeline — verify
                    // that is intended.
                    PDB_COUT << "run pipeline..." << std::endl;
                    pipeline->runPipeline(this);
                } else if (request->isBroadcasting() == true) {
                    PDB_COUT << "run pipeline with broadcasting..." << std::endl;
                    pipeline->runPipelineWithBroadcastSink(this);
                } else {
                    // remaining case: repartition together with combining
                    PDB_COUT << "run pipeline with combiner..." << std::endl;
                    pipeline->runPipelineWithShuffleSink(this);
                }
                // if the source was a partitioned aggregation hash set, this stage
                // has consumed it — clean it up and unregister it
                if ((sourceContext->isAggregationResult() == true) &&
                    (sourceContext->getSetType() == PartitionedHashSetType)) {
                    // hash sets are registered under "<database>:<setName>"
                    std::string hashSetName =
                        sourceContext->getDatabase() + ":" + sourceContext->getSetName();
                    AbstractHashSetPtr hashSet = this->getHashSet(hashSetName);
                    if (hashSet != nullptr) {
                        hashSet->cleanup();
                        this->removeHashSet(hashSetName);
                    } else {
                        std::cout << "Can't remove hash set " << hashSetName
                                  << ": set doesn't exist" << std::endl;
                    }
                }

                // if this stage scans hash tables we need remove those hash tables
                if (request->isProbing() == true) {
                    Handle<Map<String, String>> hashTables = request->getHashSets();
                    if (hashTables != nullptr) {
                        for (PDBMapIterator<String, String> mapIter = hashTables->begin();
                             mapIter != hashTables->end();
                             ++mapIter) {
                            std::string key = (*mapIter).key;
                            std::string hashSetName = (*mapIter).value;
                            std::cout << "remove " << key << ":" << hashSetName << std::endl;
                            AbstractHashSetPtr hashSet = this->getHashSet(hashSetName);
                            if (hashSet != nullptr) {
                                hashSet->cleanup();
                                this->removeHashSet(hashSetName);
                            } else {
                                std::cout << "Can't remove hash set " << hashSetName
                                          << ": set doesn't exist" << std::endl;
                            }
                        }
                    }
                }

            } else {
                res = false;
                errMsg = "A Job is already running in this server";
                std::cout << errMsg << std::endl;
                // We do not remove the hash table, so that we can try again.
            }
            PDB_COUT << "to send back reply" << std::endl;
            const UseTemporaryAllocationBlock block2{1024};
            Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);
            // return the result
            res = sendUsingMe->sendObject(response, errMsg);
            return make_pair(res, errMsg);
        }));
1281 
    // register a handler to process the StorageRemoveHashSet message: drops a
    // named in-memory hash set from this backend and reports success/failure
    forMe.registerHandler(
        StorageRemoveHashSet_TYPEID,
        // NOTE(review): the SimpleRequestHandler<StorageRemoveHashSet> lambda
        // header line is absent from this listing — confirm against the
        // repository source.
            Handle<StorageRemoveHashSet> request, PDBCommunicatorPtr sendUsingMe) {
            std::string errMsg;
            bool success = true;
            // hash sets are registered under "<database>:<setName>"
            std::string hashSetName = request->getDatabase() + ":" + request->getSetName();
            AbstractHashSetPtr hashSet = this->getHashSet(hashSetName);
            if (hashSet != nullptr) {
                // release the set's memory before unregistering it
                hashSet->cleanup();
                this->removeHashSet(hashSetName);
            } else {
                errMsg = std::string("Can't remove hash set ") + hashSetName +
                    std::string(": set doesn't exist");
                success = false;
            }
            // return result to frontend
            PDB_COUT << "to send back reply" << std::endl;
            const UseTemporaryAllocationBlock block{1024};
            Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(success, errMsg);
            // return the result
            success = sendUsingMe->sendObject(response, errMsg);
            return make_pair(success, errMsg);

        }));
1306 
1307 
    // register a handler to process the BackendTestSetCopy message
    // (test path: copies the input set into a temp set, then copies the temp
    // set into the requested output set, using numThreads copy workers)
    forMe.registerHandler(
        BackendTestSetCopy_TYPEID,
        // NOTE(review): the SimpleRequestHandler<BackendTestSetCopy> lambda
        // header line is absent from this listing — confirm against the
        // repository source.
            Handle<BackendTestSetCopy> request, PDBCommunicatorPtr sendUsingMe) {
            // res is assigned on every path before it is read (error branches,
            // and removeTempSet on the happy path)
            bool res;
            std::string errMsg;

            // get input and output information
            DatabaseID dbIdIn = request->getDatabaseIn();
            UserTypeID typeIdIn = request->getTypeIdIn();
            SetID setIdIn = request->getSetIdIn();
            DatabaseID dbIdOut = request->getDatabaseOut();
            UserTypeID typeIdOut = request->getTypeIdOut();
            SetID setIdOut = request->getSetIdOut();

            int numThreads = getFunctionality<HermesExecutionServer>().getConf()->getNumThreads();
            NodeID nodeId = getFunctionality<HermesExecutionServer>().getNodeID();
            pdb::PDBLoggerPtr logger = getFunctionality<HermesExecutionServer>().getLogger();
            SharedMemPtr shm = getFunctionality<HermesExecutionServer>().getSharedMem();
            int backendCircularBufferSize = 3;


            // create a scanner for input set
            PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
            communicatorToFrontend->connectToInternetServer(
                logger,
                getFunctionality<HermesExecutionServer>().getConf()->getPort(),
                "localhost",
                errMsg);
            PageScannerPtr scanner = make_shared<PageScanner>(
                communicatorToFrontend, shm, logger, numThreads, backendCircularBufferSize, nodeId);

            if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
                res = false;
                errMsg = "Error: A job is already running!";
                std::cout << errMsg << std::endl;
                // NOTE(review): unlike the other handlers, this early return sends
                // no SimpleRequestResult back to the frontend — confirm the caller
                // tolerates a silent failure here.
                return make_pair(res, errMsg);
            }

            std::vector<PageCircularBufferIteratorPtr> iterators =
                scanner->getSetIterators(nodeId, dbIdIn, typeIdIn, setIdIn);

            int numIteratorsReturned = iterators.size();
            if (numIteratorsReturned != numThreads) {
                res = false;
                errMsg = "Error: number of iterators doesn't match number of threads!";
                std::cout << errMsg << std::endl;
                // NOTE(review): early return with no reply, and the current page
                // scanner is left set — verify against the repository source.
                return make_pair(res, errMsg);
            }


            // create a data proxy for creating temp set
            PDBCommunicatorPtr anotherCommunicatorToFrontend = make_shared<PDBCommunicator>();
            anotherCommunicatorToFrontend->connectToInternetServer(
                logger,
                getFunctionality<HermesExecutionServer>().getConf()->getPort(),
                "localhost",
                errMsg);
            DataProxyPtr proxy =
                make_shared<DataProxy>(nodeId, anotherCommunicatorToFrontend, shm, logger);
            SetID tempSetId;
            proxy->addTempSet("intermediateData", tempSetId);
            PDB_COUT << "temp set created with setId = " << tempSetId << std::endl;

            // buzzer counts finished copy workers via `counter` (captured by ref)
            PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &counter) {
                counter++;
                PDB_COUT << "counter = " << counter << std::endl;
            });
            int counter = 0;

            // phase 1: copy the input set into the temp set (db id 0, type id 0)
            for (int i = 0; i < numThreads; i++) {
                PDBWorkerPtr worker =
                    getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
                // starting processing threads;
                TestCopyWorkPtr testCopyWork =
                    make_shared<TestCopyWork>(iterators.at(i),
                                              0,
                                              0,
                                              tempSetId,
                                              &(getFunctionality<HermesExecutionServer>()),
                                              counter);
                worker->execute(testCopyWork, tempBuzzer);
            }

            while (counter < numThreads) {
                tempBuzzer->wait();
            }

            // reset the shared counter for phase 2
            counter = 0;
            PDB_COUT << "All objects have been copied from set with databaseID =" << dbIdIn
                     << ", typeID=" << typeIdIn << ", setID=" << setIdIn << std::endl;
            PDB_COUT << "All objects have been copied to a temp set with setID =" << tempSetId
                     << std::endl;

            // create a scanner for intermediate set

            communicatorToFrontend = make_shared<PDBCommunicator>();
            communicatorToFrontend->connectToInternetServer(
                logger,
                getFunctionality<HermesExecutionServer>().getConf()->getPort(),
                "localhost",
                errMsg);
            scanner = make_shared<PageScanner>(
                communicatorToFrontend, shm, logger, numThreads, backendCircularBufferSize, nodeId);
            // swap in the new scanner: clear the old one first, then set the new one
            getFunctionality<HermesExecutionServer>().setCurPageScanner(nullptr);
            getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner);
            iterators = scanner->getSetIterators(nodeId, 0, 0, tempSetId);

            PDBBuzzerPtr anotherTempBuzzer =
                make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &counter) {
                    counter++;
                    PDB_COUT << "counter = " << counter << std::endl;
                });

            // phase 2: copy the temp set into the requested output set
            for (int i = 0; i < numThreads; i++) {
                PDBWorkerPtr worker =
                    getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();

                // starting processing threads;
                TestCopyWorkPtr testCopyWork =
                    make_shared<TestCopyWork>(iterators.at(i),
                                              dbIdOut,
                                              typeIdOut,
                                              setIdOut,
                                              &(getFunctionality<HermesExecutionServer>()),
                                              counter);
                worker->execute(testCopyWork, anotherTempBuzzer);
            }

            while (counter < numThreads) {
                anotherTempBuzzer->wait();
            }

            PDB_COUT << "All objects have been copied from a temp set with setID=" << tempSetId
                     << std::endl;
            PDB_COUT << "All objects have been copied to a set with databaseID=" << dbIdOut
                     << ", typeID=" << typeIdOut << ", setID =" << setIdOut << std::endl;

            // release the scanner slot and drop the temp set
            getFunctionality<HermesExecutionServer>().setCurPageScanner(nullptr);
            res = proxy->removeTempSet(tempSetId);
            if (res == true) {
                PDB_COUT << "temp set removed with setId = " << tempSetId << std::endl;
            } else {
                errMsg = "Fatal error: Temp Set doesn't exist!";
                std::cout << errMsg << std::endl;
            }

            const UseTemporaryAllocationBlock block{1024};
            Handle<SimpleRequestResult> response = makeObject<SimpleRequestResult>(res, errMsg);

            // return the result
            res = sendUsingMe->sendObject(response, errMsg);
            return make_pair(res, errMsg);

        }));
1464 }
1465 }
1466 
1467 #endif
shared_ptr< PageScanner > PageScannerPtr
Definition: PageScanner.h:39
std::shared_ptr< AbstractHashSet > AbstractHashSetPtr
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
void registerHandlers(PDBServer &forMe) override
Handle< ObjType > getRootObject()
Definition: Record.cc:46
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
bool addHashSet(std::string name, AbstractHashSetPtr hashSet)
void setPolicy(AllocatorPolicy policy)
Definition: Allocator.cc:536
unsigned int NodeID
Definition: DataTypes.h:27
std::shared_ptr< SinkMerger > SinkMergerPtr
Definition: SinkMerger.h:30
Allocator & getAllocator()
Definition: Allocator.cc:943
unsigned int HashPartitionID
Definition: DataTypes.h:28
#define JOIN_HASH_TABLE_SIZE_RATIO
bool setCurPageScanner(PageScannerPtr curPageScanner)
shared_ptr< PDBWork > PDBWorkPtr
Definition: PDBWork.h:47
shared_ptr< TestCopyWork > TestCopyWorkPtr
Definition: TestCopyWork.h:30
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
std::shared_ptr< PartitionedHashSet > PartitionedHashSetPtr
unsigned int DatabaseID
Definition: DataTypes.h:29
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< SharedHashSet > SharedHashSetPtr
Definition: SharedHashSet.h:26
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
bool removeHashSet(std::string name)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
std::shared_ptr< RecordIterator > RecordIteratorPtr
PDBAlarm
Definition: PDBAlarm.h:28
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
std::shared_ptr< SetSpecifier > SetSpecifierPtr
Definition: SetSpecifier.h:28
shared_ptr< TestScanWork > TestScanWorkPtr
Definition: TestScanWork.h:27
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
AbstractHashSetPtr getHashSet(std::string name)
#define HASH_PARTITIONED_JOIN_SIZE_RATIO
unsigned int UserTypeID
Definition: DataTypes.h:25
std::string printInactiveBlocks()
Definition: Allocator.cc:888