A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. API documentation for v0.5.0, released on June 13, 2018.
PipelineStage.cc
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef PIPELINE_STAGE_CC
19 #define PIPELINE_STAGE_CC
20 
21 
22 #include "AbstractAggregateComp.h"
23 #include "StorageAddData.h"
24 #include "StorageAddObjectInLoop.h"
25 #include "ComputePlan.h"
26 #include "ScanUserSet.h"
27 #include "SelectionComp.h"
28 #include "MultiSelectionComp.h"
29 #include "PDBDebug.h"
30 #include "PipelineStage.h"
32 #include "DataProxy.h"
33 #include "PageScanner.h"
35 #include "BlockQueryProcessor.h"
36 #include "InterfaceFunctions.h"
37 #include "HermesExecutionServer.h"
38 #include "GenericWork.h"
39 #include "SetSpecifier.h"
41 #include "Configuration.h"
42 #include "AggregateComp.h"
43 #include "SharedHashSet.h"
44 #include "JoinComp.h"
46 #include "SimpleSendBytesRequest.h"
47 #include "ShuffleSink.h"
48 #include "PartitionComp.h"
49 #ifdef ENABLE_COMPRESSION
50 #include <snappy.h>
51 #endif
52 #include <fstream>
53 
54 
55 namespace pdb {
56 
57 PipelineStage::~PipelineStage() {
58  this->jobStage = nullptr;
59 }
60 
61 PipelineStage::PipelineStage(Handle<TupleSetJobStage> stage,
62  SharedMemPtr shm,
63  PDBLoggerPtr logger,
64  ConfigurationPtr conf,
65  NodeID nodeId,
66  size_t batchSize,
67  int numThreads) {
68  this->jobStage = stage;
69  this->batchSize = batchSize;
70  this->numThreads = numThreads;
71  this->nodeId = nodeId;
72  this->logger = logger;
73  this->conf = conf;
74  this->shm = shm;
75  this->id = 0;
76  int numNodes = this->jobStage->getNumNodes();
77  for (int i = 0; i < numNodes; i++) {
78  nodeIds.push_back(i);
79  }
80 }
81 
82 
83 Handle<TupleSetJobStage>& PipelineStage::getJobStage() {
84  return jobStage;
85 }
86 
87 
88 int PipelineStage::getNumThreads() {
89  return this->numThreads;
90 }
91 
92 // send repartitioned data to a remote node
93 bool PipelineStage::storeShuffleData(Handle<Vector<Handle<Object>>> data,
94  std::string databaseName,
95  std::string setName,
96  std::string address,
97  int port,
98  bool whetherToPersist,
99  std::string& errMsg) {
100  if (port <= 0) {
101  port = conf->getPort();
102  }
103  std::cout << "store shuffle data to address=" << address << " and port=" << port
104  << ", with size = " << data->size() << " to database=" << databaseName
105  << " and set=" << setName << " and type = IntermediateData" << std::endl;
106  return simpleSendDataRequest<StorageAddData, Handle<Object>, SimpleRequestResult, bool>(
107  logger,
108  port,
109  address,
110  false,
111  1024,
112  [&](Handle<SimpleRequestResult> result) {
113  if (result != nullptr)
114  if (!result->getRes().first) {
115  logger->error("Error sending data: " + result->getRes().second);
116  errMsg = "Error sending data: " + result->getRes().second;
117  }
118  return true;
119  },
120  data,
121  databaseName,
122  setName,
123  "IntermediateData",
124  false,
125  whetherToPersist);
126 }
127 
128 bool PipelineStage::storeCompressedShuffleData(char* bytes,
129  size_t numBytes,
130  std::string databaseName,
131  std::string setName,
132  std::string address,
133  int port,
134  std::string& errMsg) {
135  if (port <= 0) {
136  port = conf->getPort();
137  }
138  std::cout << "store shuffle data to address=" << address << " and port=" << port
139  << ", with compressed byte size = " << numBytes << " to database=" << databaseName
140  << " and set=" << setName << " and type = IntermediateData" << std::endl;
141  return simpleSendBytesRequest<StorageAddData, SimpleRequestResult, bool>(
142  logger,
143  port,
144  address,
145  false,
146  1024,
147  [&](Handle<SimpleRequestResult> result) {
148  if (result != nullptr)
149  if (!result->getRes().first) {
150  logger->error("Error sending data: " + result->getRes().second);
151  errMsg = "Error sending data: " + result->getRes().second;
152  }
153  return true;
154  },
155  bytes,
156  numBytes,
157  databaseName,
158  setName,
159  "IntermediateData",
160  false,
161  false,
162  true);
163 }
164 
165 // broadcast data
166 bool PipelineStage::sendData(PDBCommunicatorPtr conn,
167  void* data,
168  size_t size,
169  std::string databaseName,
170  std::string setName,
171  std::string& errMsg) {
172  bool success;
173  if (data != nullptr) {
174 #ifdef DEBUG_SHUFFLING
175  // write the data to a test file
176  std::string fileName =
177  jobStage->getJobId() + "_" + std::to_string(jobStage->getStageId()) + "_shuffle";
178  FILE* myFile = fopen(fileName.c_str(), "w");
179  fwrite(data, 1, size, myFile);
180  fclose(myFile);
181 #endif
182 
183  Handle<StorageAddObjectInLoop> request = makeObject<StorageAddObjectInLoop>(
184  databaseName, setName, "IntermediateData", false, false);
185  conn->sendObject(request, errMsg);
186 #ifdef ENABLE_COMPRESSION
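// when compression is enabled, the page is compressed with snappy into a scratch
// buffer sized by MaxCompressedLength() and the compressed bytes are sent instead
// of the raw page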
187  char* compressedBytes = new char[snappy::MaxCompressedLength(size)];
188  size_t compressedSize;
189  snappy::RawCompress((char*)data, size, compressedBytes, &compressedSize);
190  std::cout << "size before compression is " << size << " and size after compression is "
191  << compressedSize << std::endl;
192  conn->sendBytes(compressedBytes, compressedSize, errMsg);
193  delete[] compressedBytes;
194 #else
195  conn->sendBytes(data, size, errMsg);
196 #endif
197 #ifdef DEBUG_SHUFFLING
198  // write the data to a test file
199  std::string fileName1 =
200  jobStage->getJobId() + "_" + std::to_string(jobStage->getStageId()) + "_sent";
201  FILE* myFile1 = fopen(fileName1.c_str(), "w");
202  fwrite(data, 1, size, myFile1);
203  fclose(myFile1);
204 #endif
205  } else {
206  Handle<StorageAddObjectInLoop> request = makeObject<StorageAddObjectInLoop>();
207  request->setLoopEnded();
208  conn->sendObject(request, errMsg);
209  }
210  Handle<SimpleRequestResult> result = conn->getNextObject<SimpleRequestResult>(success, errMsg);
211  return true;
212 }
213 
214 // tuning the backend circular buffer size
215 size_t PipelineStage::getBackendCircularBufferSize(bool& success, std::string& errMsg) {
216 
217  int backendCircularBufferSize = 1;
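// the shared memory pool must be able to hold the frontend's reserved pages, two
// pages per worker thread, and at least one page for the backend circular buffer;
// otherwise the query cannot run with the current configuration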
218  if (conf->getShmSize() / conf->getPageSize() - 2 <
219  2 + 2 * numThreads + backendCircularBufferSize) {
220  success = false;
221  errMsg = "Error: Not enough buffer pool size to run the query! Please reduce number of threads or increase shared memory pool size or reduce default page size and retry";
222  std::cout << errMsg << std::endl;
223  return 0;
224  }
225  backendCircularBufferSize = (conf->getShmSize() / conf->getPageSize() - 4 - 2 * numThreads);
226  if (backendCircularBufferSize > 10) {
227  backendCircularBufferSize = 10;
228  }
229  success = true;
230  PDB_COUT << "backendCircularBufferSize is tuned to be " << backendCircularBufferSize
231  << std::endl;
232  return backendCircularBufferSize;
233 }
234 
235 // to get iterators to scan a user set
236 std::vector<PageCircularBufferIteratorPtr> PipelineStage::getUserSetIterators(
237  HermesExecutionServer* server, int numScanThreads, bool& success, std::string& errMsg) {
238 
239  // initialize the data proxy, scanner and set iterators
240  PDBCommunicatorPtr communicatorToFrontend = make_shared<PDBCommunicator>();
241  communicatorToFrontend->connectToInternetServer(
242  logger, conf->getPort(), conf->getServerAddress(), errMsg);
243 
244  PDBLoggerPtr scannerLogger = make_shared<PDBLogger>("scanner.log");
245  // getScanner
246  int backendCircularBufferSize = getBackendCircularBufferSize(success, errMsg);
247  PageScannerPtr scanner = make_shared<PageScanner>(communicatorToFrontend,
248  shm,
249  scannerLogger,
250  numScanThreads,
251  backendCircularBufferSize,
252  nodeId);
253 
254  std::vector<PageCircularBufferIteratorPtr> iterators;
255 
256  if (server->getFunctionality<HermesExecutionServer>().setCurPageScanner(scanner) == false) {
257  success = false;
258  errMsg = "Error: A job is already running!";
259  std::cout << errMsg << std::endl;
260  return iterators;
261  }
262 
263  // get iterators
264  std::cout << "To send GetSetPages message" << std::endl;
265  iterators = scanner->getSetIterators(nodeId,
266  jobStage->getSourceContext()->getDatabaseId(),
267  jobStage->getSourceContext()->getTypeId(),
268  jobStage->getSourceContext()->getSetId());
269  std::cout << "GetSetPages message is sent" << std::endl;
270 
271  // return iterators
272  return iterators;
273 }
274 
275 
276 // to get iterators to scan a user set in a shared way so that each iterator gets all pages
277 void PipelineStage::feedSharedBuffers(HermesExecutionServer* server,
278  std::vector<PageCircularBufferPtr>& sourceBuffers,
279  int numPartitions,
280  int& counter,
281  PDBBuzzerPtr tempBuzzer,
282  bool& success,
283  std::string& errMsg) {
284  std::cout << "to feed shared buffers for " << numPartitions << " partitions" << std::endl;
285  // get scan iterators
286  std::vector<PageCircularBufferIteratorPtr> scanIterators =
287  getUserSetIterators(server, 1, success, errMsg);
288  int numScanThreads = scanIterators.size();
289  std::cout << "we've got " << numScanThreads << " iterators" << std::endl;
290  // start multiple thread to scan the set
291  counter = 0;
292  for (int i = 0; i < numScanThreads; i++) {
293  // each scan thread gets a page and puts it into every source buffer
294  PDBWorkerPtr worker =
295  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
296  std::cout << "to run the " << i << "-th scan work..." << std::endl;
297  // start threads
298  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
299  // setup an output page to store intermediate results and final output
300  PageCircularBufferIteratorPtr iter = scanIterators.at(i);
301  PDBPagePtr page = nullptr;
302  while (iter->hasNext()) {
303  page = iter->next();
304  if (page != nullptr) {
305  std::cout << "Scanner got a non-null page" << std::endl;
306  for (int j = 0; j < numPartitions; j++) {
307  page->incRefCount();
308  }
309  for (int j = 0; j < numPartitions; j++) {
310  std::cout << "add page to the " << j << "-th buffer" << std::endl;
311  sourceBuffers[j]->addPageToTail(page);
312  }
313  }
314  }
315  callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
316  });
317 
318  worker->execute(myWork, tempBuzzer);
319  }
320 }
321 
322 // to create a data proxy
323 DataProxyPtr PipelineStage::createProxy(int i,
324  pthread_mutex_t connection_mutex,
325  std::string& errMsg) {
326 
327  // create a data proxy
328  std::string loggerName = std::string("PipelineStage_") + std::to_string(i);
329  PDBLoggerPtr logger = make_shared<PDBLogger>(loggerName);
330  pthread_mutex_lock(&connection_mutex);
331  PDBCommunicatorPtr anotherCommunicatorToFrontend = make_shared<PDBCommunicator>();
332  anotherCommunicatorToFrontend->connectToInternetServer(
333  logger, conf->getPort(), conf->getServerAddress(), errMsg);
334  pthread_mutex_unlock(&connection_mutex);
335  DataProxyPtr proxy = make_shared<DataProxy>(nodeId, anotherCommunicatorToFrontend, shm, logger);
336  return proxy;
337 }
338 
339 // to execute the pipeline work defined in a TupleSetJobStage
340 // iterators can be empty if hash input is used
341 // sinkBuffers can be empty if no combining is required
342 void PipelineStage::executePipelineWork(int i,
343  SetSpecifierPtr outputSet,
344  std::vector<PageCircularBufferIteratorPtr>& iterators,
345  PartitionedHashSetPtr hashSet,
346  DataProxyPtr proxy,
347  std::vector<PageCircularBufferPtr>& sinkBuffers,
348  HermesExecutionServer* server,
349  std::string& errMsg) {
350 
351 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
352  // connections
353  std::vector<PDBCommunicatorPtr> connections;
354  for (int j = 0; j < jobStage->getNumNodes(); j++) {
355 
356  // get the j-th address
357  std::string address = this->jobStage->getIPAddress(j);
358  PDB_COUT << "address = " << address << std::endl;
359 
360  // get the j-th port
361  int port = this->jobStage->getPort(j);
362  PDB_COUT << "port = " << port << std::endl;
363 
364  // get aggregate computation
365  PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();
366  communicator->connectToInternetServer(logger, port, address, errMsg);
367  connections.push_back(communicator);
368  }
369 #endif
370 
371 #ifdef CLEANUP_INACTIVE_BLOCKS
372  getAllocator().cleanInactiveBlocks((size_t)(1048576));
373 #endif
374  // seed the random number generator for each thread
375  srand(time(NULL));
376 
377 
378 // setup an output page to store intermediate results and final output
379 #ifdef ENABLE_LARGE_GRAPH
380  const UseTemporaryAllocationBlock tempBlock{256 * 1024 * 1024};
381 #else
382  const UseTemporaryAllocationBlock tempBlock{32 * 1024 * 1024};
383 #endif
384 
385  PDB_COUT << i << ": to get compute plan" << std::endl;
386  Handle<ComputePlan> plan = this->jobStage->getComputePlan();
387  plan->nullifyPlanPointer();
388  PDB_COUT << i << ": to deep copy ComputePlan object" << std::endl;
389  Handle<ComputePlan> newPlan = deepCopyToCurrentAllocationBlock<ComputePlan>(plan);
390 
391  bool isHashPartitionedJoinProbing = false;
392  Handle<Computation> computation = nullptr;
393  std::vector<std::string> buildTheseTupleSets;
394  jobStage->getTupleSetsToBuildPipeline(buildTheseTupleSets);
395  PDB_COUT << "buildTheseTupleSets[0]=" << buildTheseTupleSets[0] << std::endl;
396  std::string sourceSpecifier = jobStage->getSourceTupleSetSpecifier();
397  PDB_COUT << "Source tupleset name=" << sourceSpecifier << std::endl;
398  if (buildTheseTupleSets[0] != sourceSpecifier) {
399  std::string producerComputationName =
400  newPlan->getProducingComputationName(buildTheseTupleSets[0]);
401  PDB_COUT << "Producer computation name=" << producerComputationName << std::endl;
402  computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
403  if (computation->getComputationType() == "JoinComp") {
404  isHashPartitionedJoinProbing = true;
405  }
406  }
407  if (isHashPartitionedJoinProbing == false) {
408  std::string producerComputationName = newPlan->getProducingComputationName(sourceSpecifier);
409  PDB_COUT << "Producer computation name=" << producerComputationName << std::endl;
410  computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
411  }
412 
413 
414  Handle<SetIdentifier> sourceContext = this->jobStage->getSourceContext();
415 
416  // handle two types of sources
417  if ((sourceContext->getSetType() == UserSetType) &&
418  (computation->getComputationType() != "JoinComp")) {
419  // input is a user set
420  Handle<ScanUserSet<Object>> scanner = nullptr;
421  if (computation->getComputationType() == "ScanUserSet") {
422  scanner = unsafeCast<ScanUserSet<Object>, Computation>(computation);
423  } else if (computation->getComputationType() == "SelectionComp") {
424  Handle<SelectionComp<Object, Object>> selection =
425  unsafeCast<SelectionComp<Object, Object>, Computation>(computation);
426  scanner = selection->getOutputSetScanner();
427  } else if (computation->getComputationType() == "MultiSelectionComp") {
428  Handle<MultiSelectionComp<Object, Object>> multiSelection =
429  unsafeCast<MultiSelectionComp<Object, Object>, Computation>(computation);
430  scanner = multiSelection->getOutputSetScanner();
431  } else if (computation->getComputationType() == "ClusterAggregationComp") {
432  Handle<AggregateComp<Object, Object, Object, Object>> aggregator =
433  unsafeCast<AggregateComp<Object, Object, Object, Object>, Computation>(
434  computation);
435  scanner = aggregator->getOutputSetScanner();
436  } else if (computation->getComputationType() == "PartitionComp") {
437  Handle<PartitionComp<Object, Object>> partitioner =
438  unsafeCast<PartitionComp<Object, Object>, Computation>(computation);
439  scanner = partitioner->getOutputSetScanner();
440  } else {
441  std::cout << "Error: we can't support source computation type "
442  << computation->getComputationType() << std::endl;
443  return;
444  }
445 
446  if (scanner != nullptr) {
447  scanner->setIterator(iterators.at(i));
448  scanner->setProxy(proxy);
449  if ((scanner->getBatchSize() <= 0) || (scanner->getBatchSize() > 100)) {
450  scanner->setBatchSize(batchSize);
451  }
452  PDB_COUT << "SCANNER BATCH SIZE: " << scanner->getBatchSize() << std::endl;
453  }
454  } else if ((sourceContext->getSetType() == UserSetType) &&
455  (computation->getComputationType() == "JoinComp")) {
456 
457  Handle<JoinComp<Object, Object, Object>> join =
458  unsafeCast<JoinComp<Object, Object, Object>, Computation>(computation);
459  join->setIterator(iterators.at(i));
460  join->setProxy(proxy);
461  if (join->getBatchSize() == 0) {
462  join->setBatchSize(3);
463  }
464  join->setPartitionId(i);
465  join->setNumPartitions(this->jobStage->getNumTotalPartitions());
466  join->setNumNodes(this->jobStage->getNumNodes());
467 
468  } else {
469  // the input is a set of hash tables
470  Handle<AggregateComp<Object, Object, Object, Object>> aggregator =
471  unsafeCast<AggregateComp<Object, Object, Object, Object>, Computation>(
472  computation);
473  void* pagePointer = hashSet->getPage(i);
474  if (pagePointer != nullptr) {
475  aggregator->setHashTablePointer(hashSet->getPage(i));
476  } else {
477  std::cout << "There is no more hash partition for this thread, we simply return"
478  << std::endl;
479  return;
480  }
481  }
482 
483  // handle probing
484  std::map<std::string, ComputeInfoPtr> info;
485  if ((this->jobStage->isProbing() == true) && (this->jobStage->getHashSets() != nullptr)) {
486  Handle<Map<String, String>> hashSetsToProbe = this->jobStage->getHashSets();
487  for (PDBMapIterator<String, String> mapIter = hashSetsToProbe->begin();
488  mapIter != hashSetsToProbe->end();
489  ++mapIter) {
490  std::string key = (*mapIter).key;
491  std::string hashSetName = (*mapIter).value;
492  std::cout << "to probe " << key << ":" << hashSetName << std::endl;
493  AbstractHashSetPtr hashSet = server->getHashSet(hashSetName);
494  if (hashSet == nullptr) {
495  std::cout << "ERROR in pipeline execution: data not found in hash set "
496  << hashSetName << "!" << std::endl;
497  return;
498  }
499  if (hashSet->getHashSetType() == "SharedHashSet") {
500  SharedHashSetPtr sharedHashSet = std::dynamic_pointer_cast<SharedHashSet>(hashSet);
501  info[key] = std::make_shared<JoinArg>(*newPlan, sharedHashSet->getPage());
502  } else if (hashSet->getHashSetType() == "PartitionedHashSet") {
503  PartitionedHashSetPtr partitionedHashSet =
504  std::dynamic_pointer_cast<PartitionedHashSet>(hashSet);
505  info[key] = std::make_shared<JoinArg>(*newPlan, partitionedHashSet->getPage(i));
506  }
507  }
508  } else {
509  std::cout << "info contains nothing for this stage" << std::endl;
510  if (this->jobStage->isProbing() == true) {
511  std::cout << "this stage needs probing hash tables" << std::endl;
512  } else {
513  std::cout << "this stage doesn't need probing hash tables" << std::endl;
514  }
515  if (this->jobStage->getHashSets() != nullptr) {
516  std::cout << "we have hash tables prepared for the stage" << std::endl;
517  } else {
518  std::cout << "we don't have hash tables prepared for the stage" << std::endl;
519  }
520  if (sourceContext->getSetType() == UserSetType) {
521  std::cout << "this stage has a UserSetType source" << std::endl;
522  } else {
523  std::cout << "this stage doesn't have a UserSetType source" << std::endl;
524  }
525  }
526 
527  std::cout << "source specifier: " << this->jobStage->getSourceTupleSetSpecifier() << std::endl;
528  std::cout << "target specifier: " << this->jobStage->getTargetTupleSetSpecifier() << std::endl;
529  std::cout << "target computation: " << this->jobStage->getTargetComputationSpecifier()
530  << std::endl;
531 
532  Handle<JoinComp<Object, Object, Object>> join = nullptr;
533  std::string targetSpecifier = jobStage->getTargetComputationSpecifier();
534  if (targetSpecifier.find("ClusterAggregationComp") != std::string::npos) {
535  Handle<Computation> aggComputation =
536  newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
537  Handle<AbstractAggregateComp> aggregate =
538  unsafeCast<AbstractAggregateComp, Computation>(aggComputation);
539  int numPartitionsInCluster = this->jobStage->getNumTotalPartitions();
540  PDB_COUT << "num partitions in the cluster is " << numPartitionsInCluster << std::endl;
541  aggregate->setNumNodes(jobStage->getNumNodes());
542  aggregate->setNumPartitions(numPartitionsInCluster);
543  aggregate->setBatchSize(this->batchSize);
544  } else if (targetSpecifier.find("JoinComp") != std::string::npos) {
545  Handle<Computation> joinComputation =
546  newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
547  join = unsafeCast<JoinComp<Object, Object, Object>, Computation>(joinComputation);
548  join->setNumPartitions(this->jobStage->getNumTotalPartitions());
549  join->setNumNodes(this->jobStage->getNumNodes());
550  std::cout << "Join set to have " << join->getNumPartitions() << " partitions" << std::endl;
551  std::cout << "Join set to have " << join->getNumNodes() << " nodes" << std::endl;
552  } else if (targetSpecifier.find("PartitionComp") != std::string::npos) {
553  Handle<Computation> partitionComputation =
554  newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
555  Handle<PartitionComp<Object, Object>> partitioner =
556  unsafeCast<PartitionComp<Object, Object>, Computation>(partitionComputation);
557  int numPartitionsInCluster = this->jobStage->getNumTotalPartitions();
558  PDB_COUT << "num partitions in the cluster is " << numPartitionsInCluster << std::endl;
559  partitioner->setNumPartitions(numPartitionsInCluster);
560  partitioner->setNumNodes(jobStage->getNumNodes());
561  }
562 
563 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
564  char* mem = nullptr;
565  if ((this->jobStage->isRepartition() == true) && (this->jobStage->isCombining() == false) &&
566  (join == nullptr)) {
567  mem = (char*)malloc(conf->getNetShufflePageSize());
568  }
569 #endif
570  newPlan->nullifyPlanPointer();
571  PDBPagePtr output = nullptr;
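// build the pipeline with three callbacks: one that supplies a new output page
// (a pinned storage page for a user-set sink, or a heap page for broadcast,
// shuffle and aggregation sinks), one that discards a page that is no longer
// needed, and one that writes back or forwards a page once it is full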
572  PipelinePtr curPipeline = newPlan->buildPipeline(
573  buildTheseTupleSets,
574  this->jobStage->getSourceTupleSetSpecifier(),
575  this->jobStage->getTargetComputationSpecifier(),
576  [&]() -> std::pair<void*, size_t> {
577  size_t headerSize = (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
578  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t));
579  if ((this->jobStage->isBroadcasting() == false) &&
580  (this->jobStage->isRepartition() == false) &&
581  (sourceContext->getSetType() == UserSetType)) {
582  proxy->addUserPage(outputSet->getDatabaseId(),
583  outputSet->getTypeId(),
584  outputSet->getSetId(),
585  output);
586 
587  if (output == nullptr) {
588  std::cout << "Pipeline Error: insufficient memory in heap" << std::endl;
589  return std::make_pair(nullptr, 0);
590  }
591  return std::make_pair(output->getBytes(), output->getSize());
592 
593 
594  } else if ((this->jobStage->isBroadcasting() == false) &&
595  (this->jobStage->isRepartition() == false) &&
596  (sourceContext->getSetType() != UserSetType)) {
597 
598  // TODO: move this to Pangea
599  void* myPage = calloc(outputSet->getPageSize(), 1);
600  if (myPage == nullptr) {
601  std::cout << "Pipeline Error: insufficient memory in heap" << std::endl;
602  }
603  return std::make_pair((char*)myPage + headerSize,
604  outputSet->getPageSize() - headerSize);
605 
606 
607  } else if ((this->jobStage->isBroadcasting() == true) ||
608  ((this->jobStage->isRepartition() == true) &&
609  (this->jobStage->isCombining() == false) && (join != nullptr))) {
610  // TODO: move this to Pangea
611  // join case
612  void* myPage = calloc(conf->getBroadcastPageSize(), 1);
613  if (myPage == nullptr) {
614  std::cout << "Pipeline Error: insufficient memory in heap" << std::endl;
615  }
616  return std::make_pair((char*)myPage + headerSize, conf->getNetBroadcastPageSize());
617 
618 
619  } else {
620  // TODO: move this to Pangea
621  // aggregation and partition cases
622  std::cout << "to allocate a page for storing partition sink with size="
623  << conf->getShufflePageSize() << std::endl;
624  void* myPage = calloc(conf->getShufflePageSize(), 1);
625  if (myPage == nullptr) {
626  std::cout << "Pipeline Error: insufficient memory in heap" << std::endl;
627  }
628  return std::make_pair((char*)myPage + headerSize, conf->getNetShufflePageSize());
629  }
630 
631  },
632 
633  [&](void* page) {
634 
635  if ((this->jobStage->isBroadcasting() == false) &&
636  (this->jobStage->isRepartition() == false) &&
637  (sourceContext->getSetType() == UserSetType)) {
638  if (output != nullptr) {
639  proxy->unpinUserPage(
640  nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
641  }
642 
643  } else {
644  free((char*)page - (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
645  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t)));
646  }
647  },
648 
649  [&](void* page) {
650  size_t headerSize = (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
651  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t));
652  if (this->jobStage->isBroadcasting() == true) {
653  PDB_COUT << "to broadcast a page" << std::endl;
654  // to handle a broadcast join
655  // get the objects
656  Record<Object>* record = (Record<Object>*)page;
657  // broadcast the objects
658  if (record != nullptr) {
659  Handle<Object> objectToSend = record->getRootObject();
660  if (objectToSend != nullptr) {
661  PDBPagePtr pageToBroadcast = std::make_shared<PDBPage>(
662  ((char*)page -
663  (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
664  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t))),
665  0,
666  0,
667  0,
668  0,
669  0,
670  conf->getBroadcastPageSize(),
671  0,
672  0);
673  int numNodes = jobStage->getNumNodes();
674  int k;
675  NodeID myNodeId = jobStage->getNodeId();
676  for (k = 0; k < numNodes; k++) {
677  pageToBroadcast->incRefCount();
678  }
679  for (k = 0; k < numNodes; k++) {
680  if (k != myNodeId) {
681  PageCircularBufferPtr buffer = sinkBuffers[k];
682  buffer->addPageToTail(pageToBroadcast);
683  }
684  }
685  proxy->pinBytes(outputSet->getDatabaseId(),
686  outputSet->getTypeId(),
687  outputSet->getSetId(),
688  record->numBytes(),
689  record);
690  pageToBroadcast->decRefCount();
691  if (pageToBroadcast->getRefCount() == 0) {
692  pageToBroadcast->freeContent();
693  }
694  } else {
695  free((char*)page - headerSize);
696  }
697  } else {
698  free((char*)page - headerSize);
699  }
700  } else if ((this->jobStage->isRepartition() == true) &&
701  (this->jobStage->isCombining() == false) && (join != nullptr)) {
702  PDB_COUT << "to hash partition a page" << std::endl;
703  // to handle a hash partition join
704  // get the objects
705  Record<Object>* record = (Record<Object>*)page;
706  // forward the objects to each node's sink buffer
707  if (record != nullptr) {
708  Handle<Object> objectToSend = record->getRootObject();
709  if (objectToSend != nullptr) {
710  PDBPagePtr pageToSend = std::make_shared<PDBPage>(
711  ((char*)page -
712  (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
713  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t))),
714  0,
715  0,
716  0,
717  0,
718  0,
719  conf->getBroadcastPageSize(),
720  0,
721  0);
722  int numNodes = jobStage->getNumNodes();
723  int k;
724  for (k = 0; k < numNodes; k++) {
725  pageToSend->incRefCount();
726  }
727  for (k = 0; k < numNodes; k++) {
728  PageCircularBufferPtr buffer = sinkBuffers[k];
729  buffer->addPageToTail(pageToSend);
730  }
731  } else {
732  free((char*)page - headerSize);
733  }
734  } else {
735  free((char*)page - headerSize);
736  }
737 
738  } else if ((this->jobStage->isRepartition() == true) &&
739  (this->jobStage->isCombining() == true)) {
740  // to handle an aggregation
741  PDBPagePtr output = make_shared<PDBPage>(
742  (char*)page - (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) +
743  sizeof(SetID) + sizeof(PageID) + sizeof(int) + sizeof(size_t)),
744  0,
745  0,
746  0,
747  0,
748  0,
749  conf->getShufflePageSize(),
750  0,
751  0);
752  int numNodes = jobStage->getNumNodes();
753  int k;
754  for (k = 0; k < numNodes; k++) {
755  output->incRefCount();
756  }
757  for (k = 0; k < numNodes; k++) {
758  PageCircularBufferPtr buffer = sinkBuffers[k];
759  buffer->addPageToTail(output);
760  }
761 
762  } else if ((this->jobStage->isRepartition() == true) &&
763  (this->jobStage->isCombining() == false) && (join == nullptr)) {
764  // to handle aggregation without combining and partitioning
765  std::cout << "to shuffle data on this page" << std::endl;
766  Record<Vector<Handle<Vector<Handle<Object>>>>>* record =
767  (Record<Vector<Handle<Vector<Handle<Object>>>>>*)page;
768  if (record != nullptr) {
769  Handle<Vector<Handle<Vector<Handle<Object>>>>> objectsToShuffle =
770  record->getRootObject();
771  int numNodes = jobStage->getNumNodes();
772  int k;
773  for (k = 0; k < numNodes; k++) {
774  Handle<Vector<Handle<Object>>> objectToShuffle = (*objectsToShuffle)[k];
775 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
776  Record<Vector<Handle<Object>>>* myRecord =
777  getRecord(objectToShuffle, mem, conf->getNetShufflePageSize());
778  std::cout << "send " << myRecord->numBytes() << " bytes to node-" << k
779  << std::endl;
780  if (objectToShuffle != nullptr) {
781  // to shuffle data
782  sendData(connections[k],
783  myRecord,
784  myRecord->numBytes(),
785  jobStage->getSinkContext()->getDatabase(),
786  jobStage->getSinkContext()->getSetName(),
787  errMsg);
788  }
789 #else
790  if (objectToShuffle != nullptr) {
791  // to shuffle data
792  // get the i-th address
793  std::string address = this->jobStage->getIPAddress(k);
794 
795  // get the i-th port
796  int port = this->jobStage->getPort(k);
797  bool whetherToPersist = true;
798  this->storeShuffleData(objectToShuffle,
799  this->jobStage->getSinkContext()->getDatabase(),
800  this->jobStage->getSinkContext()->getSetName(),
801  address,
802  port,
803  true,
804  errMsg);
805  }
806 #endif
807  }
808  }
809  free((char*)page - headerSize);
810 
811  } else {
812  if (sourceContext->getSetType() == UserSetType) {
813  proxy->unpinUserPage(
814  nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
815 
816  } else {
817  // to handle a vector sink
818  // PDBPagePtr output = nullptr;
819  proxy->addUserPage(outputSet->getDatabaseId(),
820  outputSet->getTypeId(),
821  outputSet->getSetId(),
822  output);
823  memcpy(output->getBytes(), page, output->getSize());
824  proxy->unpinUserPage(
825  nodeId, output->getDbID(), output->getTypeID(), output->getSetID(), output);
826  free((char*)page - headerSize);
827  }
828  }
829  },
830 
831  info);
832  std::cout << "\nRunning Pipeline\n";
833  curPipeline->run();
834  curPipeline = nullptr;
835  newPlan->nullifyPlanPointer();
836 #ifdef REUSE_CONNECTION_FOR_AGG_NO_COMBINER
837  makeObjectAllocatorBlock(4 * 1024 * 1024, true);
838  for (int j = 0; j < jobStage->getNumNodes(); j++) {
839  sendData(connections[j],
840  nullptr,
841  0,
842  jobStage->getSinkContext()->getDatabase(),
843  jobStage->getSinkContext()->getSetName(),
844  errMsg);
845  }
846  if (mem != nullptr) {
847  free(mem);
848  }
849 #endif
850 }
851 
852 
853 void PipelineStage::runPipeline(HermesExecutionServer* server) {
854 
855  std::vector<PageCircularBufferPtr> sinkBuffers;
856  SetSpecifierPtr outputSet =
857  make_shared<SetSpecifier>(jobStage->getSinkContext()->getDatabase(),
858  jobStage->getSinkContext()->getSetName(),
859  jobStage->getSinkContext()->getDatabaseId(),
860  jobStage->getSinkContext()->getTypeId(),
861  jobStage->getSinkContext()->getSetId(),
862  jobStage->getSinkContext()->getPageSize());
863  runPipeline(server, sinkBuffers, outputSet);
864 }
865 
866 
867 // sinkBuffers can be empty if the pipeline doesn't need combining
868 void PipelineStage::runPipeline(HermesExecutionServer* server,
869  std::vector<PageCircularBufferPtr> sinkBuffers,
870  SetSpecifierPtr outputSet) {
871 // std :: cout << "Pipeline network is running" << std :: endl;
872 #ifdef ENABLE_LARGE_GRAPH
873  UseTemporaryAllocationBlock tempBlock{256 * 1024 * 1024};
874 #else
875  UseTemporaryAllocationBlock tempBlock{32 * 1024 * 1024};
876 #endif
877  bool success;
878  std::string errMsg;
879  int numPartitions = 0;
880  int sourceCounter = 0;
881  PDBBuzzerPtr sourceBuzzer;
882  std::vector<PageCircularBufferPtr> sourceBuffers;
883  // get user set iterators
884  std::vector<PageCircularBufferIteratorPtr> iterators;
885  PartitionedHashSetPtr hashSet;
886  Handle<SetIdentifier> sourceContext = this->jobStage->getSourceContext();
887 
888  // to get computations
889  Handle<ComputePlan> plan = this->jobStage->getComputePlan();
890  plan->nullifyPlanPointer();
891  PDB_COUT << "to deep copy ComputePlan object" << std::endl;
892  Handle<ComputePlan> newPlan = deepCopyToCurrentAllocationBlock<ComputePlan>(plan);
893  bool isHashPartitionedJoinProbing = false;
894  Handle<Computation> computation = nullptr;
895  std::vector<std::string> buildTheseTupleSets;
896  jobStage->getTupleSetsToBuildPipeline(buildTheseTupleSets);
897  PDB_COUT << "buildTheseTupleSets[0]=" << buildTheseTupleSets[0] << std::endl;
898  std::string sourceSpecifier = jobStage->getSourceTupleSetSpecifier();
899  PDB_COUT << "Source tupleset name=" << sourceSpecifier << std::endl;
900  if (buildTheseTupleSets[0] != sourceSpecifier) {
901  std::string producerComputationName =
902  newPlan->getProducingComputationName(buildTheseTupleSets[0]);
903  PDB_COUT << "Producer computation name=" << producerComputationName << std::endl;
904  computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
905  if (computation->getComputationType() == "JoinComp") {
906  isHashPartitionedJoinProbing = true;
907  }
908  }
909  if (isHashPartitionedJoinProbing == false) {
910  std::string producerComputationName = newPlan->getProducingComputationName(sourceSpecifier);
911  PDB_COUT << "Producer computation name=" << producerComputationName << std::endl;
912  computation = newPlan->getPlan()->getNode(producerComputationName).getComputationHandle();
913  }
914 
915 
916  if ((sourceContext->getSetType() == UserSetType) &&
917  (computation->getComputationType() != "JoinComp")) {
918  iterators = getUserSetIterators(server, numThreads, success, errMsg);
919  } else if ((sourceContext->getSetType() == UserSetType) &&
920  (computation->getComputationType() == "JoinComp")) {
921  int sourceBufferSize = 2;
922  int numPartitionsInCluster = this->jobStage->getNumTotalPartitions();
923  int numNodes = this->jobStage->getNumNodes();
924  numPartitions = numPartitionsInCluster / numNodes;
925  for (int i = 0; i < numPartitions; i++) {
926  PDBLoggerPtr myLogger =
927  make_shared<PDBLogger>(std::string("scanPartitionedSource-") + std::to_string(i));
928  PageCircularBufferPtr buffer =
929  make_shared<PageCircularBuffer>(sourceBufferSize, myLogger);
930  sourceBuffers.push_back(buffer);
931  PageCircularBufferIteratorPtr iter =
932  make_shared<PageCircularBufferIterator>(i, buffer, myLogger);
933  iterators.push_back(iter);
934  }
935  } else {
936  std::string hashSetName = sourceContext->getDatabase() + ":" + sourceContext->getSetName();
937  AbstractHashSetPtr abstractHashSet = server->getHashSet(hashSetName);
938  hashSet = std::dynamic_pointer_cast<PartitionedHashSet>(abstractHashSet);
939  numThreads = hashSet->getNumPages();
940  }
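// at this point the source is prepared in one of three ways: iterators over a
// stored user set (one per thread), per-partition circular buffers fed by a
// separate scan thread (hash-partitioned join probing), or the pages of a
// partitioned hash set produced by a previous aggregation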
941  // initialize mutexes
942  pthread_mutex_t connection_mutex;
943  pthread_mutex_init(&connection_mutex, nullptr);
944 
945  // create a buzzer and counter
946  PDBBuzzerPtr tempBuzzer =
947  make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& counter) { counter++; });
948 
949  int numSourceThreads = numThreads;
950  if (numPartitions > 0) {
951  numSourceThreads = numPartitions;
952  }
953  std::cout << "to run pipeline with " << numSourceThreads << " threads." << std::endl;
954  int counter = 0;
955 
956  for (int i = 0; i < numSourceThreads; i++) {
957  PDBWorkerPtr worker =
958  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
959  PDB_COUT << "to run the " << i << "-th work..." << std::endl;
960  // TODO: start threads
961  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
962 
963  std::string out = getAllocator().printInactiveBlocks();
964  logger->warn(out);
965 #ifdef PROFILING
966  std::cout << "print inactive blocks before running pipeline in this worker:"
967  << std::endl;
968  std::cout << out << std::endl;
969 #endif
970  // create a data proxy
971  DataProxyPtr proxy = createProxy(i, connection_mutex, errMsg);
972 
973  // set allocator policy
974  getAllocator().setPolicy(jobStage->getAllocatorPolicy());
975 
976  // setup an output page to store intermediate results and final output
977  executePipelineWork(
978  i, outputSet, iterators, hashSet, proxy, sinkBuffers, server, errMsg);
979 
980  // restore allocator policy
981  getAllocator().setPolicy(AllocatorPolicy::defaultAllocator);
982 #ifdef PROFILING
983  out = getAllocator().printInactiveBlocks();
984  std::cout << "print inactive blocks after running pipeline in this worker:"
985  << std::endl;
986  std::cout << out << std::endl;
987 #endif
988  callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
989 
990  }
991 
992  );
993  worker->execute(myWork, tempBuzzer);
994  }
995 
996  if ((sourceContext->getSetType() == UserSetType) &&
997  (computation->getComputationType() == "JoinComp")) {
998  // start the scanning thread
999  std::cout << "start scanning source set and put pages to source buffers" << std::endl;
1000  sourceCounter = 0;
1001  sourceBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& sourceCounter) {
1002  sourceCounter++;
1003  PDB_COUT << "source counter = " << sourceCounter << std::endl;
1004  });
1005  // get scan iterators
1006  std::vector<PageCircularBufferIteratorPtr> scanIterators =
1007  getUserSetIterators(server, 1, success, errMsg);
1008  int numScanThreads = scanIterators.size();
1009  std::cout << "we've got " << numScanThreads << " iterators" << std::endl;
1010  // start multiple thread to scan the set
1011  for (int i = 0; i < numScanThreads; i++) {
1012  // each scan thread gets a page and puts it into every source buffer
1013  PDBWorkerPtr worker =
1014  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
1015  std::cout << "to run the " << i << "-th scan work..." << std::endl;
1016  // start threads
1017  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
1018  // setup an output page to store intermediate results and final output
1019  const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};
1020  PageCircularBufferIteratorPtr iter = scanIterators[i];
1021  PDBPagePtr page = nullptr;
1022  while (iter->hasNext()) {
1023  page = iter->next();
1024  if (page != nullptr) {
1025  std::cout << "Scanner got a non-null page" << std::endl;
1026  for (int j = 0; j < numPartitions; j++) {
1027  page->incRefCount();
1028  }
1029  std::cout << "Initialize join source page reference count to "
1030  << page->getRefCount() << std::endl;
1031  for (int j = 0; j < numPartitions; j++) {
1032  sourceBuffers[j]->addPageToTail(page);
1033  }
1034  }
1035  }
1036  callerBuzzer->buzz(PDBAlarm::WorkAllDone, sourceCounter);
1037  });
1038 
1039  worker->execute(myWork, sourceBuzzer);
1040  } // for
1041 
1042  while (sourceCounter < 1) {
1043  sourceBuzzer->wait();
1044  }
1045  sourceCounter = 0;
1046  std::cout << "Scanned all pages, now we close all source buffers" << std::endl;
1047 
1048  for (int i = 0; i < numPartitions; i++) {
1049  PageCircularBufferPtr buffer = sourceBuffers[i];
1050  buffer->close();
1051  }
1052  }
1053 
1054  while (counter < numSourceThreads) {
1055  tempBuzzer->wait();
1056  }
1057 
1058  counter = 0;
1059  pthread_mutex_destroy(&connection_mutex);
1060 
1061 
1062  if (server->getFunctionality<HermesExecutionServer>().setCurPageScanner(nullptr) == false) {
1063  success = false;
1064  errMsg = "Error: No job is running!";
1065  std::cout << errMsg << std::endl;
1066  return;
1067  }
1068 
1069  return;
1070 }
1071 
1072 
1073 // below method will run the combiner
1074 void PipelineStage::runPipelineWithShuffleSink(HermesExecutionServer* server) {
1075  bool success;
1076  std::string errMsg;
1077 
1078  int numNodes = jobStage->getNumNodes();
1079 
1080 #ifdef AUTO_TUNING
1081  size_t memSize = jobStage->getTotalMemoryOnThisNode();
1082  size_t sharedMemPoolSize = conf->getShmSize();
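// auto-tune the combiner hash page size: take the memory left after the shared
// memory pool and the existing hash sets (memSize is reported in KB, hence the
// factor of 1024), keep 80% of it (50% under valgrind), and split it across nodes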
1083 #ifndef USE_VALGRIND
1084  size_t tunedHashPageSize =
1085  (double)(memSize * ((size_t)(1024)) - sharedMemPoolSize - server->getHashSetsSize()) *
1086  (0.8) / (double)(numNodes);
1087 #else
1088  size_t tunedHashPageSize =
1089  (double)(memSize * ((size_t)(1024)) - sharedMemPoolSize - server->getHashSetsSize()) *
1090  (0.5) / (double)(numNodes);
1091 #endif
1092  if (memSize * ((size_t)(1024)) <
1093  sharedMemPoolSize + (size_t)512 * (size_t)1024 * (size_t)1024) {
1094  std::cout << "WARNING: Auto tuning can not work for this case, we use default value"
1095  << std::endl;
1096  tunedHashPageSize = conf->getHashPageSize();
1097  }
1098 
1099  std::cout << "Tuned combiner page size is " << tunedHashPageSize << std::endl;
1100  conf->setHashPageSize(tunedHashPageSize);
1101 #endif
1102 
1103 
1104  size_t combinerPageSize = conf->getHashPageSize();
1105  // each queue has multiple producers and one consumer
1106  int combinerBufferSize = numThreads;
1107  if (combinerBufferSize > 12) {
1108  combinerBufferSize = 12;
1109  }
1110  PDB_COUT << "combinerBufferSize=" << combinerBufferSize << std::endl;
1111  std::vector<PageCircularBufferPtr> combinerBuffers;
1112  std::vector<PageCircularBufferIteratorPtr> combinerIters;
1113 
1114  pthread_mutex_t connection_mutex;
1115  pthread_mutex_init(&connection_mutex, nullptr);
1116 
1117  // create a buzzer and counter
1118  PDBBuzzerPtr combinerBuzzer =
1119  make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& combinerCounter) {
1120  combinerCounter++;
1121  PDB_COUT << "combinerCounter = " << combinerCounter << std::endl;
1122  });
1123  PDB_COUT << "to run combiner with " << numNodes << " threads." << std::endl;
1124  int combinerCounter = 0;
1125 
1126  int i;
1127  for (i = 0; i < numNodes; i++) {
1128  PageCircularBufferPtr buffer = make_shared<PageCircularBuffer>(combinerBufferSize, logger);
1129  combinerBuffers.push_back(buffer);
1130  PageCircularBufferIteratorPtr iter =
1131  make_shared<PageCircularBufferIterator>(i, buffer, logger);
1132  combinerIters.push_back(iter);
1133  PDBWorkerPtr worker =
1134  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
1135  PDB_COUT << "to run the " << i << "-th combining work..." << std::endl;
1136  // start threads
1137  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
1138 
1139  std::string out = getAllocator().printInactiveBlocks();
1140  logger->warn(out);
1141 #ifdef PROFILING
1142  std::cout << "inactive blocks before running combiner in this worker:" << std::endl;
1143  std::cout << out << std::endl;
1144 #endif
1146 
1147  // to combine data for node-i
1148 
1149  std::string errMsg;
1150 
1151  // create data proxy
1152  DataProxyPtr proxy = createProxy(i, connection_mutex, errMsg);
1153 
1154  std::string address = "";
1155  int port = 0;
1156  int numNodesToCollect = this->jobStage->getNumNodesToCollect();
1157  if (this->jobStage->isCollectAsMap() == false) {
1158 
1159  // get the i-th address
1160  address = this->jobStage->getIPAddress(i);
1161  PDB_COUT << "address = " << address << std::endl;
1162 
1163  // get the i-th port
1164  port = this->jobStage->getPort(i);
1165  PDB_COUT << "port = " << port << std::endl;
1166 
1167  } else {
1168 
1169  // get the address of the collector node
1170  address = this->jobStage->getIPAddress(i % numNodesToCollect);
1171  PDB_COUT << "address = " << address << std::endl;
1172 
1173  // get the port of the collector node
1174  port = this->jobStage->getPort(i % numNodesToCollect);
1175  PDB_COUT << "port = " << port << std::endl;
1176  }
1177  // get aggregate computation
1178  PDB_COUT << i << ": to get compute plan" << std::endl;
1179 #ifdef ENABLE_LARGE_GRAPH
1180  const UseTemporaryAllocationBlock tempBlock{256 * 1024 * 1024};
1181 #else
1182  const UseTemporaryAllocationBlock tempBlock{32 * 1024 * 1024};
1183 #endif
1184  Handle<ComputePlan> plan = this->jobStage->getComputePlan();
1185  plan->nullifyPlanPointer();
1186  PDB_COUT << i << ": to deep copy ComputePlan object" << std::endl;
1187  Handle<ComputePlan> newPlan = deepCopyToCurrentAllocationBlock<ComputePlan>(plan);
1188  std::string targetSpecifier = jobStage->getTargetComputationSpecifier();
1189  PDB_COUT << "target computation name=" << targetSpecifier << std::endl;
1190  Handle<Computation> computation =
1191  newPlan->getPlan()->getNode(targetSpecifier).getComputationHandle();
1192  Handle<AbstractAggregateComp> aggregate =
1193  unsafeCast<AbstractAggregateComp, Computation>(computation);
1194  Handle<Vector<HashPartitionID>> partitions = this->jobStage->getNumPartitions(i);
1195  std::vector<HashPartitionID> stdPartitions;
1196  int numPartitionsOnTheNode = partitions->size();
1197  PDB_COUT << "num partitions on this node:" << numPartitionsOnTheNode << std::endl;
1198  for (int m = 0; m < numPartitionsOnTheNode; m++) {
1199  PDB_COUT << m << ":" << (*partitions)[m] << std::endl;
1200  stdPartitions.push_back((*partitions)[m]);
1201  }
1202  // get combiner processor
1203  SimpleSingleTableQueryProcessorPtr combinerProcessor =
1204  aggregate->getCombinerProcessor(stdPartitions);
1205  size_t myCombinerPageSize = combinerPageSize;
1206  if (myCombinerPageSize > conf->getShufflePageSize() - 64) {
1207  myCombinerPageSize = conf->getShufflePageSize() - 64;
1208  }
1209  void* combinerPage = (void*)calloc(myCombinerPageSize, sizeof(char));
1210  if (combinerPage == nullptr) {
1211  std::cout << "Fatal Error: insufficient memory can be allocated from memory"
1212  << std::endl;
1213  exit(-1);
1214  }
1215  std::cout << i << ": load a combiner page with size = " << myCombinerPageSize
1216  << std::endl;
1217  combinerProcessor->loadOutputPage(combinerPage, myCombinerPageSize);
1218 
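// drain the combiner buffer for node-i: load each input page into the combiner
// processor, and whenever an output page fills up, ship it to the destination
// node (optionally snappy-compressed) and start a fresh output page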
1219  PageCircularBufferIteratorPtr myIter = combinerIters[i];
1220  int numPages = 0;
1221  while (myIter->hasNext()) {
1222  PDBPagePtr page = myIter->next();
1223  if (page != nullptr) {
1224  // to load input page
1225  numPages++;
1226  combinerProcessor->loadInputPage(page->getBytes());
1227  while (combinerProcessor->fillNextOutputPage()) {
1228  // send out the output page
1229  Record<Vector<Handle<Object>>>* record =
1230  (Record<Vector<Handle<Object>>>*)combinerPage;
1231 #ifndef ENABLE_COMPRESSION
1232  this->storeShuffleData(record->getRootObject(),
1233  this->jobStage->getSinkContext()->getDatabase(),
1234  this->jobStage->getSinkContext()->getSetName(),
1235  address,
1236  port,
1237  false,
1238  errMsg);
1239 #else
1240  char* compressedBytes =
1241  new char[snappy::MaxCompressedLength(record->numBytes())];
1242  size_t compressedSize;
1243  snappy::RawCompress(
1244  (char*)record, record->numBytes(), compressedBytes, &compressedSize);
1245  std::cout << "size before compression is " << record->numBytes()
1246  << " and size after compression is " << compressedSize
1247  << std::endl;
1248  this->storeCompressedShuffleData(
1249  compressedBytes,
1250  compressedSize,
1251  this->jobStage->getSinkContext()->getDatabase(),
1252  this->jobStage->getSinkContext()->getSetName(),
1253  address,
1254  port,
1255  errMsg);
1256  delete[] compressedBytes;
1257 
1258 #endif
1259 
1260 
1261  // free the output page
1262  combinerProcessor->clearOutputPage();
1263  free(combinerPage);
1264  // allocate a new page
1265  combinerPage = (void*)malloc(myCombinerPageSize * sizeof(char));
1266  if (combinerPage == nullptr) {
1267  std::cout
1268  << "Fatal Error: insufficient memory can be allocated from memory"
1269  << std::endl;
1270  exit(-1);
1271  }
1272  std::cout << "load a combiner page with size = " << myCombinerPageSize
1273  << std::endl;
1274  // load the new page as output vector
1275  combinerProcessor->loadOutputPage(combinerPage, myCombinerPageSize);
1276  }
1277  // unpin the input page
1278  // combinerProcessor->clearInputPage();
1279  page->decRefCount();
1280  if (page->getRefCount() == 0) {
1281  page->freeContent();
1282  }
1283  }
1284  }
1285  combinerProcessor->finalize();
1286  combinerProcessor->fillNextOutputPage();
1287  // send the output page
1288  PDB_COUT << "processed " << numPages << " pages" << std::endl;
1289  Record<Vector<Handle<Object>>>* record = (Record<Vector<Handle<Object>>>*)combinerPage;
1290 #ifndef ENABLE_COMPRESSION
1291  this->storeShuffleData(record->getRootObject(),
1292  this->jobStage->getSinkContext()->getDatabase(),
1293  this->jobStage->getSinkContext()->getSetName(),
1294  address,
1295  port,
1296  false,
1297  errMsg);
1298 #else
1299  char* compressedBytes = new char[snappy::MaxCompressedLength(record->numBytes())];
1300  size_t compressedSize;
1301  snappy::RawCompress(
1302  (char*)record, record->numBytes(), compressedBytes, &compressedSize);
1303  std::cout << "size before compression is " << record->numBytes()
1304  << " and size after compression is " << compressedSize << std::endl;
1305  this->storeCompressedShuffleData(compressedBytes,
1306  compressedSize,
1307  this->jobStage->getSinkContext()->getDatabase(),
1308  this->jobStage->getSinkContext()->getSetName(),
1309  address,
1310  port,
1311  errMsg);
1312  delete[] compressedBytes;
1313 
1314 #endif
1315 
1316  // free the output page
1317  combinerProcessor->clearOutputPage();
1318  free(combinerPage);
1320 #ifdef PROFILING
1321  out = getAllocator().printInactiveBlocks();
1322  std::cout << "inactive blocks after running combiner in this worker:" << std::endl;
1323  std::cout << out << std::endl;
1324 #endif
1325  getAllocator().cleanInactiveBlocks((size_t)((size_t)32 * (size_t)1024 * (size_t)1024));
1326  getAllocator().cleanInactiveBlocks((size_t)((size_t)128 * (size_t)1024 * (size_t)1024));
1328  callerBuzzer->buzz(PDBAlarm::WorkAllDone, combinerCounter);
1329  }
1330 
1331  );
1332  worker->execute(myWork, combinerBuzzer);
1333  }
1334  SetSpecifierPtr outputSet =
1335  make_shared<SetSpecifier>(jobStage->getCombinerContext()->getDatabase(),
1336  jobStage->getCombinerContext()->getSetName(),
1337  jobStage->getCombinerContext()->getDatabaseId(),
1338  jobStage->getCombinerContext()->getTypeId(),
1339  jobStage->getCombinerContext()->getSetId(),
1340  jobStage->getCombinerContext()->getPageSize());
1341  runPipeline(server, combinerBuffers, outputSet);
1342 
1343 
1344  int k;
1345  for (k = 0; k < numNodes; k++) {
1346  PageCircularBufferPtr buffer = combinerBuffers[k];
1347  buffer->close();
1348  }
1349 
1350  while (combinerCounter < numNodes) {
1351  combinerBuzzer->wait();
1352  }
1353 
1354  combinerCounter = 0;
1355  return;
1356 }
1357 
1358 // below method will run broadcasting
1359 void PipelineStage::runPipelineWithBroadcastSink(HermesExecutionServer* server) {
1360 
1361  bool success;
1362  std::string errMsg;
1363 
1364  int numNodes = jobStage->getNumNodes();
1365 
1366  // each queue has multiple producers and one consumer
1367  int shuffleBufferSize = numThreads;
1368  if (shuffleBufferSize > 12) {
1369  shuffleBufferSize = 12;
1370  }
1371  PDB_COUT << "shuffleBufferSize=" << shuffleBufferSize << std::endl;
1372  std::vector<PageCircularBufferPtr> shuffleBuffers;
1373  std::vector<PageCircularBufferIteratorPtr> shuffleIters;
1374 
1375  pthread_mutex_t connection_mutex;
1376  pthread_mutex_init(&connection_mutex, nullptr);
1377 
1378  // create a buzzer and counter
1379  PDBBuzzerPtr shuffleBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& shuffleCounter) {
1380  shuffleCounter++;
1381  PDB_COUT << "shuffleCounter = " << shuffleCounter << std::endl;
1382  });
1383  PDB_COUT << "to run shuffle with " << numNodes << " threads." << std::endl;
1384  int shuffleCounter = 0;
1385 
1386  int i;
1387  NodeID myNodeId = jobStage->getNodeId();
1388  for (i = 0; i < numNodes; i++) {
1389  PageCircularBufferPtr buffer = make_shared<PageCircularBuffer>(shuffleBufferSize, logger);
1390  shuffleBuffers.push_back(buffer);
1391  PageCircularBufferIteratorPtr iter =
1392  make_shared<PageCircularBufferIterator>(i, buffer, logger);
1393  shuffleIters.push_back(iter);
1394  PDBWorkerPtr worker =
1395  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
1396  PDB_COUT << "to run the " << i << "-th broadcasting work..." << std::endl;
1397  // start threads
1398  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
1399  if (i == myNodeId) {
1400  callerBuzzer->buzz(PDBAlarm::WorkAllDone, shuffleCounter);
1401  return;
1402  }
1403  UseTemporaryAllocationBlock tempBlock{2 * 1024 * 1024};
1404  std::string out = getAllocator().printInactiveBlocks();
1405  logger->warn(out);
1406 #ifdef PROFILING
1407  std::cout << "inactive blocks before sending data in this worker:" << std::endl;
1408  std::cout << out << std::endl;
1409 #endif
1410 
1411  std::string errMsg;
1412 
1413  // get the i-th address
1414  std::string address = this->jobStage->getIPAddress(i);
1415  PDB_COUT << "address = " << address << std::endl;
1416 
1417  // get the i-th port
1418  int port = this->jobStage->getPort(i);
1419 
1420  PDB_COUT << "port = " << port << std::endl;
1421  // get aggregate computation
1422 
1423  PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();
1424  communicator->connectToInternetServer(logger, port, address, errMsg);
1425 
1426  PageCircularBufferIteratorPtr myIter = shuffleIters[i];
1427  int numPages = 0;
1428  while (myIter->hasNext()) {
1429  PDBPagePtr page = myIter->next();
1430  if (page != nullptr) {
1431  // to load input page
1432  numPages++;
1433  // send out the page
1434  Record<Object>* myRecord = (Record<Object>*)(page->getBytes());
1435  sendData(communicator,
1436  myRecord,
1437  myRecord->numBytes(),
1438  jobStage->getSinkContext()->getDatabase(),
1439  jobStage->getSinkContext()->getSetName(),
1440  errMsg);
1441  // unpin the input page
1442  page->decRefCount();
1443  if (page->getRefCount() == 0) {
1444  page->freeContent();
1445  }
1446  }
1447  }
1448  std::cout << "broadcasted " << numPages << " pages to address: " << address
1449  << std::endl;
1450  sendData(communicator,
1451  nullptr,
1452  0,
1453  jobStage->getSinkContext()->getDatabase(),
1454  jobStage->getSinkContext()->getSetName(),
1455  errMsg);
1456 #ifdef PROFILING
1457  out = getAllocator().printInactiveBlocks();
1458  std::cout << "inactive blocks after sending data in this worker:" << std::endl;
1459  std::cout << out << std::endl;
1460 #endif
1461  getAllocator().cleanInactiveBlocks((size_t)((size_t)32 * (size_t)1024 * (size_t)1024));
1462  getAllocator().cleanInactiveBlocks((size_t)((size_t)128 * (size_t)1024 * (size_t)1024));
1464  callerBuzzer->buzz(PDBAlarm::WorkAllDone, shuffleCounter);
1465  }
1466 
1467  );
1468  worker->execute(myWork, shuffleBuzzer);
1469  }
1470  SetSpecifierPtr outputSet =
1471  make_shared<SetSpecifier>(jobStage->getSinkContext()->getDatabase(),
1472  jobStage->getSinkContext()->getSetName(),
1473  jobStage->getSinkContext()->getDatabaseId(),
1474  jobStage->getSinkContext()->getTypeId(),
1475  jobStage->getSinkContext()->getSetId(),
1476  jobStage->getSinkContext()->getPageSize());
1477  runPipeline(server, shuffleBuffers, outputSet);
1478 
1479 
1480  int k;
1481  for (k = 0; k < numNodes; k++) {
1482  PageCircularBufferPtr buffer = shuffleBuffers[k];
1483  buffer->close();
1484  }
1485 
1486  while (shuffleCounter < numNodes) {
1487  shuffleBuzzer->wait();
1488  }
1489 
1490  shuffleCounter = 0;
1491  return;
1492 }
1493 
1494 // below method will run hash partitioning
1495 void PipelineStage::runPipelineWithHashPartitionSink(HermesExecutionServer* server) {
1496 
1497  bool success;
1498  std::string errMsg;
1499 
1500  int numNodes = jobStage->getNumNodes();
1501 
1502  // each queue has multiple producers and one consumer
1503  int shuffleBufferSize = numThreads;
1504  if (shuffleBufferSize > 12) {
1505  shuffleBufferSize = 12;
1506  }
1507  PDB_COUT << "shuffleBufferSize=" << shuffleBufferSize << std::endl;
1508  std::vector<PageCircularBufferPtr> shuffleBuffers;
1509  std::vector<PageCircularBufferIteratorPtr> shuffleIters;
1510 
1511  pthread_mutex_t connection_mutex;
1512  pthread_mutex_init(&connection_mutex, nullptr);
1513 
1514  // create a buzzer and counter
1515  PDBBuzzerPtr shuffleBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int& shuffleCounter) {
1516  shuffleCounter++;
1517  PDB_COUT << "shuffleCounter = " << shuffleCounter << std::endl;
1518  });
1519  PDB_COUT << "to run shuffle with " << numNodes << " threads." << std::endl;
1520  int shuffleCounter = 0;
1521 
1522  int i;
1523  NodeID myNodeId = jobStage->getNodeId();
1524  for (i = 0; i < numNodes; i++) {
1525  PageCircularBufferPtr buffer = make_shared<PageCircularBuffer>(shuffleBufferSize, logger);
1526  shuffleBuffers.push_back(buffer);
1527  PageCircularBufferIteratorPtr iter =
1528  make_shared<PageCircularBufferIterator>(i, buffer, logger);
1529  shuffleIters.push_back(iter);
1530  PDBWorkerPtr worker =
1531  server->getFunctionality<HermesExecutionServer>().getWorkers()->getWorker();
1532  PDB_COUT << "to run the " << i << "-th hash partitioning work..." << std::endl;
1533  // start threads
1534  PDBWorkPtr myWork = make_shared<GenericWork>([&, i](PDBBuzzerPtr callerBuzzer) {
1535  UseTemporaryAllocationBlock tempBlock{32 * 1024 * 1024};
1536  std::string out = getAllocator().printInactiveBlocks();
1537  logger->warn(out);
1538 #ifdef PROFILING
1539  std::cout << "inactive blocks before sending data in this worker:" << std::endl;
1540  std::cout << out << std::endl;
1541 #endif
1542 
1543  // to combine data for node-i
1544  std::string errMsg;
1545 
1546  // create the data proxy
1547  DataProxyPtr proxy = nullptr;
1548  if (i == myNodeId) {
1549  proxy = createProxy(i, connection_mutex, errMsg);
1550  }
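// a DataProxy is only needed for this node's own partition, which is pinned into
// local storage rather than sent over the network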
1551  // get the i-th address
1552  std::string address = this->jobStage->getIPAddress(i);
1553  PDB_COUT << "address = " << address << std::endl;
1554 
1555  // get the i-th port
1556  int port = this->jobStage->getPort(i);
1557 
1558  PDB_COUT << "port = " << port << std::endl;
1559 
1560  PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();
1561  communicator->connectToInternetServer(logger, port, address, errMsg);
1562 
1563  // get join computation
1564  PDB_COUT << i << ": to get compute plan" << std::endl;
1565  Handle<ComputePlan> plan = this->jobStage->getComputePlan();
1566  plan->nullifyPlanPointer();
1567  PDB_COUT << i << ": to deep copy ComputePlan object" << std::endl;
1568  Handle<ComputePlan> newPlan = deepCopyToCurrentAllocationBlock<ComputePlan>(plan);
1569  std::string sourceTupleSetSpecifier = jobStage->getSourceTupleSetSpecifier();
1570  std::string targetTupleSetSpecifier = jobStage->getTargetTupleSetSpecifier();
1571  std::string targetSpecifier = jobStage->getTargetComputationSpecifier();
1572  // get shuffler
1573  SinkShufflerPtr shuffler = newPlan->getShuffler(
1574  sourceTupleSetSpecifier, targetTupleSetSpecifier, targetSpecifier);
1575  shuffler->setNodeId(i);
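// the SinkShuffler writes the objects belonging to node i's partition into an output
// container that is allocated on the current output page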
1576  PageCircularBufferIteratorPtr myIter = shuffleIters[i];
1577  int numPages = 0;
1578  int numMaps = 0;
1579 
1580  // set non-reuse policy
1581  char* output = nullptr;
1582  char* buffer = nullptr;
1583  Handle<Object> myMaps = nullptr;
1584  while (myIter->hasNext()) {
1585  PDBPagePtr page = myIter->next();
1586  if (page != nullptr) {
1587  // to load output page
1588  if (output == nullptr) {
1589  //use broadcastPageSize for broadcast join and hash partition join
1590  output = (char*)calloc(conf->getNetBroadcastPageSize(), 1);
1591  makeObjectAllocatorBlock(output, conf->getNetBroadcastPageSize(), true);
1592  std::cout << getAllocator().printCurrentBlock() << std::endl;
1593  myMaps = shuffler->createNewOutputContainer();
1594  }
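// the freshly allocated output buffer becomes the current allocation block, so the
// container created by createNewOutputContainer() lives directly on the page that
// will eventually be sent out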
1595  // get the vector corresponding to the i-th node
1596  // for each map in the vector, we shuffle the map to the output page
1598  (Record<Vector<
1599  Handle<Vector<Handle<JoinMap<JoinTupleBase>>>>>>*)(page->getBytes());
1600  Record<Vector<Handle<Vector<Handle<Object>>>>>* record =
1601      (Record<Vector<Handle<Vector<Handle<Object>>>>>*)(page->getBytes());
1602  if (record != nullptr) {
1603  Handle<Vector<Handle<Vector<Handle<Object>>>>> objectsToShuffle =
1604  record->getRootObject();
1605  Handle<Vector<Handle<Object>>>& objectToShuffle = (*objectsToShuffle)[i];
1606  Vector<Handle<Object>>& theOtherMaps = *objectToShuffle;
1607  for (int j = 0; j < theOtherMaps.size(); j++) {
1608  {
1609 
1610  bool success = shuffler->writeOut(theOtherMaps[j], myMaps);
1611  if (success == false) {
1612  // output page is full, send it out
1613  getRecord(myMaps);
1614  Record<Object>* myRecord = (Record<Object>*)output;
1615  size_t numBytes = myRecord->numBytes();
1616  char* sendBuffer = (char*)malloc(numBytes);
1617  if (sendBuffer == nullptr) {
1618  std::cout << "Out of memory on heap" << std::endl;
1619  exit(-1);
1620  }
1621  memcpy(sendBuffer, output, numBytes);
1622  if (i != myNodeId) {
1623  std::cout << getAllocator().printCurrentBlock()
1624  << std::endl;
1625  makeObjectAllocatorBlock(128 * 1024, true);
1626 
1627  sendData(communicator,
1628  sendBuffer,
1629  numBytes,
1630  jobStage->getSinkContext()->getDatabase(),
1631  jobStage->getSinkContext()->getSetName(),
1632  errMsg);
1633  } else {
1634  makeObjectAllocatorBlock(128 * 1024, true);
1635  proxy->pinBytes(jobStage->getSinkContext()->getDatabaseId(),
1636  jobStage->getSinkContext()->getTypeId(),
1637  jobStage->getSinkContext()->getSetId(),
1638  numBytes,
1639  sendBuffer,
1640  false);
1641  }
1642  free(sendBuffer);
1643  numPages++;
1644  // free the output page and reload a new output page
1645  myMaps = nullptr;
1646  buffer = (char*)calloc(conf->getNetBroadcastPageSize(), 1);
1647  makeObjectAllocatorBlock(buffer, conf->getNetBroadcastPageSize(), true);
1648  // redo the write for the current map
1649  myMaps = shuffler->createNewOutputContainer();
1650  shuffler->writeOut(theOtherMaps[j], myMaps);
1651  }
1652  }
1653  numMaps++;
1654  if ((output != nullptr) && (buffer != nullptr) && (output != buffer)) {
1655  free(output);
1656  output = buffer;
1657  }
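// after a flush, the old output page is freed and the newly allocated buffer takes
// its place as the current output page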
1658  } // for
1659 
1660  } // if (record != nullptr)
1661  // unpin the input page
1662  page->decRefCount();
1663  if (page->getRefCount() == 0) {
1664  page->freeContent();
1665  }
1666  } // if
1667  } // while
1668  // send out the page
1669  out = getAllocator().printInactiveBlocks();
1670  std::cout << "inactive blocks before sending data in this worker:" << std::endl;
1671  std::cout << out << std::endl;
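// flush the last, partially filled output page, if any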
1672  if (myMaps != nullptr) {
1673  getRecord(myMaps);
1674  Record<Object>* myRecord = (Record<Object>*)output;
1675  size_t numBytes = myRecord->numBytes();
1676  char* sendBuffer = (char*)malloc(numBytes);
1677  if (sendBuffer == nullptr) {
1678  std::cout << "Out of memory on heap" << std::endl;
1679  exit(-1);
1680  }
1681  memcpy(sendBuffer, output, numBytes);
1682  out = getAllocator().printInactiveBlocks();
1683  std::cout << "inactive blocks before sending data in this worker:" << std::endl;
1684  std::cout << out << std::endl;
1685  if (i != myNodeId) {
1686  std::cout << getAllocator().printCurrentBlock() << std::endl;
1687  makeObjectAllocatorBlock(128 * 1024, true);
1688  sendData(communicator,
1689  sendBuffer,
1690  numBytes,
1691  jobStage->getSinkContext()->getDatabase(),
1692  jobStage->getSinkContext()->getSetName(),
1693  errMsg);
1694  } else {
1695  makeObjectAllocatorBlock(128 * 1024, true);
1696  proxy->pinBytes(jobStage->getSinkContext()->getDatabaseId(),
1697  jobStage->getSinkContext()->getTypeId(),
1698  jobStage->getSinkContext()->getSetId(),
1699  numBytes,
1700  sendBuffer,
1701  false);
1702  }
1703  free(sendBuffer);
1704  numPages++;
1705  myMaps = nullptr;
1706  }
1707  if (output != nullptr) {
1708  free(output);
1709  output = nullptr;
1710  }
1711  std::cout << "HashPartitioned " << numPages << " pages to address: " << address
1712  << std::endl;
1713  std::cout << numMaps << " maps are written in total for partition-" << i << std::endl;
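// for remote nodes, a final sendData with a null buffer marks the end of the partition
// stream; the local node needs no terminator because its pages were pinned directly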
1714  if (i != myNodeId) {
1715  makeObjectAllocatorBlock(128 * 1024, true);
1716  sendData(communicator,
1717  nullptr,
1718  conf->getNetBroadcastPageSize(),
1719  jobStage->getSinkContext()->getDatabase(),
1720  jobStage->getSinkContext()->getSetName(),
1721  errMsg);
1722  }
1723 #ifdef PROFILING
1724  out = getAllocator().printInactiveBlocks();
1725  std::cout << "inactive blocks after sending data in this worker:" << std::endl;
1726  std::cout << out << std::endl;
1727 #endif
1728  getAllocator().cleanInactiveBlocks((size_t)((size_t)32 * (size_t)1024 * (size_t)1024));
1729  getAllocator().cleanInactiveBlocks((size_t)((size_t)128 * (size_t)1024 * (size_t)1024));
1730  getAllocator().cleanInactiveBlocks((size_t)(conf->getNetBroadcastPageSize()));
1731  callerBuzzer->buzz(PDBAlarm::WorkAllDone, shuffleCounter);
1732  }
1733 
1734  );
1735  worker->execute(myWork, shuffleBuzzer);
1736  }
1737  SetSpecifierPtr outputSet =
1738  make_shared<SetSpecifier>(jobStage->getSinkContext()->getDatabase(),
1739  jobStage->getSinkContext()->getSetName(),
1740  jobStage->getSinkContext()->getDatabaseId(),
1741  jobStage->getSinkContext()->getTypeId(),
1742  jobStage->getSinkContext()->getSetId(),
1743  jobStage->getSinkContext()->getPageSize());
1744  runPipeline(server, shuffleBuffers, outputSet);
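// once the pipeline has drained, close every shuffle buffer so the sender threads see
// end-of-input, then wait until each of them has buzzed WorkAllDone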
1745  int k;
1746  for (k = 0; k < numNodes; k++) {
1747  PageCircularBufferPtr buffer = shuffleBuffers[k];
1748  buffer->close();
1749  }
1750 
1751  while (shuffleCounter < numNodes) {
1752  shuffleBuzzer->wait();
1753  }
1754 
1755  shuffleCounter = 0;
1756  return;
1757 }
1758 }
1759 #endif
shared_ptr< PageScanner > PageScannerPtr
Definition: PageScanner.h:39
std::string printCurrentBlock()
Definition: Allocator.cc:876
std::shared_ptr< AbstractHashSet > AbstractHashSetPtr
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
bool storeCompressedShuffleData(char *bytes, size_t numBytes, std::string databaseName, std::string setName, std::string address, int port, std::string &errMsg)
Handle< ObjType > getRootObject()
Definition: Record.cc:46
void runPipelineWithShuffleSink(HermesExecutionServer *server)
void runPipeline(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > combinerBuffers, SetSpecifierPtr outputSet)
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
std::shared_ptr< SinkShuffler > SinkShufflerPtr
Definition: SinkShuffler.h:31
Functionality & getFunctionality()
void setPolicy(AllocatorPolicy policy)
Definition: Allocator.cc:536
bool storeShuffleData(Handle< Vector< Handle< Object >>> data, std::string databaseName, std::string setName, std::string address, int port, bool whetherToPersiste, std::string &errMsg)
unsigned int NodeID
Definition: DataTypes.h:27
Handle< TupleSetJobStage > & getJobStage()
std::vector< PageCircularBufferIteratorPtr > getUserSetIterators(HermesExecutionServer *server, int numThreads, bool &success, std::string &errMsg)
Allocator & getAllocator()
Definition: Allocator.cc:943
std::shared_ptr< Pipeline > PipelinePtr
Definition: Pipeline.h:314
bool setCurPageScanner(PageScannerPtr curPageScanner)
shared_ptr< PDBWork > PDBWorkPtr
Definition: PDBWork.h:47
Handle< TupleSetJobStage > jobStage
Definition: PipelineStage.h:68
bool sendData(PDBCommunicatorPtr conn, void *bytes, size_t size, std::string databaseName, std::string setName, std::string &errMsg)
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
std::shared_ptr< PartitionedHashSet > PartitionedHashSetPtr
unsigned int DatabaseID
Definition: DataTypes.h:29
void feedSharedBuffers(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > &sourceBuffers, int numPartitions, int &counter, PDBBuzzerPtr tempBuzzer, bool &success, std::string &errMsg)
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< SharedHashSet > SharedHashSetPtr
Definition: SharedHashSet.h:26
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
unsigned int PageID
Definition: DataTypes.h:26
void runPipelineWithBroadcastSink(HermesExecutionServer *server)
PDBLoggerPtr logger
Definition: PipelineStage.h:80
std::shared_ptr< SimpleSingleTableQueryProcessor > SimpleSingleTableQueryProcessorPtr
void executePipelineWork(int i, SetSpecifierPtr outputSet, std::vector< PageCircularBufferIteratorPtr > &iterators, PartitionedHashSetPtr hashSet, DataProxyPtr proxy, std::vector< PageCircularBufferPtr > &sinkBuffers, HermesExecutionServer *server, std::string &errMsg)
size_t numBytes()
Definition: Record.cc:36
SharedMemPtr shm
Definition: PipelineStage.h:86
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
shared_ptr< PDBWorker > PDBWorkerPtr
Definition: PDBWorker.h:40
PDBAlarm
Definition: PDBAlarm.h:28
size_t size() const
Definition: PDBVector.cc:67
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
size_t getBackendCircularBufferSize(bool &success, std::string &errMsg)
void runPipelineWithHashPartitionSink(HermesExecutionServer *server)
std::shared_ptr< SetSpecifier > SetSpecifierPtr
Definition: SetSpecifier.h:28
shared_ptr< PageCircularBuffer > PageCircularBufferPtr
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
ConfigurationPtr conf
Definition: PipelineStage.h:83
std::vector< int > nodeIds
Definition: PipelineStage.h:92
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
AbstractHashSetPtr getHashSet(std::string name)
DataProxyPtr createProxy(int i, pthread_mutex_t connection_mutex, std::string &errMsg)
unsigned int UserTypeID
Definition: DataTypes.h:25
PipelineStage(Handle< TupleSetJobStage > stage, SharedMemPtr shm, PDBLoggerPtr logger, ConfigurationPtr conf, NodeID nodeId, size_t batchSize, int numThreads)
#define DEFAULT_NET_PAGE_SIZE
Definition: Configuration.h:44
std::string printInactiveBlocks()
Definition: Allocator.cc:888