18 #ifndef PIPELINE_STAGE_CC
19 #define PIPELINE_STAGE_CC
49 #ifdef ENABLE_COMPRESSION
76 int numNodes = this->
jobStage->getNumNodes();
77 for (
int i = 0; i < numNodes; i++) {
94 std::string databaseName,
98 bool whetherToPersist,
99 std::string& errMsg) {
101 port =
conf->getPort();
103 std::cout <<
"store shuffle data to address=" << address <<
" and port=" << port
104 <<
", with size = " << data->size() <<
" to database=" << databaseName
105 <<
" and set=" << setName <<
" and type = IntermediateData" << std::endl;
113 if (result !=
nullptr)
114 if (!result->getRes().first) {
115 logger->error(
"Error sending data: " + result->getRes().second);
116 errMsg =
"Error sending data: " + result->getRes().second;
130 std::string databaseName,
134 std::string& errMsg) {
136 port =
conf->getPort();
138 std::cout <<
"store shuffle data to address=" << address <<
" and port=" << port
139 <<
", with compressed byte size = " << numBytes <<
" to database=" << databaseName
140 <<
" and set=" << setName <<
" and type = IntermediateData" << std::endl;
141 return simpleSendBytesRequest<StorageAddData, SimpleRequestResult, bool>(
148 if (result !=
nullptr)
149 if (!result->getRes().first) {
150 logger->error(
"Error sending data: " + result->getRes().second);
151 errMsg =
"Error sending data: " + result->getRes().second;
169 std::string databaseName,
171 std::string& errMsg) {
173 if (data !=
nullptr) {
174 #ifdef DEBUG_SHUFFLING
176 std::string fileName =
177 jobStage->getJobId() +
"_" + std::to_string(
jobStage->getStageId()) +
"_shuffle";
178 FILE* myFile = fopen(fileName.c_str(),
"w");
179 fwrite(data, 1, size, myFile);
184 databaseName, setName,
"IntermediateData",
false,
false);
185 conn->sendObject(request, errMsg);
186 #ifdef ENABLE_COMPRESSION
187 char* compressedBytes =
new char[snappy::MaxCompressedLength(size)];
188 size_t compressedSize;
189 snappy::RawCompress((
char*)data, size, compressedBytes, &compressedSize);
190 std::cout <<
"size before compression is " << size <<
" and size after compression is "
191 << compressedSize << std::endl;
192 conn->sendBytes(compressedBytes, compressedSize, errMsg);
193 delete[] compressedBytes;
195 conn->sendBytes(data, size, errMsg);
197 #ifdef DEBUG_SHUFFLING
199 std::string fileName1 =
200 jobStage->getJobId() +
"_" + std::to_string(
jobStage->getStageId()) +
"_sent";
201 FILE* myFile1 = fopen(fileName1.c_str(),
"w");
202 fwrite(data, 1, size, myFile1);
207 request->setLoopEnded();
208 conn->sendObject(request, errMsg);
217 int backendCircularBufferSize = 1;
218 if (
conf->getShmSize() /
conf->getPageSize() - 2 <
219 2 + 2 *
numThreads + backendCircularBufferSize) {
221 errMsg =
"Error: Not enough buffer pool size to run the query! Please reduce number of threads or increase shared memory pool size or reduce default page size and retry";
222 std::cout << errMsg << std::endl;
225 backendCircularBufferSize = (
conf->getShmSize() /
conf->getPageSize() - 4 - 2 *
numThreads);
226 if (backendCircularBufferSize > 10) {
227 backendCircularBufferSize = 10;
230 PDB_COUT <<
"backendCircularBufferSize is tuned to be " << backendCircularBufferSize
232 return backendCircularBufferSize;
241 communicatorToFrontend->connectToInternetServer(
244 PDBLoggerPtr scannerLogger = make_shared<PDBLogger>(
"scanner.log");
247 PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
251 backendCircularBufferSize,
254 std::vector<PageCircularBufferIteratorPtr> iterators;
258 errMsg =
"Error: A job is already running!";
259 std::cout << errMsg << std::endl;
264 std::cout <<
"To send GetSetPages message" << std::endl;
265 iterators = scanner->getSetIterators(
nodeId,
266 jobStage->getSourceContext()->getDatabaseId(),
267 jobStage->getSourceContext()->getTypeId(),
268 jobStage->getSourceContext()->getSetId());
269 std::cout <<
"GetSetPages message is sent" << std::endl;
278 std::vector<PageCircularBufferPtr>& sourceBuffers,
283 std::string& errMsg) {
284 std::cout <<
"to feed shared buffers for " << numPartitions <<
" partitions" << std::endl;
286 std::vector<PageCircularBufferIteratorPtr> scanIterators =
288 int numScanThreads = scanIterators.size();
289 std::cout <<
"we've got " << numScanThreads <<
" iterators" << std::endl;
292 for (
int i = 0; i < numScanThreads; i++) {
296 std::cout <<
"to run the " << i <<
"-th scan work..." << std::endl;
302 while (iter->hasNext()) {
304 if (page !=
nullptr) {
305 std::cout <<
"Scanner got a non-null page" << std::endl;
306 for (
int j = 0; j < numPartitions; j++) {
309 for (
int j = 0; j < numPartitions; j++) {
310 std::cout <<
"add page to the " << j <<
"-th buffer" << std::endl;
311 sourceBuffers[j]->addPageToTail(page);
318 worker->execute(myWork, tempBuzzer);
324 pthread_mutex_t connection_mutex,
325 std::string& errMsg) {
328 std::string loggerName = std::string(
"PipelineStage_") + std::to_string(i);
330 pthread_mutex_lock(&connection_mutex);
332 anotherCommunicatorToFrontend->connectToInternetServer(
333 logger,
conf->getPort(),
conf->getServerAddress(), errMsg);
334 pthread_mutex_unlock(&connection_mutex);
344 std::vector<PageCircularBufferIteratorPtr>& iterators,
347 std::vector<PageCircularBufferPtr>& sinkBuffers,
349 std::string& errMsg) {
351 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
353 std::vector<PDBCommunicatorPtr> connections;
354 for (
int j = 0; j <
jobStage->getNumNodes(); j++) {
357 std::string address = this->
jobStage->getIPAddress(j);
358 PDB_COUT <<
"address = " << address << std::endl;
361 int port = this->
jobStage->getPort(j);
362 PDB_COUT <<
"port = " << port << std::endl;
366 communicator->connectToInternetServer(
logger, port, address, errMsg);
367 connections.push_back(communicator);
371 #ifdef CLEANUP_INACTIVE_BLOCKS
379 #ifdef ENABLE_LARGE_GRAPH
385 PDB_COUT << i <<
": to get compute plan" << std::endl;
387 plan->nullifyPlanPointer();
388 PDB_COUT << i <<
": to deep copy ComputePlan object" << std::endl;
391 bool isHashPartitionedJoinProbing =
false;
393 std::vector<std::string> buildTheseTupleSets;
394 jobStage->getTupleSetsToBuildPipeline(buildTheseTupleSets);
395 PDB_COUT <<
"buildTheseTupleSets[0]=" << buildTheseTupleSets[0] << std::endl;
396 std::string sourceSpecifier =
jobStage->getSourceTupleSetSpecifier();
397 PDB_COUT <<
"Source tupleset name=" << sourceSpecifier << std::endl;
398 if (buildTheseTupleSets[0] != sourceSpecifier) {
399 std::string producerComputationName =
400 newPlan->getProducingComputationName(buildTheseTupleSets[0]);
401 PDB_COUT <<
"Producer computation name=" << producerComputationName << std::endl;
402 computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
403 if (computation->getComputationType() ==
"JoinComp") {
404 isHashPartitionedJoinProbing =
true;
407 if (isHashPartitionedJoinProbing ==
false) {
408 std::string producerComputationName = newPlan->getProducingComputationName(sourceSpecifier);
409 PDB_COUT <<
"Producer computation name=" << producerComputationName << std::endl;
410 computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
417 if ((sourceContext->getSetType() ==
UserSetType) &&
418 (computation->getComputationType() !=
"JoinComp")) {
421 if (computation->getComputationType() ==
"ScanUserSet") {
422 scanner = unsafeCast<ScanUserSet<Object>,
Computation>(computation);
423 }
else if (computation->getComputationType() ==
"SelectionComp") {
425 unsafeCast<SelectionComp<Object, Object>,
Computation>(computation);
426 scanner = selection->getOutputSetScanner();
427 }
else if (computation->getComputationType() ==
"MultiSelectionComp") {
429 unsafeCast<MultiSelectionComp<Object, Object>,
Computation>(computation);
430 scanner = multiSelection->getOutputSetScanner();
431 }
else if (computation->getComputationType() ==
"ClusterAggregationComp") {
433 unsafeCast<AggregateComp<Object, Object, Object, Object>,
Computation>(
435 scanner = aggregator->getOutputSetScanner();
436 }
else if (computation->getComputationType() ==
"PartitionComp") {
438 unsafeCast<PartitionComp<Object, Object>,
Computation>(computation);
439 scanner = partitioner->getOutputSetScanner();
441 std::cout <<
"Error: we can't support source computation type "
442 << computation->getComputationType() << std::endl;
446 if (scanner !=
nullptr) {
447 scanner->setIterator(iterators.at(i));
448 scanner->setProxy(proxy);
449 if ((scanner->getBatchSize() <= 0) || (scanner->getBatchSize() > 100)) {
452 PDB_COUT <<
"SCANNER BATCH SIZE: " << scanner->getBatchSize() << std::endl;
454 }
else if ((sourceContext->getSetType() ==
UserSetType) &&
455 (computation->getComputationType() ==
"JoinComp")) {
458 unsafeCast<JoinComp<Object, Object, Object>,
Computation>(computation);
459 join->setIterator(iterators.at(i));
460 join->setProxy(proxy);
461 if (join->getBatchSize() == 0) {
462 join->setBatchSize(3);
464 join->setPartitionId(i);
465 join->setNumPartitions(this->
jobStage->getNumTotalPartitions());
466 join->setNumNodes(this->
jobStage->getNumNodes());
471 unsafeCast<AggregateComp<Object, Object, Object, Object>,
Computation>(
473 void* pagePointer = hashSet->getPage(i);
474 if (pagePointer !=
nullptr) {
475 aggregator->setHashTablePointer(hashSet->getPage(i));
477 std::cout <<
"There is no more hash partition for this thread, we simply return"
484 std::map<std::string, ComputeInfoPtr> info;
485 if ((this->
jobStage->isProbing() ==
true) && (this->
jobStage->getHashSets() !=
nullptr)) {
488 mapIter != hashSetsToProbe->end();
490 std::string key = (*mapIter).key;
491 std::string hashSetName = (*mapIter).value;
492 std::cout <<
"to probe " << key <<
":" << hashSetName << std::endl;
494 if (hashSet ==
nullptr) {
495 std::cout <<
"ERROR in pipeline execution: data not found in hash set "
496 << hashSetName <<
"!" << std::endl;
499 if (hashSet->getHashSetType() ==
"SharedHashSet") {
501 info[key] = std::make_shared<JoinArg>(*newPlan, sharedHashSet->getPage());
502 }
else if (hashSet->getHashSetType() ==
"PartitionedHashSet") {
505 info[key] = std::make_shared<JoinArg>(*newPlan, partitionedHashSet->getPage(i));
509 std::cout <<
"info contains nothing for this stage" << std::endl;
510 if (this->
jobStage->isProbing() ==
true) {
511 std::cout <<
"this stage needs probing hash tables" << std::endl;
513 std::cout <<
"this stage doesn't need probing hash tables" << std :: endl;
515 if (this->
jobStage->getHashSets() !=
nullptr) {
516 std::cout <<
"we have hash tables prepared for the stage" << std::endl;
518 std::cout <<
"we don't have hash tables prepared for the stage" << std :: endl;
521 std::cout <<
"this stage has a UserSetType source" << std::endl;
523 std::cout <<
"this stage doesn't have a UserSetType source" << std :: endl;
527 std::cout <<
"source specifier: " << this->
jobStage->getSourceTupleSetSpecifier() << std::endl;
528 std::cout <<
"target specifier: " << this->
jobStage->getTargetTupleSetSpecifier() << std::endl;
529 std::cout <<
"target computation: " << this->
jobStage->getTargetComputationSpecifier()
533 std::string targetSpecifier =
jobStage->getTargetComputationSpecifier();
534 if (targetSpecifier.find(
"ClusterAggregationComp") != std::string::npos) {
536 newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
538 unsafeCast<AbstractAggregateComp, Computation>(aggComputation);
539 int numPartitionsInCluster = this->
jobStage->getNumTotalPartitions();
540 PDB_COUT <<
"num partitions in the cluster is " << numPartitionsInCluster << std::endl;
541 aggregate->setNumNodes(
jobStage->getNumNodes());
542 aggregate->setNumPartitions(numPartitionsInCluster);
543 aggregate->setBatchSize(this->
batchSize);
544 }
else if (targetSpecifier.find(
"JoinComp") != std::string::npos) {
546 newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
547 join = unsafeCast<JoinComp<Object, Object, Object>,
Computation>(joinComputation);
548 join->setNumPartitions(this->
jobStage->getNumTotalPartitions());
549 join->setNumNodes(this->
jobStage->getNumNodes());
550 std::cout <<
"Join set to have " << join->getNumPartitions() <<
" partitions" << std::endl;
551 std::cout <<
"Join set to have " << join->getNumNodes() <<
" nodes" << std::endl;
552 }
else if (targetSpecifier.find(
"PartitionComp") != std::string::npos) {
554 newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
556 unsafeCast<PartitionComp<Object, Object>,
Computation>(partitionComputation);
557 int numPartitionsInCluster = this->
jobStage->getNumTotalPartitions();
558 PDB_COUT <<
"num partitions in the cluster is " << numPartitionsInCluster << std::endl;
559 partitioner->setNumPartitions(numPartitionsInCluster);
560 partitioner->setNumNodes(
jobStage->getNumNodes());
563 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
565 if ((this->
jobStage->isRepartition() ==
true) && (this->
jobStage->isCombining() ==
false) &&
567 mem = (
char*)malloc(
conf->getNetShufflePageSize());
570 newPlan->nullifyPlanPointer();
574 this->
jobStage->getSourceTupleSetSpecifier(),
575 this->
jobStage->getTargetComputationSpecifier(),
576 [&]() -> std::pair<void*, size_t> {
578 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(
int) +
sizeof(size_t));
579 if ((this->
jobStage->isBroadcasting() ==
false) &&
580 (this->
jobStage->isRepartition() ==
false) &&
582 proxy->addUserPage(outputSet->getDatabaseId(),
583 outputSet->getTypeId(),
584 outputSet->getSetId(),
587 if (output ==
nullptr) {
588 std::cout <<
"Pipeline Error: insufficient memory in heap" << std::endl;
589 return std::make_pair(
nullptr, 0);
591 return std::make_pair(output->getBytes(), output->getSize());
594 }
else if ((this->
jobStage->isBroadcasting() ==
false) &&
595 (this->
jobStage->isRepartition() ==
false) &&
599 void* myPage = calloc(outputSet->getPageSize(), 1);
600 if (myPage ==
nullptr) {
601 std::cout <<
"Pipeline Error: insufficient memory in heap" << std::endl;
603 return std::make_pair((
char*)myPage + headerSize,
604 outputSet->getPageSize() - headerSize);
607 }
else if ((this->
jobStage->isBroadcasting() ==
true) ||
608 ((this->
jobStage->isRepartition() ==
true) &&
609 (this->
jobStage->isCombining() ==
false) && (join !=
nullptr))) {
612 void* myPage = calloc(
conf->getBroadcastPageSize(), 1);
613 if (myPage ==
nullptr) {
614 std::cout <<
"Pipeline Error: insufficient memory in heap" << std::endl;
616 return std::make_pair((
char*)myPage + headerSize,
conf->getNetBroadcastPageSize());
622 std::cout <<
"to allocate a page for storing partition sink with size="
623 <<
conf->getShufflePageSize() << std::endl;
624 void* myPage = calloc(
conf->getShufflePageSize(), 1);
625 if (myPage ==
nullptr) {
626 std::cout <<
"Pipeline Error: insufficient memory in heap" << std::endl;
628 return std::make_pair((
char*)myPage + headerSize,
conf->getNetShufflePageSize());
635 if ((this->
jobStage->isBroadcasting() ==
false) &&
636 (this->
jobStage->isRepartition() ==
false) &&
638 if (output !=
nullptr) {
639 proxy->unpinUserPage(
640 nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
645 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(
int) +
sizeof(
size_t)));
651 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(
int) +
sizeof(size_t));
652 if (this->
jobStage->isBroadcasting() ==
true) {
653 PDB_COUT <<
"to broadcast a page" << std::endl;
658 if (record !=
nullptr) {
660 if (objectToSend !=
nullptr) {
661 PDBPagePtr pageToBroadcast = std::make_shared<PDBPage>(
664 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(int) +
sizeof(
size_t))),
670 conf->getBroadcastPageSize(),
673 int numNodes =
jobStage->getNumNodes();
676 for (k = 0; k < numNodes; k++) {
677 pageToBroadcast->incRefCount();
679 for (k = 0; k < numNodes; k++) {
682 buffer->addPageToTail(pageToBroadcast);
685 proxy->pinBytes(outputSet->getDatabaseId(),
686 outputSet->getTypeId(),
687 outputSet->getSetId(),
690 pageToBroadcast->decRefCount();
691 if (pageToBroadcast->getRefCount() == 0) {
692 pageToBroadcast->freeContent();
695 free((
char*)page - headerSize);
698 free((
char*)page - headerSize);
700 }
else if ((this->
jobStage->isRepartition() ==
true) &&
701 (this->
jobStage->isCombining() ==
false) && (join !=
nullptr)) {
702 PDB_COUT <<
"to hash partition a page" << std::endl;
707 if (record !=
nullptr) {
709 if (objectToSend !=
nullptr) {
710 PDBPagePtr pageToSend = std::make_shared<PDBPage>(
713 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(int) +
sizeof(
size_t))),
719 conf->getBroadcastPageSize(),
722 int numNodes =
jobStage->getNumNodes();
724 for (k = 0; k < numNodes; k++) {
725 pageToSend->incRefCount();
727 for (k = 0; k < numNodes; k++) {
729 buffer->addPageToTail(pageToSend);
732 free((
char*)page - headerSize);
735 free((
char*)page - headerSize);
738 }
else if ((this->
jobStage->isRepartition() ==
true) &&
739 (this->
jobStage->isCombining() ==
true)) {
743 sizeof(
SetID) +
sizeof(
PageID) +
sizeof(int) +
sizeof(
size_t)),
749 conf->getShufflePageSize(),
752 int numNodes =
jobStage->getNumNodes();
754 for (k = 0; k < numNodes; k++) {
755 output->incRefCount();
757 for (k = 0; k < numNodes; k++) {
759 buffer->addPageToTail(output);
762 }
else if ((this->
jobStage->isRepartition() ==
true) &&
763 (this->
jobStage->isCombining() ==
false) && (join ==
nullptr)) {
765 std::cout <<
"to shuffle data on this page" << std::endl;
768 if (record !=
nullptr) {
771 int numNodes =
jobStage->getNumNodes();
773 for (k = 0; k < numNodes; k++) {
775 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
777 getRecord(objectToShuffle, mem,
conf->getNetShufflePageSize());
778 std::cout <<
"send " << myRecord->
numBytes() <<
" bytes to node-" << k
780 if (objectToShuffle !=
nullptr) {
785 jobStage->getSinkContext()->getDatabase(),
786 jobStage->getSinkContext()->getSetName(),
790 if (objectToShuffle !=
nullptr) {
793 std::string address = this->
jobStage->getIPAddress(k);
796 int port = this->
jobStage->getPort(k);
797 bool whetherToPersist =
true;
799 this->
jobStage->getSinkContext()->getDatabase(),
800 this->
jobStage->getSinkContext()->getSetName(),
809 free((
char*)page - headerSize);
813 proxy->unpinUserPage(
814 nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
819 proxy->addUserPage(outputSet->getDatabaseId(),
820 outputSet->getTypeId(),
821 outputSet->getSetId(),
823 memcpy(output->getBytes(), page, output->getSize());
824 proxy->unpinUserPage(
825 nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
826 free((
char*)page - headerSize);
832 std::cout <<
"\nRunning Pipeline\n";
834 curPipeline =
nullptr;
835 newPlan->nullifyPlanPointer();
836 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
838 for (
int j = 0; j <
jobStage->getNumNodes(); j++) {
842 jobStage->getSinkContext()->getDatabase(),
843 jobStage->getSinkContext()->getSetName(),
846 if (mem !=
nullptr) {
855 std::vector<PageCircularBufferPtr> sinkBuffers;
857 make_shared<SetSpecifier>(
jobStage->getSinkContext()->getDatabase(),
858 jobStage->getSinkContext()->getSetName(),
859 jobStage->getSinkContext()->getDatabaseId(),
860 jobStage->getSinkContext()->getTypeId(),
861 jobStage->getSinkContext()->getSetId(),
862 jobStage->getSinkContext()->getPageSize());
869 std::vector<PageCircularBufferPtr> sinkBuffers,
872 #ifdef ENABLE_LARGE_GRAPH
879 int numPartitions = 0;
880 int sourceCounter = 0;
882 std::vector<PageCircularBufferPtr> sourceBuffers;
884 std::vector<PageCircularBufferIteratorPtr> iterators;
890 plan->nullifyPlanPointer();
891 PDB_COUT <<
"to deep copy ComputePlan object" << std::endl;
893 bool isHashPartitionedJoinProbing =
false;
895 std::vector<std::string> buildTheseTupleSets;
896 jobStage->getTupleSetsToBuildPipeline(buildTheseTupleSets);
897 PDB_COUT <<
"buildTheseTupleSets[0]=" << buildTheseTupleSets[0] << std::endl;
898 std::string sourceSpecifier =
jobStage->getSourceTupleSetSpecifier();
899 PDB_COUT <<
"Source tupleset name=" << sourceSpecifier << std::endl;
900 if (buildTheseTupleSets[0] != sourceSpecifier) {
901 std::string producerComputationName =
902 newPlan->getProducingComputationName(buildTheseTupleSets[0]);
903 PDB_COUT <<
"Producer computation name=" << producerComputationName << std::endl;
904 computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
905 if (computation->getComputationType() ==
"JoinComp") {
906 isHashPartitionedJoinProbing =
true;
909 if (isHashPartitionedJoinProbing ==
false) {
910 std::string producerComputationName = newPlan->getProducingComputationName(sourceSpecifier);
911 PDB_COUT <<
"Producer computation name=" << producerComputationName << std::endl;
912 computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
916 if ((sourceContext->getSetType() ==
UserSetType) &&
917 (computation->getComputationType() !=
"JoinComp")) {
919 }
else if ((sourceContext->getSetType() ==
UserSetType) &&
920 (computation->getComputationType() ==
"JoinComp")) {
921 int sourceBufferSize = 2;
922 int numPartitionsInCluster = this->
jobStage->getNumTotalPartitions();
923 int numNodes = this->
jobStage->getNumNodes();
924 numPartitions = numPartitionsInCluster / numNodes;
925 for (
int i = 0; i < numPartitions; i++) {
927 make_shared<PDBLogger>(std::string(
"scanPartitionedSource-") + std::to_string(i));
929 make_shared<PageCircularBuffer>(sourceBufferSize, myLogger);
930 sourceBuffers.push_back(buffer);
932 make_shared<PageCircularBufferIterator>(i, buffer, myLogger);
933 iterators.push_back(iter);
936 std::string hashSetName = sourceContext->getDatabase() +
":" + sourceContext->getSetName();
942 pthread_mutex_t connection_mutex;
943 pthread_mutex_init(&connection_mutex,
nullptr);
947 make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int& counter) { counter++; });
950 if (numPartitions > 0) {
951 numSourceThreads = numPartitions;
953 std::cout <<
"to run pipeline with " << numSourceThreads <<
" threads." << std::endl;
956 for (
int i = 0; i < numSourceThreads; i++) {
959 PDB_COUT <<
"to run the " << i <<
"-th work..." << std::endl;
966 std::cout <<
"print inactive blocks before running pipeline in this worker:"
968 std::cout << out << std::endl;
978 i, outputSet, iterators, hashSet, proxy, sinkBuffers, server, errMsg);
984 std::cout <<
"print inactive blocks after running pipeline in this worker:"
986 std::cout << out << std::endl;
993 worker->execute(myWork, tempBuzzer);
996 if ((sourceContext->getSetType() ==
UserSetType) &&
997 (computation->getComputationType() ==
"JoinComp")) {
999 std::cout <<
"start scanning source set and put pages to source buffers" << std::endl;
1001 sourceBuzzer = make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int& sourceCounter) {
1003 PDB_COUT <<
"source counter = " << sourceCounter << std::endl;
1006 std::vector<PageCircularBufferIteratorPtr> scanIterators =
1008 int numScanThreads = scanIterators.size();
1009 std::cout <<
"we've got " << numScanThreads <<
" iterators" << std::endl;
1011 for (
int i = 0; i < numScanThreads; i++) {
1015 std::cout <<
"to run the " << i <<
"-th scan work..." << std::endl;
1022 while (iter->hasNext()) {
1023 page = iter->next();
1024 if (page !=
nullptr) {
1025 std::cout <<
"Scanner got a non-null page" << std::endl;
1026 for (
int j = 0; j < numPartitions; j++) {
1027 page->incRefCount();
1029 std::cout <<
"Initialize join source page reference count to "
1030 << page->getRefCount() << std::endl;
1031 for (
int j = 0; j < numPartitions; j++) {
1032 sourceBuffers[j]->addPageToTail(page);
1039 worker->execute(myWork, sourceBuzzer);
1042 while (sourceCounter < 1) {
1043 sourceBuzzer->wait();
1046 std::cout <<
"Scanned all pages, now we close all source buffers" << std::endl;
1048 for (
int i = 0; i < numPartitions; i++) {
1054 while (counter < numSourceThreads) {
1059 pthread_mutex_destroy(&connection_mutex);
1064 errMsg =
"Error: No job is running!";
1065 std::cout << errMsg << std::endl;
1078 int numNodes =
jobStage->getNumNodes();
1081 size_t memSize =
jobStage->getTotalMemoryOnThisNode();
1082 size_t sharedMemPoolSize =
conf->getShmSize();
1083 #ifndef USE_VALGRIND
1084 size_t tunedHashPageSize =
1085 (double)(memSize * ((
size_t)(1024)) - sharedMemPoolSize - server->
getHashSetsSize()) *
1086 (0.8) / (double)(numNodes);
1088 size_t tunedHashPageSize =
1089 (double)(memSize * ((
size_t)(1024)) - sharedMemPoolSize - server->
getHashSetsSize()) *
1090 (0.5) / (double)(numNodes);
1092 if (memSize * ((
size_t)(1024)) <
1093 sharedMemPoolSize + (size_t)512 * (
size_t)1024 * (size_t)1024) {
1094 std::cout <<
"WARNING: Auto tuning can not work for this case, we use default value"
1096 tunedHashPageSize =
conf->getHashPageSize();
1099 std::cout <<
"Tuned combiner page size is " << tunedHashPageSize << std::endl;
1100 conf->setHashPageSize(tunedHashPageSize);
1104 size_t combinerPageSize =
conf->getHashPageSize();
1107 if (combinerBufferSize > 12) {
1108 combinerBufferSize = 12;
1110 PDB_COUT <<
"combinerBufferSize=" << combinerBufferSize << std::endl;
1111 std::vector<PageCircularBufferPtr> combinerBuffers;
1112 std::vector<PageCircularBufferIteratorPtr> combinerIters;
1114 pthread_mutex_t connection_mutex;
1115 pthread_mutex_init(&connection_mutex,
nullptr);
1119 make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int& combinerCounter) {
1121 PDB_COUT <<
"combinerCounter = " << combinerCounter << std::endl;
1123 PDB_COUT <<
"to run combiner with " << numNodes <<
" threads." << std::endl;
1124 int combinerCounter = 0;
1127 for (i = 0; i < numNodes; i++) {
1129 combinerBuffers.push_back(buffer);
1131 make_shared<PageCircularBufferIterator>(i, buffer,
logger);
1132 combinerIters.push_back(iter);
1135 PDB_COUT <<
"to run the " << i <<
"-th combining work..." << std::endl;
1142 std::cout <<
"inactive blocks before running combiner in this worker:" << std::endl;
1143 std::cout << out << std::endl;
1154 std::string address =
"";
1156 int numNodesToCollect = this->
jobStage->getNumNodesToCollect();
1157 if (this->
jobStage->isCollectAsMap() ==
false) {
1160 address = this->
jobStage->getIPAddress(i);
1161 PDB_COUT <<
"address = " << address << std::endl;
1165 PDB_COUT <<
"port = " << port << std::endl;
1170 address = this->
jobStage->getIPAddress(i % numNodesToCollect);
1171 PDB_COUT <<
"address = " << address << std::endl;
1174 port = this->
jobStage->getPort(i % numNodesToCollect);
1175 PDB_COUT <<
"port = " << port << std::endl;
1178 PDB_COUT << i <<
": to get compute plan" << std::endl;
1179 #ifdef ENABLE_LARGE_GRAPH
1185 plan->nullifyPlanPointer();
1186 PDB_COUT << i <<
": to deep copy ComputePlan object" << std::endl;
1188 std::string targetSpecifier =
jobStage->getTargetComputationSpecifier();
1189 PDB_COUT <<
"target computation name=" << targetSpecifier << std::endl;
1191 newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
1193 unsafeCast<AbstractAggregateComp, Computation>(computation);
1195 std::vector<HashPartitionID> stdPartitions;
1196 int numPartitionsOnTheNode = partitions->size();
1197 PDB_COUT <<
"num partitions on this node:" << numPartitionsOnTheNode << std::endl;
1198 for (
int m = 0; m < numPartitionsOnTheNode; m++) {
1199 PDB_COUT << m <<
":" << (*partitions)[m] << std::endl;
1200 stdPartitions.push_back((*partitions)[m]);
1204 aggregate->getCombinerProcessor(stdPartitions);
1205 size_t myCombinerPageSize = combinerPageSize;
1206 if (myCombinerPageSize >
conf->getShufflePageSize() - 64) {
1207 myCombinerPageSize =
conf->getShufflePageSize() - 64;
1209 void* combinerPage = (
void*)calloc(myCombinerPageSize,
sizeof(
char));
1210 if (combinerPage ==
nullptr) {
1211 std::cout <<
"Fatal Error: insufficient memory can be allocated from memory"
1215 std::cout << i <<
": load a combiner page with size = " << myCombinerPageSize
1217 combinerProcessor->loadOutputPage(combinerPage, myCombinerPageSize);
1221 while (myIter->hasNext()) {
1223 if (page !=
nullptr) {
1226 combinerProcessor->loadInputPage(page->getBytes());
1227 while (combinerProcessor->fillNextOutputPage()) {
1231 #ifndef ENABLE_COMPRESSION
1233 this->
jobStage->getSinkContext()->getDatabase(),
1234 this->
jobStage->getSinkContext()->getSetName(),
1240 char* compressedBytes =
1241 new char[snappy::MaxCompressedLength(record->
numBytes())];
1242 size_t compressedSize;
1243 snappy::RawCompress(
1244 (
char*)record, record->
numBytes(), compressedBytes, &compressedSize);
1245 std::cout <<
"size before compression is " << record->
numBytes()
1246 <<
" and size after compression is " << compressedSize
1251 this->
jobStage->getSinkContext()->getDatabase(),
1252 this->
jobStage->getSinkContext()->getSetName(),
1256 delete[] compressedBytes;
1262 combinerProcessor->clearOutputPage();
1265 combinerPage = (
void*)malloc(myCombinerPageSize *
sizeof(
char));
1266 if (combinerPage ==
nullptr) {
1268 <<
"Fatal Error: insufficient memory can be allocated from memory"
1272 std::cout <<
"load a combiner page with size = " << myCombinerPageSize
1275 combinerProcessor->loadOutputPage(combinerPage, myCombinerPageSize);
1279 page->decRefCount();
1280 if (page->getRefCount() == 0) {
1281 page->freeContent();
1285 combinerProcessor->finalize();
1286 combinerProcessor->fillNextOutputPage();
1288 PDB_COUT <<
"processed " << numPages <<
" pages" << std::endl;
1290 #ifndef ENABLE_COMPRESSION
1292 this->
jobStage->getSinkContext()->getDatabase(),
1293 this->
jobStage->getSinkContext()->getSetName(),
1299 char* compressedBytes =
new char[snappy::MaxCompressedLength(record->
numBytes())];
1300 size_t compressedSize;
1301 snappy::RawCompress(
1302 (
char*)record, record->
numBytes(), compressedBytes, &compressedSize);
1303 std::cout <<
"size before compression is " << record->
numBytes()
1304 <<
" and size after compression is " << compressedSize << std::endl;
1307 this->
jobStage->getSinkContext()->getDatabase(),
1308 this->
jobStage->getSinkContext()->getSetName(),
1312 delete[] compressedBytes;
1317 combinerProcessor->clearOutputPage();
1322 std::cout <<
"inactive blocks after running combiner in this worker:" << std::endl;
1323 std::cout << out << std::endl;
1332 worker->execute(myWork, combinerBuzzer);
1335 make_shared<SetSpecifier>(
jobStage->getCombinerContext()->getDatabase(),
1336 jobStage->getCombinerContext()->getSetName(),
1337 jobStage->getCombinerContext()->getDatabaseId(),
1338 jobStage->getCombinerContext()->getTypeId(),
1339 jobStage->getCombinerContext()->getSetId(),
1340 jobStage->getCombinerContext()->getPageSize());
1345 for (k = 0; k < numNodes; k++) {
1350 while (combinerCounter < numNodes) {
1351 combinerBuzzer->wait();
1354 combinerCounter = 0;
1364 int numNodes =
jobStage->getNumNodes();
1368 if (shuffleBufferSize > 12) {
1369 shuffleBufferSize = 12;
1371 PDB_COUT <<
"shuffleBufferSize=" << shuffleBufferSize << std::endl;
1372 std::vector<PageCircularBufferPtr> shuffleBuffers;
1373 std::vector<PageCircularBufferIteratorPtr> shuffleIters;
1375 pthread_mutex_t connection_mutex;
1376 pthread_mutex_init(&connection_mutex,
nullptr);
1379 PDBBuzzerPtr shuffleBuzzer = make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int& shuffleCounter) {
1381 PDB_COUT <<
"shuffleCounter = " << shuffleCounter << std::endl;
1383 PDB_COUT <<
"to run shuffle with " << numNodes <<
" threads." << std::endl;
1384 int shuffleCounter = 0;
1388 for (i = 0; i < numNodes; i++) {
1390 shuffleBuffers.push_back(buffer);
1392 make_shared<PageCircularBufferIterator>(i, buffer,
logger);
1393 shuffleIters.push_back(iter);
1396 PDB_COUT <<
"to run the " << i <<
"-th broadcasting work..." << std::endl;
1399 if (i == myNodeId) {
1407 std::cout <<
"inactive blocks before sending data in this worker:" << std::endl;
1408 std::cout << out << std::endl;
1414 std::string address = this->
jobStage->getIPAddress(i);
1415 PDB_COUT <<
"address = " << address << std::endl;
1418 int port = this->
jobStage->getPort(i);
1420 PDB_COUT <<
"port = " << port << std::endl;
1424 communicator->connectToInternetServer(
logger, port, address, errMsg);
1428 while (myIter->hasNext()) {
1430 if (page !=
nullptr) {
1438 jobStage->getSinkContext()->getDatabase(),
1439 jobStage->getSinkContext()->getSetName(),
1442 page->decRefCount();
1443 if (page->getRefCount() == 0) {
1444 page->freeContent();
1448 std::cout <<
"broadcasted " << numPages <<
" pages to address: " << address
1453 jobStage->getSinkContext()->getDatabase(),
1454 jobStage->getSinkContext()->getSetName(),
1458 std::cout <<
"inactive blocks after sending data in this worker:" << std::endl;
1459 std::cout << out << std::endl;
1468 worker->execute(myWork, shuffleBuzzer);
1471 make_shared<SetSpecifier>(
jobStage->getSinkContext()->getDatabase(),
1472 jobStage->getSinkContext()->getSetName(),
1473 jobStage->getSinkContext()->getDatabaseId(),
1474 jobStage->getSinkContext()->getTypeId(),
1475 jobStage->getSinkContext()->getSetId(),
1476 jobStage->getSinkContext()->getPageSize());
1481 for (k = 0; k < numNodes; k++) {
1486 while (shuffleCounter < numNodes) {
1487 shuffleBuzzer->wait();
1500 int numNodes =
jobStage->getNumNodes();
1504 if (shuffleBufferSize > 12) {
1505 shuffleBufferSize = 12;
1507 PDB_COUT <<
"shuffleBufferSize=" << shuffleBufferSize << std::endl;
1508 std::vector<PageCircularBufferPtr> shuffleBuffers;
1509 std::vector<PageCircularBufferIteratorPtr> shuffleIters;
1511 pthread_mutex_t connection_mutex;
1512 pthread_mutex_init(&connection_mutex,
nullptr);
1515 PDBBuzzerPtr shuffleBuzzer = make_shared<PDBBuzzer>([&](
PDBAlarm myAlarm,
int& shuffleCounter) {
1517 PDB_COUT <<
"shuffleCounter = " << shuffleCounter << std::endl;
1519 PDB_COUT <<
"to run shuffle with " << numNodes <<
" threads." << std::endl;
1520 int shuffleCounter = 0;
1524 for (i = 0; i < numNodes; i++) {
1526 shuffleBuffers.push_back(buffer);
1528 make_shared<PageCircularBufferIterator>(i, buffer,
logger);
1529 shuffleIters.push_back(iter);
1532 PDB_COUT <<
"to run the " << i <<
"-th hash partitioning work..." << std::endl;
1539 std::cout <<
"inactive blocks before sending data in this worker:" << std::endl;
1540 std::cout << out << std::endl;
1548 if (i == myNodeId) {
1552 std::string address = this->
jobStage->getIPAddress(i);
1553 PDB_COUT <<
"address = " << address << std::endl;
1556 int port = this->
jobStage->getPort(i);
1558 PDB_COUT <<
"port = " << port << std::endl;
1561 communicator->connectToInternetServer(
logger, port, address, errMsg);
1564 PDB_COUT << i <<
": to get compute plan" << std::endl;
1566 plan->nullifyPlanPointer();
1567 PDB_COUT << i <<
": to deep copy ComputePlan object" << std::endl;
1569 std::string sourceTupleSetSpecifier =
jobStage->getSourceTupleSetSpecifier();
1570 std::string targetTupleSetSpecifier =
jobStage->getTargetTupleSetSpecifier();
1571 std::string targetSpecifier =
jobStage->getTargetComputationSpecifier();
1574 sourceTupleSetSpecifier, targetTupleSetSpecifier, targetSpecifier);
1575 shuffler->setNodeId(i);
1581 char* output =
nullptr;
1582 char* buffer =
nullptr;
1584 while (myIter->hasNext()) {
1586 if (page !=
nullptr) {
1588 if (output ==
nullptr) {
1590 output = (
char*)calloc(
conf->getNetBroadcastPageSize(), 1);
1593 myMaps = shuffler->createNewOutputContainer();
1602 if (record !=
nullptr) {
1607 for (
int j = 0; j < theOtherMaps.
size(); j++) {
1610 bool success = shuffler->writeOut(theOtherMaps[j], myMaps);
1611 if (success ==
false) {
1615 size_t numBytes = myRecord->
numBytes();
1616 char* sendBuffer = (
char*)malloc(numBytes);
1617 if (sendBuffer ==
nullptr) {
1618 std::cout <<
"Out of memory on heap" << std::endl;
1621 memcpy(sendBuffer, output, numBytes);
1622 if (i != myNodeId) {
1630 jobStage->getSinkContext()->getDatabase(),
1631 jobStage->getSinkContext()->getSetName(),
1635 proxy->pinBytes(
jobStage->getSinkContext()->getDatabaseId(),
1636 jobStage->getSinkContext()->getTypeId(),
1637 jobStage->getSinkContext()->getSetId(),
1646 buffer = (
char*)calloc(
conf->getNetBroadcastPageSize(), 1);
1649 myMaps = shuffler->createNewOutputContainer();
1650 shuffler->writeOut(theOtherMaps[j], myMaps);
1654 if ((output !=
nullptr) && (buffer !=
nullptr) && (output != buffer)) {
1662 page->decRefCount();
1663 if (page->getRefCount() == 0) {
1664 page->freeContent();
1670 std::cout <<
"inactive blocks before sending data in this worker:" << std::endl;
1671 std::cout << out << std::endl;
1672 if (myMaps !=
nullptr) {
1675 size_t numBytes = myRecord->
numBytes();
1676 char* sendBuffer = (
char*)malloc(numBytes);
1677 if (sendBuffer ==
nullptr) {
1678 std::cout <<
"Out of memory on heap" << std::endl;
1681 memcpy(sendBuffer, output, numBytes);
1683 std::cout <<
"inactive blocks before sending data in this worker:" << std::endl;
1684 std::cout << out << std::endl;
1685 if (i != myNodeId) {
1691 jobStage->getSinkContext()->getDatabase(),
1692 jobStage->getSinkContext()->getSetName(),
1696 proxy->pinBytes(
jobStage->getSinkContext()->getDatabaseId(),
1697 jobStage->getSinkContext()->getTypeId(),
1698 jobStage->getSinkContext()->getSetId(),
1707 if (output !=
nullptr) {
1711 std::cout <<
"HashPartitioned " << numPages <<
" pages to address: " << address
1713 std::cout << numMaps <<
" maps are written in total for partition-" << i << std::endl;
1714 if (i != myNodeId) {
1718 conf->getNetBroadcastPageSize(),
1719 jobStage->getSinkContext()->getDatabase(),
1720 jobStage->getSinkContext()->getSetName(),
1725 std::cout <<
"inactive blocks after sending data in this worker:" << std::endl;
1726 std::cout << out << std::endl;
1735 worker->execute(myWork, shuffleBuzzer);
1738 make_shared<SetSpecifier>(
jobStage->getSinkContext()->getDatabase(),
1739 jobStage->getSinkContext()->getSetName(),
1740 jobStage->getSinkContext()->getDatabaseId(),
1741 jobStage->getSinkContext()->getTypeId(),
1742 jobStage->getSinkContext()->getSetId(),
1743 jobStage->getSinkContext()->getPageSize());
1746 for (k = 0; k < numNodes; k++) {
1751 while (shuffleCounter < numNodes) {
1752 shuffleBuzzer->wait();
shared_ptr< PageScanner > PageScannerPtr
std::string printCurrentBlock()
std::shared_ptr< AbstractHashSet > AbstractHashSetPtr
shared_ptr< PDBPage > PDBPagePtr
bool storeCompressedShuffleData(char *bytes, size_t numBytes, std::string databaseName, std::string setName, std::string address, int port, std::string &errMsg)
Handle< ObjType > getRootObject()
void runPipelineWithShuffleSink(HermesExecutionServer *server)
void runPipeline(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > combinerBuffers, SetSpecifierPtr outputSet)
shared_ptr< DataProxy > DataProxyPtr
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Functionality & getFunctionality()
void setPolicy(AllocatorPolicy policy)
bool storeShuffleData(Handle< Vector< Handle< Object >>> data, std::string databaseName, std::string setName, std::string address, int port, bool whetherToPersiste, std::string &errMsg)
Handle< TupleSetJobStage > & getJobStage()
std::vector< PageCircularBufferIteratorPtr > getUserSetIterators(HermesExecutionServer *server, int numThreads, bool &success, std::string &errMsg)
Allocator & getAllocator()
std::shared_ptr< Pipeline > PipelinePtr
bool setCurPageScanner(PageScannerPtr curPageScanner)
shared_ptr< PDBWork > PDBWorkPtr
Handle< TupleSetJobStage > jobStage
bool sendData(PDBCommunicatorPtr conn, void *bytes, size_t size, std::string databaseName, std::string setName, std::string &errMsg)
shared_ptr< SharedMem > SharedMemPtr
std::shared_ptr< PartitionedHashSet > PartitionedHashSetPtr
void feedSharedBuffers(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > &sourceBuffers, int numPartitions, int &counter, PDBBuzzerPtr tempBuzzer, bool &success, std::string &errMsg)
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< SharedHashSet > SharedHashSetPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void runPipelineWithBroadcastSink(HermesExecutionServer *server)
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
void executePipelineWork(int i, SetSpecifierPtr outputSet, std::vector< PageCircularBufferIteratorPtr > &iterators, PartitionedHashSetPtr hashSet, DataProxyPtr proxy, std::vector< PageCircularBufferPtr > &sinkBuffers, HermesExecutionServer *server, std::string &errMsg)
shared_ptr< PDBBuzzer > PDBBuzzerPtr
void cleanInactiveBlocks()
shared_ptr< Configuration > ConfigurationPtr
shared_ptr< PDBWorker > PDBWorkerPtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
size_t getBackendCircularBufferSize(bool &success, std::string &errMsg)
void runPipelineWithHashPartitionSink(HermesExecutionServer *server)
std::shared_ptr< SetSpecifier > SetSpecifierPtr
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
std::vector< int > nodeIds
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
AbstractHashSetPtr getHashSet(std::string name)
DataProxyPtr createProxy(int i, pthread_mutex_t connection_mutex, std::string &errMsg)
PipelineStage(Handle< TupleSetJobStage > stage, SharedMemPtr shm, PDBLoggerPtr logger, ConfigurationPtr conf, NodeID nodeId, size_t batchSize, int numThreads)
#define DEFAULT_NET_PAGE_SIZE
std::string printInactiveBlocks()