19 #ifndef HERMES_EXECUTION_SERVER_CC
20 #define HERMES_EXECUTION_SERVER_CC
49 #ifndef JOIN_HASH_TABLE_SIZE_RATIO
50 #define JOIN_HASH_TABLE_SIZE_RATIO 1.5
53 #ifndef HASH_PARTITIONED_JOIN_SIZE_RATIO
54 #define HASH_PARTITIONED_JOIN_SIZE_RATIO 1
66 StoragePagePinned_TYPEID,
69 PDB_COUT <<
"Start a handler to process StoragePagePinned messages\n";
73 if (scanner ==
nullptr) {
75 errMsg =
"Fatal Error: No job is running in execution server.";
76 std::cout << errMsg << std::endl;
78 PDB_COUT <<
"StoragePagePinned handler: to throw pinned pages to a circular buffer!"
80 scanner->recvPagesLoop(request, sendUsingMe);
83 return make_pair(res, errMsg);
94 PDB_COUT <<
"Got StorageNoMorePage object." << std::endl;
97 PDB_COUT <<
"To close the scanner..." << std::endl;
98 if (scanner ==
nullptr) {
99 PDB_COUT <<
"The scanner has already been closed." << std::endl;
101 scanner->closeBuffer();
102 PDB_COUT <<
"We closed the scanner buffer." << std::endl;
105 return make_pair(res, errMsg);
111 BackendTestSetScan_TYPEID,
119 SetID setId = request->getSetID();
120 PDB_COUT <<
"Backend received BackendTestSetScan message with dbId=" << dbId
121 <<
", typeId=" << typeId <<
", setId=" << setId << std::endl;
123 int numThreads = getFunctionality<HermesExecutionServer>().
getConf()->getNumThreads();
127 int backendCircularBufferSize = 3;
130 communicatorToFrontend->connectToInternetServer(
132 getFunctionality<HermesExecutionServer>().
getConf()->getPort(),
136 communicatorToFrontend,
shm,
logger, numThreads, backendCircularBufferSize,
nodeId);
138 if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) ==
false) {
140 errMsg =
"Error: A job is already running!";
141 std::cout << errMsg << std::endl;
142 return make_pair(res, errMsg);
145 std::vector<PageCircularBufferIteratorPtr> iterators =
146 scanner->getSetIterators(nodeId, dbId, typeId, setId);
148 int numIteratorsReturned = iterators.size();
149 if (numIteratorsReturned != numThreads) {
151 errMsg =
"Error: number of iterators doesn't match number of threads!";
152 std::cout << errMsg << std::endl;
153 return make_pair(res, errMsg);
155 PDB_COUT <<
"Buzzer is created in TestScanWork\n";
158 PDB_COUT <<
"counter = " << counter << std::endl;
161 for (
int i = 0; i < numThreads; i++) {
163 getFunctionality<HermesExecutionServer>().
getWorkers()->getWorker();
167 iterators.at(i), &(getFunctionality<HermesExecutionServer>()), counter);
168 worker->execute(testScanWork, tempBuzzer);
171 while (counter < numThreads) {
180 res = sendUsingMe->sendObject(response, errMsg);
181 return make_pair(res, errMsg);
188 BroadcastJoinBuildHTJobStage_TYPEID,
195 #ifdef ENABLE_LARGE_GRAPH
197 (size_t) ((
size_t) 256 * (size_t) 1024 * (
size_t) 1024)};
200 (size_t)((
size_t)32 * (size_t)1024 * (
size_t)1024)};
204 PDB_COUT <<
"Backend got Broadcast JobStage message with Id=" << request->getStageId()
209 size_t hashSetSize =
conf->getBroadcastPageSize() * (size_t) (request->getNumPages()) *
211 std::cout <<
"BroadcastJoinBuildHTJobStage: hashSetSize=" << hashSetSize << std::endl;
213 make_shared<SharedHashSet>(request->getHashSetName(), hashSetSize);
214 if (sharedHashSet->isValid() ==
false) {
215 hashSetSize =
conf->getBroadcastPageSize() * (size_t) (request->getNumPages()) * 1.5;
217 size_t memSize = request->getTotalMemoryOnThisNode();
218 size_t sharedMemPoolSize =
conf->getShmSize();
219 if (hashSetSize > (memSize - sharedMemPoolSize) * 0.8) {
220 hashSetSize = (memSize - sharedMemPoolSize) * 0.8;
221 std::cout <<
"WARNING: no more memory on heap can be allocated for hash set, "
222 "we reduce hash set size."
226 std::cout <<
"BroadcastJoinBuildHTJobStage: tuned hashSetSize to be " << hashSetSize
228 sharedHashSet = make_shared<SharedHashSet>(request->getHashSetName(), hashSetSize);
230 if (sharedHashSet->isValid() ==
false) {
232 errMsg =
"Error: heap memory becomes insufficient";
233 std::cout << errMsg << std::endl;
235 PDB_COUT <<
"to send back reply" << std::endl;
238 makeObject<SimpleRequestResult>(success, errMsg);
240 success = sendUsingMe->sendObject(response, errMsg);
241 return make_pair(success, errMsg);
243 this->
addHashSet(request->getHashSetName(), sharedHashSet);
244 std::cout <<
"BroadcastJoinBuildHTJobStage: hashSetName=" << request->getHashSetName()
248 int backendCircularBufferSize = 1;
249 if (
conf->getShmSize() /
conf->getPageSize() - 2 <
250 2 + 2 * numThreads + backendCircularBufferSize) {
252 errMsg =
"Error: Not enough buffer pool size to run the query!";
253 std::cout << errMsg << std::endl;
255 PDB_COUT <<
"to send back reply" << std::endl;
258 makeObject<SimpleRequestResult>(success, errMsg);
260 success = sendUsingMe->sendObject(response, errMsg);
261 return make_pair(success, errMsg);
263 backendCircularBufferSize =
264 (
conf->getShmSize() /
conf->getPageSize() - 4 - 2 * numThreads);
265 if (backendCircularBufferSize > 10) {
266 backendCircularBufferSize = 10;
269 PDB_COUT <<
"backendCircularBufferSize is tuned to be " << backendCircularBufferSize
274 PDBLoggerPtr scanLogger = make_shared<PDBLogger>(
"agg-scanner.log");
276 communicatorToFrontend->connectToInternetServer(
278 PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
282 backendCircularBufferSize,
284 if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) ==
false) {
286 errMsg =
"Error: A job is already running!";
287 std::cout << errMsg << std::endl;
289 PDB_COUT <<
"to send back reply" << std::endl;
291 makeObject<SimpleRequestResult>(success, errMsg);
293 success = sendUsingMe->sendObject(response, errMsg);
294 return make_pair(success, errMsg);
298 PDB_COUT <<
"To send GetSetPages message" << std::endl;
299 std::vector<PageCircularBufferIteratorPtr> iterators =
300 scanner->getSetIterators(
nodeId,
301 request->getSourceContext()->getDatabaseId(),
302 request->getSourceContext()->getTypeId(),
303 request->getSourceContext()->getSetId());
304 PDB_COUT <<
"GetSetPages message is sent" << std::endl;
308 anotherCommunicatorToFrontend->connectToInternetServer(
311 make_shared<DataProxy>(
nodeId, anotherCommunicatorToFrontend,
shm,
logger);
317 std::cout <<
"BroadcastJoinBuildHTJobStage-backend: print inactive blocks:"
319 std::cout << out << std::endl;
321 PDB_COUT <<
"hashSetSize = " << hashSetSize << std::endl;
324 std::string sourceTupleSetSpecifier = request->getSourceTupleSetSpecifier();
325 std::string targetTupleSetSpecifier = request->getTargetTupleSetSpecifier();
326 std::string targetComputationSpecifier = request->getTargetComputationSpecifier();
329 sourceTupleSetSpecifier, targetTupleSetSpecifier, targetComputationSpecifier);
335 while (iter->hasNext()) {
337 if (page !=
nullptr) {
340 while (recordIter->hasNext() ==
true) {
342 if (record !=
nullptr) {
345 merger->writeOut(theOtherMap, myMap);
349 proxy->unpinUserPage(
350 nodeId, page->getDbID(), page->getTypeID(), page->getSetID(), page);
361 errMsg =
"Error: No job is running!";
362 std::cout << errMsg << std::endl;
365 PDB_COUT <<
"to send back reply" << std::endl;
369 success = sendUsingMe->sendObject(response, errMsg);
370 return make_pair(success, errMsg);
376 AggregationJobStage_TYPEID,
385 std::cout <<
"Backend got Aggregation JobStage message with Id="
386 << request->getStageId() << std::endl;
391 std::cout <<
"AggregationJobStage-backend: print inactive blocks:" << std::endl;
392 std::cout << out << std::endl;
396 int numPartitions = request->getNumNodePartitions();
404 size_t memSize = request->getTotalMemoryOnThisNode();
405 size_t sharedMemPoolSize =
conf->getShmSize();
407 #ifdef ENABLE_LARGE_GRAPH
408 size_t tunedHashPageSize =
409 (double) (memSize * ((
size_t) (1024)) - sharedMemPoolSize -
410 ((size_t) (
conf->getNumThreads()) * (
size_t) (256) * (
size_t) (1024) *
413 (ratio) / (double) (numPartitions);
415 size_t tunedHashPageSize =
416 (double)(memSize * ((
size_t)(1024)) - sharedMemPoolSize -
418 (ratio) / (double)(numPartitions);
420 if (memSize * ((
size_t) (1024)) <
421 sharedMemPoolSize + (size_t) 512 * (
size_t) 1024 * (size_t) 1024) {
422 std::cout <<
"WARNING: Auto tuning can not work, use default values" << std::endl;
423 tunedHashPageSize =
conf->getHashPageSize();
425 std::cout <<
"Tuned hash page size is " << tunedHashPageSize << std::endl;
426 conf->setHashPageSize(tunedHashPageSize);
431 int aggregationBufferSize = 2;
432 std::vector<PageCircularBufferPtr> hashBuffers;
433 std::vector<PageCircularBufferIteratorPtr> hashIters;
435 pthread_mutex_t connection_mutex;
436 pthread_mutex_init(&connection_mutex,
nullptr);
439 pthread_mutex_lock(&connection_mutex);
441 communicatorToFrontend->connectToInternetServer(
443 pthread_mutex_unlock(&connection_mutex);
448 make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int &hashCounter) {
450 PDB_COUT <<
"hashCounter = " << hashCounter << std::endl;
452 std::cout <<
"to run aggregation with " << numPartitions <<
" threads." << std::endl;
455 std::string hashSetName =
"";
457 if (request->needsToMaterializeAggOut() ==
false) {
459 std::string dbName = sinkSetIdentifier->getDatabase();
460 std::string setName = sinkSetIdentifier->getSetName();
461 hashSetName = dbName +
":" + setName;
463 make_shared<PartitionedHashSet>(hashSetName, this->
conf->getHashPageSize());
464 this->
addHashSet(hashSetName, aggregationSet);
471 for (i = 0; i < numPartitions; i++) {
473 make_shared<PDBLogger>(std::string(
"aggregation-") + std::to_string(i));
475 make_shared<PageCircularBuffer>(aggregationBufferSize, myLogger);
476 hashBuffers.push_back(buffer);
478 make_shared<PageCircularBufferIterator>(i, buffer, myLogger);
479 hashIters.push_back(iter);
481 getFunctionality<HermesExecutionServer>().
getWorkers()->getWorker();
482 PDB_COUT <<
"to run the " << i <<
"-th work..." << std::endl;
490 (
size_t) ((
size_t) 32 * (
size_t) 1024 * (
size_t) 1024));
492 (
size_t) ((
size_t) 256 * (
size_t) 1024 * (
size_t) 1024));
494 pthread_mutex_lock(&connection_mutex);
496 make_shared<PDBCommunicator>();
497 anotherCommunicatorToFrontend->connectToInternetServer(
499 pthread_mutex_unlock(&connection_mutex);
501 make_shared<DataProxy>(
nodeId, anotherCommunicatorToFrontend,
shm,
logger);
502 #ifdef ENABLE_LARGE_GRAPH
512 deepCopyToCurrentAllocationBlock<AbstractAggregateComp>(aggComputation);
517 aggregateProcessor->initialize();
519 if (request->needsToMaterializeAggOut() ==
false) {
521 void *outBytes =
nullptr;
522 while (myIter->hasNext()) {
524 if (page !=
nullptr) {
529 if (inputData !=
nullptr) {
530 inputSize = inputData->size();
532 for (
int j = 0; j < inputSize; j++) {
533 aggregateProcessor->loadInputObject((*inputData)[j]);
534 if (aggregateProcessor->needsProcessInput() ==
false) {
537 if (outBytes ==
nullptr) {
539 outBytes = aggregationSet->addPage();
540 if (outBytes ==
nullptr) {
541 std::cout <<
"insufficient memory in heap" << std::endl;
544 aggregateProcessor->loadOutputPage(
545 outBytes, aggregationSet->getPageSize());
547 if (aggregateProcessor->fillNextOutputPage()) {
548 aggregateProcessor->clearOutputPage();
550 <<
"WARNING: aggregation for partition-" << i
551 <<
" can't finish in one aggregation page with size="
552 << aggregationSet->getPageSize() << std::endl;
553 std::cout <<
"WARNING: results may not be fully aggregated "
555 << i <<
", please increase hash page size!!"
557 logger->error(std::string(
558 "Hash page size is too small or memory is "
559 "insufficient, results are not fully aggregated!"));
566 if (page->getRefCount() == 0) {
567 proxy->unpinUserPage(nodeId,
575 if (outBytes !=
nullptr) {
576 aggregateProcessor->finalize();
577 aggregateProcessor->fillNextOutputPage();
578 aggregateProcessor->clearOutputPage();
584 make_shared<SetSpecifier>(request->getSinkContext()->getDatabase(),
585 request->getSinkContext()->getSetName(),
586 request->getSinkContext()->getDatabaseId(),
587 request->getSinkContext()->getTypeId(),
588 request->getSinkContext()->getSetId());
592 size_t aggregationPageSize =
conf->getHashPageSize();
594 void *aggregationPage =
nullptr;
598 newAgg->getAggOutProcessor();
599 aggOutProcessor->initialize();
601 while (myIter->hasNext()) {
603 if (page !=
nullptr) {
609 if (inputData !=
nullptr) {
610 inputSize = inputData->size();
612 for (
int j = 0; j < inputSize; j++) {
613 aggregateProcessor->loadInputObject((*inputData)[j]);
614 if (aggregateProcessor->needsProcessInput() ==
false) {
617 if (aggregationPage ==
nullptr) {
619 (
void *) malloc(aggregationPageSize *
sizeof(
char));
620 aggregateProcessor->loadOutputPage(aggregationPage,
621 aggregationPageSize);
623 if (aggregateProcessor->fillNextOutputPage()) {
625 <<
"WARNING: aggregation for partition-" << i
626 <<
" can't finish in one aggregation page with size="
627 << aggregationPageSize << std::endl;
628 std::cout <<
"WARNING: results may not be fully aggregated "
631 <<
", please ask PDB admin to tune memory size!!"
633 logger->error(std::string(
634 "Hash page size is too small or memory is "
635 "insufficient, results are not fully aggregated!"));
638 aggOutProcessor->loadInputPage(aggregationPage);
640 if (output ==
nullptr) {
641 proxy->addUserPage(outputSet->getDatabaseId(),
642 outputSet->getTypeId(),
643 outputSet->getSetId(),
645 aggOutProcessor->loadOutputPage(output->getBytes(),
648 while (aggOutProcessor->fillNextOutputPage()) {
649 aggOutProcessor->clearOutputPage();
650 PDB_COUT << i <<
": AggOutProcessor: we now filled an "
651 "output page and unpin it"
654 proxy->unpinUserPage(nodeId,
655 outputSet->getDatabaseId(),
656 outputSet->getTypeId(),
657 outputSet->getSetId(),
660 proxy->addUserPage(outputSet->getDatabaseId(),
661 outputSet->getTypeId(),
662 outputSet->getSetId(),
665 aggOutProcessor->loadOutputPage(output->getBytes(),
668 aggregateProcessor->clearOutputPage();
669 free(aggregationPage);
676 if (page->getRefCount() == 0) {
677 proxy->unpinUserPage(nodeId,
685 if (aggregationPage !=
nullptr) {
687 aggregateProcessor->finalize();
688 aggregateProcessor->fillNextOutputPage();
690 aggOutProcessor->loadInputPage(aggregationPage);
692 if (output ==
nullptr) {
693 proxy->addUserPage(outputSet->getDatabaseId(),
694 outputSet->getTypeId(),
695 outputSet->getSetId(),
697 aggOutProcessor->loadOutputPage(output->getBytes(),
700 while (aggOutProcessor->fillNextOutputPage()) {
701 aggOutProcessor->clearOutputPage();
703 proxy->unpinUserPage(nodeId,
704 outputSet->getDatabaseId(),
705 outputSet->getTypeId(),
706 outputSet->getSetId(),
709 proxy->addUserPage(outputSet->getDatabaseId(),
710 outputSet->getTypeId(),
711 outputSet->getSetId(),
714 aggOutProcessor->loadOutputPage(output->getBytes(),
719 aggOutProcessor->finalize();
720 aggOutProcessor->fillNextOutputPage();
721 aggOutProcessor->clearOutputPage();
722 proxy->unpinUserPage(nodeId,
723 outputSet->getDatabaseId(),
724 outputSet->getTypeId(),
725 outputSet->getSetId(),
728 aggregateProcessor->clearOutputPage();
729 free(aggregationPage);
736 std::cout <<
"AggregationJobStage-backend-thread: print inactive blocks:"
738 std::cout << out << std::endl;
743 worker->execute(myWork, hashBuzzer);
751 int backendCircularBufferSize = numPartitions;
753 PDBLoggerPtr scanLogger = make_shared<PDBLogger>(
"agg-scanner.log");
754 PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
758 backendCircularBufferSize,
760 if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) ==
false) {
762 errMsg =
"Error: A job is already running!";
763 std::cout << errMsg << std::endl;
765 PDB_COUT <<
"to send back reply" << std::endl;
768 makeObject<SimpleRequestResult>(success, errMsg);
770 success = sendUsingMe->sendObject(response, errMsg);
771 return make_pair(success, errMsg);
775 std::cout <<
"To send GetSetPages message" << std::endl;
776 std::vector<PageCircularBufferIteratorPtr> iterators =
777 scanner->getSetIterators(
nodeId,
778 request->getSourceContext()->getDatabaseId(),
779 request->getSourceContext()->getTypeId(),
780 request->getSourceContext()->getSetId());
781 std::cout <<
"GetSetPages message is sent" << std::endl;
782 int numIteratorsReturned = iterators.size();
783 if (numIteratorsReturned != numThreads) {
785 for (k = 0; k < numPartitions; k++) {
790 while (hashCounter < numPartitions) {
793 pthread_mutex_destroy(&connection_mutex);
795 errMsg =
"Error: number of iterators doesn't match number of threads!";
796 std::cout << errMsg << std::endl;
798 PDB_COUT <<
"to send back reply" << std::endl;
801 makeObject<SimpleRequestResult>(success, errMsg);
803 success = sendUsingMe->sendObject(response, errMsg);
804 return make_pair(success, errMsg);
810 PDB_COUT <<
"scan counter = " << counter << std::endl;
814 for (
int j = 0; j < numThreads; j++) {
816 getFunctionality<HermesExecutionServer>().
getWorkers()->getWorker();
817 std::cout <<
"to run the " << j <<
"-th scan work..." << std::endl;
824 while (iter->hasNext()) {
826 if (page !=
nullptr) {
828 for (k = 0; k < numPartitions; k++) {
831 for (k = 0; k < numPartitions; k++) {
832 hashBuffers[k]->addPageToTail(page);
839 worker->execute(myWork, tempBuzzer);
842 while (counter < numThreads) {
847 for (k = 0; k < numPartitions; k++) {
854 while (hashCounter < numPartitions) {
859 pthread_mutex_destroy(&connection_mutex);
861 if (getFunctionality<HermesExecutionServer>().
setCurPageScanner(
nullptr) ==
false) {
863 errMsg =
"Error: No job is running!";
864 std::cout << errMsg << std::endl;
869 PDB_COUT <<
"to send back reply" << std::endl;
873 success = sendUsingMe->sendObject(response, errMsg);
874 return make_pair(success, errMsg);
882 HashPartitionedJoinBuildHTJobStage_TYPEID,
890 std::cout <<
"Backend got HashPartitionedJoinBuildHTJobStage message with Id="
891 << request->getStageId() << std::endl;
896 std::cout <<
"HashPartitionedJoinBuildHTJobStage-backend: print inactive blocks:"
898 std::cout << out << std::endl;
903 int numPartitions = request->getNumNodePartitions();
904 int numPages = request->getNumPages();
909 if (sizeRatio > numPartitions) {
910 sizeRatio = numPartitions;
912 size_t hashSetSize = (double) (
conf->getShufflePageSize()) *
913 (
double) (numPages) * sizeRatio / (
double) (numPartitions);
915 std::string hashSetName = request->getHashSetName();
917 this->
addHashSet(hashSetName, partitionedSet);
918 std::cout <<
"Added hash set for HashPartitionedJoin to probe" << std::endl;
919 for (
int i = 0; i < numPartitions; i++) {
920 void *bytes = partitionedSet->addPage();
921 if (bytes ==
nullptr) {
922 std::cout <<
"Insufficient memory in heap" << std::endl;
927 int buildingHTBufferSize = 2;
928 std::vector<PageCircularBufferPtr> hashBuffers;
929 std::vector<PageCircularBufferIteratorPtr> hashIters;
931 pthread_mutex_t connection_mutex;
932 pthread_mutex_init(&connection_mutex,
nullptr);
935 pthread_mutex_lock(&connection_mutex);
937 communicatorToFrontend->connectToInternetServer(
939 pthread_mutex_unlock(&connection_mutex);
944 make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int &hashCounter) {
946 PDB_COUT <<
"hashCounter = " << hashCounter << std::endl;
948 std::cout <<
"to build hashtables with " << numPartitions <<
" threads." << std::endl;
952 std::string sourceTupleSetSpecifier = request->getSourceTupleSetSpecifier();
953 std::string targetTupleSetSpecifier = request->getTargetTupleSetSpecifier();
954 std::string targetComputationSpecifier = request->getTargetComputationSpecifier();
956 SinkMergerPtr merger = myComputePlan->getMerger(sourceTupleSetSpecifier,
957 targetTupleSetSpecifier,
958 targetComputationSpecifier);
968 for (
int i = 0; i < numPartitions; i++) {
969 PDBLoggerPtr myLogger = make_shared<PDBLogger>(std::string(
"buildHT-") + std::to_string(i));
971 hashBuffers.push_back(buffer);
973 hashIters.push_back(iter);
975 PDB_COUT <<
"to run the " << i <<
"-th work..." << std::endl;
979 pthread_mutex_lock(&connection_mutex);
981 anotherCommunicatorToFrontend->connectToInternetServer(
logger,
983 conf->getServerAddress(),
985 pthread_mutex_unlock(&connection_mutex);
997 <<
"HashPartitionedJoinBuildHTJobStage-backend: print inactive blocks:"
999 std::cout << out << std::endl;
1001 PDB_COUT <<
"hashSetSize = " << hashSetSize << std::endl;
1008 while (myIter->hasNext()) {
1009 page = myIter->next();
1010 if (page !=
nullptr) {
1013 while (recordIter->hasNext()) {
1015 if (record !=
nullptr) {
1017 merger->writeVectorOut(mapsToMerge, myMap);
1021 page->decRefCount();
1022 if (page->getRefCount() == 0) {
1023 proxy->unpinUserPage(nodeId,
1031 PDB_COUT <<
"####Scanner got a null page" << std::endl;
1034 PDB_COUT <<
"To get record" << std::endl;
1040 std::cout <<
"HashPartitionedJoinBuildHTJobStage-backend-thread: print "
1043 std::cout << out << std::endl;
1048 worker->execute(myWork, hashBuzzer);
1058 int backendCircularBufferSize = numPartitions;
1060 PDBLoggerPtr scanLogger = make_shared<PDBLogger>(
"buildHTs-scanner.log");
1061 PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
1065 backendCircularBufferSize,
1067 if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) ==
false) {
1069 errMsg =
"Error: A job is already running!";
1070 std::cout << errMsg << std::endl;
1072 PDB_COUT <<
"to send back reply" << std::endl;
1076 success = sendUsingMe->sendObject(response, errMsg);
1077 return make_pair(success, errMsg);
1080 PDB_COUT <<
"To send GetSetPages message" << std::endl;
1081 std::vector<PageCircularBufferIteratorPtr> iterators = scanner->getSetIterators(
nodeId,
1082 request->getSourceContext()->getDatabaseId(),
1083 request->getSourceContext()->getTypeId(),
1084 request->getSourceContext()->getSetId());
1085 PDB_COUT <<
"GetSetPages message is sent" << std::endl;
1086 unsigned long numIteratorsReturned = iterators.size();
1087 if (numIteratorsReturned != numThreads) {
1089 for (k = 0; k < numPartitions; k++) {
1094 while (hashCounter < numPartitions) {
1097 pthread_mutex_destroy(&connection_mutex);
1099 errMsg =
"Error: number of iterators doesn't match number of threads!";
1100 std::cout << errMsg << std::endl;
1102 PDB_COUT <<
"to send back reply" << std::endl;
1106 success = sendUsingMe->sendObject(response, errMsg);
1107 return make_pair(success, errMsg);
1113 PDB_COUT <<
"scan counter = " << counter << std::endl;
1117 for (
int j = 0; j < numThreads; j++) {
1119 PDB_COUT <<
"to run the " << j <<
"-th scan work..." << std::endl;
1126 while (iter->hasNext()) {
1127 page = iter->next();
1128 if (page !=
nullptr) {
1130 for (k = 0; k < numPartitions; k++) {
1131 page->incRefCount();
1133 for (k = 0; k < numPartitions; k++) {
1134 hashBuffers[k]->addPageToTail(page);
1141 worker->execute(myWork, tempBuzzer);
1144 while (counter < numThreads) {
1149 for (k = 0; k < numPartitions; k++) {
1156 while (hashCounter < numPartitions) {
1161 pthread_mutex_destroy(&connection_mutex);
1163 if (getFunctionality<HermesExecutionServer>().
setCurPageScanner(
nullptr) ==
false) {
1165 errMsg =
"Error: No job is running!";
1166 std::cout << errMsg << std::endl;
1171 PDB_COUT <<
"to send back reply" << std::endl;
1175 success = sendUsingMe->sendObject(response, errMsg);
1176 return make_pair(success, errMsg);
1183 TupleSetJobStage_TYPEID,
1189 PDB_COUT <<
"Backend got Tuple JobStage message with Id=" << request->getStageId()
1194 #ifdef ENABLE_LARGE_GRAPH
1201 std::cout <<
"TupleSetJobStage-backend: print inactive blocks:" << std::endl;
1202 std::cout << out << std::endl;
1215 conf->getBatchSize(),
1216 conf->getNumThreads());
1217 if (request->isRepartitionJoin() ==
true) {
1218 PDB_COUT <<
"run pipeline for hash partitioned join" << std::endl;
1219 pipeline->runPipelineWithHashPartitionSink(
this);
1220 }
else if (((request->isRepartition() ==
false) ||
1221 (request->isCombining() ==
false)) &&
1222 (request->isBroadcasting() ==
false)) {
1223 PDB_COUT <<
"run pipeline..." << std::endl;
1224 pipeline->runPipeline(
this);
1225 }
else if (request->isBroadcasting() ==
true) {
1226 PDB_COUT <<
"run pipeline with broadcasting..." << std::endl;
1227 pipeline->runPipelineWithBroadcastSink(
this);
1229 PDB_COUT <<
"run pipeline with combiner..." << std::endl;
1230 pipeline->runPipelineWithShuffleSink(
this);
1232 if ((sourceContext->isAggregationResult() ==
true) &&
1234 std::string hashSetName =
1235 sourceContext->getDatabase() +
":" + sourceContext->getSetName();
1237 if (hashSet !=
nullptr) {
1241 std::cout <<
"Can't remove hash set " << hashSetName
1242 <<
": set doesn't exist" << std::endl;
1247 if (request->isProbing() ==
true) {
1249 if (hashTables !=
nullptr) {
1251 mapIter != hashTables->end();
1253 std::string key = (*mapIter).key;
1254 std::string hashSetName = (*mapIter).value;
1255 std::cout <<
"remove " << key <<
":" << hashSetName << std::endl;
1257 if (hashSet !=
nullptr) {
1261 std::cout <<
"Can't remove hash set " << hashSetName
1262 <<
": set doesn't exist" << std::endl;
1270 errMsg =
"A Job is already running in this server";
1271 std::cout << errMsg << std::endl;
1274 PDB_COUT <<
"to send back reply" << std::endl;
1278 res = sendUsingMe->sendObject(response, errMsg);
1279 return make_pair(res, errMsg);
1283 StorageRemoveHashSet_TYPEID,
1287 bool success =
true;
1288 std::string hashSetName = request->getDatabase() +
":" + request->getSetName();
1290 if (hashSet !=
nullptr) {
1294 errMsg = std::string(
"Can't remove hash set ") + hashSetName +
1295 std::string(
": set doesn't exist");
1298 PDB_COUT <<
"to send back reply" << std::endl;
1302 success = sendUsingMe->sendObject(response, errMsg);
1303 return make_pair(success, errMsg);
1310 BackendTestSetCopy_TYPEID,
1317 DatabaseID dbIdIn = request->getDatabaseIn();
1318 UserTypeID typeIdIn = request->getTypeIdIn();
1319 SetID setIdIn = request->getSetIdIn();
1320 DatabaseID dbIdOut = request->getDatabaseOut();
1321 UserTypeID typeIdOut = request->getTypeIdOut();
1322 SetID setIdOut = request->getSetIdOut();
1324 int numThreads = getFunctionality<HermesExecutionServer>().
getConf()->getNumThreads();
1328 int backendCircularBufferSize = 3;
1333 communicatorToFrontend->connectToInternetServer(
1335 getFunctionality<HermesExecutionServer>().
getConf()->getPort(),
1339 communicatorToFrontend,
shm,
logger, numThreads, backendCircularBufferSize,
nodeId);
1341 if (getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) ==
false) {
1343 errMsg =
"Error: A job is already running!";
1344 std::cout << errMsg << std::endl;
1345 return make_pair(res, errMsg);
1348 std::vector<PageCircularBufferIteratorPtr> iterators =
1349 scanner->getSetIterators(nodeId, dbIdIn, typeIdIn, setIdIn);
1351 int numIteratorsReturned = iterators.size();
1352 if (numIteratorsReturned != numThreads) {
1354 errMsg =
"Error: number of iterators doesn't match number of threads!";
1355 std::cout << errMsg << std::endl;
1356 return make_pair(res, errMsg);
1362 anotherCommunicatorToFrontend->connectToInternetServer(
1364 getFunctionality<HermesExecutionServer>().
getConf()->getPort(),
1368 make_shared<DataProxy>(
nodeId, anotherCommunicatorToFrontend,
shm,
logger);
1370 proxy->addTempSet(
"intermediateData", tempSetId);
1371 PDB_COUT <<
"temp set created with setId = " << tempSetId << std::endl;
1375 PDB_COUT <<
"counter = " << counter << std::endl;
1379 for (
int i = 0; i < numThreads; i++) {
1381 getFunctionality<HermesExecutionServer>().
getWorkers()->getWorker();
1384 make_shared<TestCopyWork>(iterators.at(i),
1388 &(getFunctionality<HermesExecutionServer>()),
1390 worker->execute(testCopyWork, tempBuzzer);
1393 while (counter < numThreads) {
1398 PDB_COUT <<
"All objects have been copied from set with databaseID =" << dbIdIn
1399 <<
", typeID=" << typeIdIn <<
", setID=" << setIdIn << std::endl;
1400 PDB_COUT <<
"All objects have been copied to a temp set with setID =" << tempSetId
1405 communicatorToFrontend = make_shared<PDBCommunicator>();
1406 communicatorToFrontend->connectToInternetServer(
1408 getFunctionality<HermesExecutionServer>().
getConf()->getPort(),
1411 scanner = make_shared<PageScanner>(
1412 communicatorToFrontend,
shm,
logger, numThreads, backendCircularBufferSize,
nodeId);
1415 iterators = scanner->getSetIterators(nodeId, 0, 0, tempSetId);
1418 make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int &counter) {
1420 PDB_COUT <<
"counter = " << counter << std::endl;
1423 for (
int i = 0; i < numThreads; i++) {
1425 getFunctionality<HermesExecutionServer>().
getWorkers()->getWorker();
1429 make_shared<TestCopyWork>(iterators.at(i),
1433 &(getFunctionality<HermesExecutionServer>()),
1435 worker->execute(testCopyWork, anotherTempBuzzer);
1438 while (counter < numThreads) {
1439 anotherTempBuzzer->wait();
1442 PDB_COUT <<
"All objects have been copied from a temp set with setID=" << tempSetId
1444 PDB_COUT <<
"All objects have been copied to a set with databaseID=" << dbIdOut
1445 <<
", typeID=" << typeIdOut <<
", setID =" << setIdOut << std::endl;
1448 res = proxy->removeTempSet(tempSetId);
1450 PDB_COUT <<
"temp set removed with setId = " << tempSetId << std::endl;
1452 errMsg =
"Fatal error: Temp Set doesn't exist!";
1453 std::cout << errMsg << std::endl;
1460 res = sendUsingMe->sendObject(response, errMsg);
1461 return make_pair(res, errMsg);
shared_ptr< PageScanner > PageScannerPtr
std::shared_ptr< AbstractHashSet > AbstractHashSetPtr
shared_ptr< PDBPage > PDBPagePtr
void registerHandlers(PDBServer &forMe) override
ConfigurationPtr getConf()
Handle< ObjType > getRootObject()
shared_ptr< DataProxy > DataProxyPtr
bool addHashSet(std::string name, AbstractHashSetPtr hashSet)
void setPolicy(AllocatorPolicy policy)
std::shared_ptr< SinkMerger > SinkMergerPtr
Allocator & getAllocator()
unsigned int HashPartitionID
#define JOIN_HASH_TABLE_SIZE_RATIO
bool setCurPageScanner(PageScannerPtr curPageScanner)
PageScannerPtr getCurPageScanner()
shared_ptr< PDBWork > PDBWorkPtr
shared_ptr< TestCopyWork > TestCopyWorkPtr
shared_ptr< SharedMem > SharedMemPtr
std::shared_ptr< PartitionedHashSet > PartitionedHashSetPtr
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< SharedHashSet > SharedHashSetPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
SharedMemPtr getSharedMem()
bool removeHashSet(std::string name)
PDBWorkerQueuePtr getWorkers()
shared_ptr< PDBBuzzer > PDBBuzzerPtr
void cleanInactiveBlocks()
shared_ptr< Configuration > ConfigurationPtr
shared_ptr< PDBWorker > PDBWorkerPtr
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
std::shared_ptr< RecordIterator > RecordIteratorPtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
pdb::PDBLoggerPtr getLogger()
std::shared_ptr< SetSpecifier > SetSpecifierPtr
shared_ptr< TestScanWork > TestScanWorkPtr
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
AbstractHashSetPtr getHashSet(std::string name)
#define HASH_PARTITIONED_JOIN_SIZE_RATIO
std::string printInactiveBlocks()