A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. API documentation for v0.5.0, released June 13, 2018.
QuerySchedulerServer.cc
/*****************************************************************************
 *                                                                           *
 *  Copyright 2018 Rice University                                           *
 *                                                                           *
 *  Licensed under the Apache License, Version 2.0 (the "License");          *
 *  you may not use this file except in compliance with the License.         *
 *  You may obtain a copy of the License at                                  *
 *                                                                           *
 *      http://www.apache.org/licenses/LICENSE-2.0                           *
 *                                                                           *
 *  Unless required by applicable law or agreed to in writing, software     *
 *  distributed under the License is distributed on an "AS IS" BASIS,        *
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
 *  See the License for the specific language governing permissions and      *
 *  limitations under the License.                                           *
 *                                                                           *
 *****************************************************************************/
#ifndef QUERY_SCHEDULER_SERVER_CC
#define QUERY_SCHEDULER_SERVER_CC

#include "PDBDebug.h"
#include "InterfaceFunctions.h"
#include "QuerySchedulerServer.h"
#include "ResourceManagerServer.h"
#include "SimpleRequestHandler.h"
#include "GenericWork.h"
#include "StorageCollectStats.h"
#include "StorageCollectStatsResponse.h"
#include "Profiling.h"
#include "RegisterReplica.h"
#include <ctime>
#include <chrono>

namespace pdb {

QuerySchedulerServer::~QuerySchedulerServer() {
    pthread_mutex_destroy(&connection_mutex);
}

QuerySchedulerServer::QuerySchedulerServer(PDBLoggerPtr logger,
                                           ConfigurationPtr conf,
                                           std::shared_ptr<StatisticsDB> statisticsDB,
                                           bool pseudoClusterMode,
                                           double partitionToCoreRatio) {
    pthread_mutex_init(&connection_mutex, nullptr);

    this->port = 8108;
    this->logger = logger;
    this->conf = conf;
    this->statisticsDB = statisticsDB;
    this->pseudoClusterMode = pseudoClusterMode;
    this->partitionToCoreRatio = partitionToCoreRatio;
    this->statsForOptimization = nullptr;
    this->standardResources = nullptr;
}

QuerySchedulerServer::QuerySchedulerServer(int port,
                                           PDBLoggerPtr logger,
                                           ConfigurationPtr conf,
                                           std::shared_ptr<StatisticsDB> statisticsDB,
                                           bool pseudoClusterMode,
                                           double partitionToCoreRatio) {
    pthread_mutex_init(&connection_mutex, nullptr);

    this->port = port;
    this->logger = logger;
    this->conf = conf;
    this->statisticsDB = statisticsDB;
    this->pseudoClusterMode = pseudoClusterMode;
    this->partitionToCoreRatio = partitionToCoreRatio;
    this->statsForOptimization = nullptr;
    this->standardResources = nullptr;
}

void QuerySchedulerServer::cleanup() {

    // delete the standard resources if they exist and set them to null
    delete this->standardResources;
    this->standardResources = nullptr;

    // clear the list of intermediate sets that need to be removed
    for (auto &interGlobalSet : interGlobalSets) {
        interGlobalSet = nullptr;
    }
    this->interGlobalSets.clear();
}

void QuerySchedulerServer::initialize() {

    // remove the standard resources if there are any
    delete this->standardResources;
    this->standardResources = new std::vector<StandardResourceInfoPtr>();

    // depending on whether we are running in pseudo cluster mode
    // or not, we need to grab the standard resources from a different place
    if (!pseudoClusterMode) {
        initializeForServerMode();
    } else {
        initializeForPseudoClusterMode();
    }
}

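// NOTE: the two initialization paths below differ only in where the resources
// come from. In pseudo cluster mode a single physical machine emulates the
// whole cluster, so the cores and memory reported by the resource manager are
// split evenly across the simulated nodes; in server (real cluster) mode each
// node reports its own core count and memory size, which are used as-is.
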
void QuerySchedulerServer::initializeForPseudoClusterMode() {

    // all the stuff we create will be stored here
    const UseTemporaryAllocationBlock block(2 * 1024 * 1024);

    PDB_COUT << "To get the node object from the resource manager" << std::endl;
    auto nodeObjects = getFunctionality<ResourceManagerServer>().getAllNodes();

    // add and print out the resources
    for (int i = 0; i < nodeObjects->size(); i++) {

        PDB_COUT << i << ": address=" << (*(nodeObjects))[i]->getAddress()
                 << ", port=" << (*(nodeObjects))[i]->getPort()
                 << ", node=" << (*(nodeObjects))[i]->getNodeId() << std::endl;

        StandardResourceInfoPtr currentResource =
            std::make_shared<StandardResourceInfo>(DEFAULT_NUM_CORES / (nodeObjects->size()),
                                                   DEFAULT_MEM_SIZE / (nodeObjects->size()),
                                                   (*(nodeObjects))[i]->getAddress().c_str(),
                                                   (*(nodeObjects))[i]->getPort(),
                                                   (*(nodeObjects))[i]->getNodeId());
        this->standardResources->push_back(currentResource);
    }
}

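// For example (illustrative numbers only): if DEFAULT_NUM_CORES were 16 and
// DEFAULT_MEM_SIZE were 32 GB, a pseudo cluster of 4 nodes would give each
// simulated node 4 cores and 8 GB. Note the integer division above: resources
// that do not divide evenly are truncated rather than redistributed.
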
void QuerySchedulerServer::initializeForServerMode() {

    // all the stuff we create will be stored here
    const UseTemporaryAllocationBlock block(2 * 1024 * 1024);

    PDB_COUT << "To get the resource object from the resource manager" << std::endl;
    auto resourceObjects = getFunctionality<ResourceManagerServer>().getAllResources();

    // add and print out the resources
    for (int i = 0; i < resourceObjects->size(); i++) {

        PDB_COUT << i << ": address=" << (*(resourceObjects))[i]->getAddress()
                 << ", port=" << (*(resourceObjects))[i]->getPort()
                 << ", node=" << (*(resourceObjects))[i]->getNodeId()
                 << ", numCores=" << (*(resourceObjects))[i]->getNumCores()
                 << ", memSize=" << (*(resourceObjects))[i]->getMemSize() << std::endl;

        StandardResourceInfoPtr currentResource = std::make_shared<StandardResourceInfo>(
            (*(resourceObjects))[i]->getNumCores(),
            (*(resourceObjects))[i]->getMemSize(),
            (*(resourceObjects))[i]->getAddress().c_str(),
            (*(resourceObjects))[i]->getPort(),
            (*(resourceObjects))[i]->getNodeId());
        this->standardResources->push_back(currentResource);
    }
}

StatisticsPtr QuerySchedulerServer::getStats() {
    return statsForOptimization;
}

void QuerySchedulerServer::scheduleStages(std::vector<Handle<AbstractJobStage>> &stagesToSchedule,
                                          std::shared_ptr<ShuffleInfo> shuffleInfo) {

    int counter = 0;

    // create the buzzer
    PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &cnt) {
        cnt++;
        PDB_COUT << "counter = " << cnt << std::endl;
    });

    // go through all the stages and send each one to every node
    for (auto &stage : stagesToSchedule) {
        for (unsigned long node = 0; node < shuffleInfo->getNumNodes(); node++) {

            // grab a worker
            PDBWorkerPtr myWorker = getWorker();

            // create some work for it
            PDBWorkPtr myWork = make_shared<GenericWork>([&, node](PDBBuzzerPtr callerBuzzer) {
                prepareAndScheduleStage(stage, node, counter, callerBuzzer);
            });

            // execute the work
            myWorker->execute(myWork, tempBuzzer);
        }

        // wait until all the nodes are finished
        while (counter < shuffleInfo->getNumNodes()) {
            tempBuzzer->wait();
        }

        // reset the counter for the next stage
        counter = 0;
    }
}

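// NOTE: the buzzer acts as a completion barrier. Every per-node task ends by
// buzzing either WorkAllDone or GenericError, the buzzer's callback bumps the
// shared counter, and the while/wait loop above releases only once every node
// has answered for the current stage. Stages therefore run strictly one after
// another, while the work for a single stage fans out across all nodes.
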
void QuerySchedulerServer::prepareAndScheduleStage(Handle<AbstractJobStage> &stage,
                                                   unsigned long node,
                                                   int &counter,
                                                   PDBBuzzerPtr &callerBuzzer) {

    // this is where all the stuff we create will be stored (the deep copy of the stage)
    const UseTemporaryAllocationBlock block(256 * 1024 * 1024);

    // grab the port and the address of the node from the standard resources
    int port = this->standardResources->at(node)->getPort();
    std::string ip = this->standardResources->at(node)->getAddress();

    PROFILER_START(scheduleStage)

    // create PDBCommunicator
    PDBCommunicatorPtr communicator = getCommunicatorToNode(port, ip);

    // if we failed to acquire a communicator to the node, signal a failure and finish
    if (communicator == nullptr) {
        callerBuzzer->buzz(PDBAlarm::GenericError, counter);
        return;
    }

    // figure out what kind of stage it is and schedule it
    bool success;
    switch (stage->getJobStageTypeID()) {
        case TupleSetJobStage_TYPEID: {
            Handle<TupleSetJobStage> tupleSetStage = unsafeCast<TupleSetJobStage, AbstractJobStage>(stage);
            success = scheduleStage(node, tupleSetStage, communicator);
            break;
        }
        case AggregationJobStage_TYPEID: {
            Handle<AggregationJobStage> aggStage = unsafeCast<AggregationJobStage, AbstractJobStage>(stage);

            // TODO this is bad, concurrent modification, need to move it to the right place!
            aggStage->setAggTotalPartitions(shuffleInfo->getNumHashPartitions());
            aggStage->setAggBatchSize(DEFAULT_BATCH_SIZE);
            success = scheduleStage(node, aggStage, communicator);
            break;
        }
        case BroadcastJoinBuildHTJobStage_TYPEID: {
            Handle<BroadcastJoinBuildHTJobStage> broadcastJoinStage =
                unsafeCast<BroadcastJoinBuildHTJobStage, AbstractJobStage>(stage);
            success = scheduleStage(node, broadcastJoinStage, communicator);
            break;
        }
        case HashPartitionedJoinBuildHTJobStage_TYPEID: {
            Handle<HashPartitionedJoinBuildHTJobStage> hashPartitionedJoinStage =
                unsafeCast<HashPartitionedJoinBuildHTJobStage, AbstractJobStage>(stage);
            success = scheduleStage(node, hashPartitionedJoinStage, communicator);
            break;
        }
        default: {
            PDB_COUT << "Unrecognized job stage" << std::endl;
            success = false;
            break;
        }
    }

    PROFILER_END_MESSAGE(scheduleStage, "For stage : " << stage->getStageId() << " on node " << ip)

    // if we failed to execute the stage on the node, we signal a failure
    if (!success) {
        PDB_COUT << "Can't execute the " << stage->getJobStageType() << " with id=" << stage->getStageId()
                 << " on the " << std::to_string(node) << "-th node" << std::endl;
        callerBuzzer->buzz(PDBAlarm::GenericError, counter);
        return;
    }

    // excellent, everything worked just as expected
    callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
}

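// NOTE: the switch above is how the scheduler dispatches on the concrete stage
// type: AbstractJobStage carries a type id, and unsafeCast just reinterprets
// the handle without copying, which is safe here because the id was checked
// first. Only the aggregation stage is patched in place before being shipped;
// the TODO flags that this mutates an object shared across scheduling threads.
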
PDBCommunicatorPtr QuerySchedulerServer::getCommunicatorToNode(int port, std::string &ip) {

    // lock the connection mutex so no other thread tries to connect to a node
    pthread_mutex_lock(&connection_mutex);
    PDBCommunicatorPtr communicator = std::make_shared<PDBCommunicator>();

    // log what we are doing
    PDB_COUT << "Connecting to the remote node with address : " << ip << ":" << port << std::endl;

    // try to connect to the node
    string errMsg;
    bool failure = communicator->connectToInternetServer(logger, port, ip, errMsg);

    // we are done connecting, somebody else can do it now
    pthread_mutex_unlock(&connection_mutex);

    // if we succeeded, return the communicator
    if (!failure) {
        return communicator;
    }

    // otherwise log the error message
    std::cout << errMsg << std::endl;

    // return a null pointer
    return nullptr;
}

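// NOTE: connection_mutex serializes only the connection setup, not the use of
// the resulting communicator; each scheduling thread gets its own
// PDBCommunicator, so the sends and receives that follow proceed in parallel.
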
template<typename T>
bool QuerySchedulerServer::scheduleStage(unsigned long node,
                                         Handle<T> &stage,
                                         PDBCommunicatorPtr communicator) {
    bool success;
    std::string errMsg;
    PDB_COUT << "to send the job stage with id="
             << stage->getStageId() << " to the " << node << "-th remote node" << std::endl;

    // the copy we are about to make will be stored here
    const UseTemporaryAllocationBlock block(256 * 1024 * 1024);

    // get a copy of the stage that is prepared to be sent
    Handle<T> stageToSend = getStageToSend(node, stage);

    // send the stage to the execution server
    success = communicator->sendObject<T>(stageToSend, errMsg);

    // check if we succeeded in doing that, if not we have a problem
    if (!success) {
        std::cout << errMsg << std::endl;
        return false;
    }

    PDB_COUT << "to receive query response from the " << node << "-th remote node" << std::endl;
    Handle<SetIdentifier> result = communicator->getNextObject<SetIdentifier>(success, errMsg);

    // check if we succeeded in executing the stage
    if (result == nullptr) {
        PDB_COUT << stage->getJobStageType() << " execute failure: can't get results" << std::endl;
        return false;
    }

    // update the statistics based on the returned results
    this->updateStats(result);
    PDB_COUT << stage->getJobStageType() << " execute: wrote set:" << result->getDatabase()
             << ":" << result->getSetName() << std::endl;

    return true;
}

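// NOTE: the wire protocol per stage is a simple request/response pair: the
// scheduler ships a self-contained deep copy of the stage object, then blocks
// on getNextObject for a SetIdentifier describing the set the remote node
// wrote, which in turn feeds the optimizer's statistics via updateStats.
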
Handle<TupleSetJobStage> QuerySchedulerServer::getStageToSend(unsigned long index,
                                                              Handle<TupleSetJobStage> &stage) {

    // do a deep copy of the stage
    Handle<TupleSetJobStage> stageToSend = deepCopyToCurrentAllocationBlock<TupleSetJobStage>(stage);

    // set the number of nodes and the number of hash partitions
    stageToSend->setNumNodes(this->shuffleInfo->getNumNodes());
    stageToSend->setNumTotalPartitions(this->shuffleInfo->getNumHashPartitions());

    // grab the partition IDs on each node
    std::vector<std::vector<HashPartitionID>> standardPartitionIds = shuffleInfo->getPartitionIds();

    // copy the IDs into a new vector of vectors
    Handle<Vector<Handle<Vector<HashPartitionID>>>> partitionIds = makeObject<Vector<Handle<Vector<HashPartitionID>>>>();
    for (auto &standardPartitionId : standardPartitionIds) {
        Handle<Vector<HashPartitionID>> nodePartitionIds = makeObject<Vector<HashPartitionID>>();
        for (unsigned int id : standardPartitionId) {
            nodePartitionIds->push_back(id);
        }
        partitionIds->push_back(nodePartitionIds);
    }

    // set the memory on the node we want to send it to
    stageToSend->setTotalMemoryOnThisNode((size_t)(*(this->standardResources))[index]->getMemSize());

    // set the partition IDs for each node
    stageToSend->setNumPartitions(partitionIds);

    // grab the addresses for each node
    std::vector<std::string> standardAddresses = shuffleInfo->getAddresses();

    // go through each address and copy it into a vector of strings
    Handle<Vector<String>> addresses = makeObject<Vector<String>>();
    for (const auto &standardAddress : standardAddresses) {
        addresses->push_back(String(standardAddress));
    }

    // set the addresses of the nodes
    stageToSend->setIPAddresses(addresses);
    stageToSend->setNodeId(static_cast<NodeID>(index));

    return stageToSend;
}

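// NOTE: the std::vector-to-pdb::Vector copying above is not incidental: the
// stage is serialized straight out of the current allocation block, so every
// nested structure (the partition map, the address list) must live in PDB's
// own Vector/String types inside that block before sendObject is called.
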
Handle<AggregationJobStage> QuerySchedulerServer::getStageToSend(unsigned long index,
                                                                 Handle<AggregationJobStage> &stage) {

    // do a deep copy of the stage
    Handle<AggregationJobStage> stageToSend =
        deepCopyToCurrentAllocationBlock<AggregationJobStage>(stage);

    // figure out the number of partitions on the node we want to send it to
    auto numPartitionsOnThisNode = (int)((double)(standardResources->at(index)->getNumCores()) * partitionToCoreRatio);
    if (numPartitionsOnThisNode == 0) {
        numPartitionsOnThisNode = 1;
    }

    // fill in the info about the node
    stageToSend->setNumNodePartitions(numPartitionsOnThisNode);
    stageToSend->setTotalMemoryOnThisNode((size_t)(*(this->standardResources))[index]->getMemSize());

    // TODO these two need to be relocated
    stageToSend->setAggTotalPartitions(shuffleInfo->getNumHashPartitions());
    stageToSend->setAggBatchSize(DEFAULT_BATCH_SIZE);

    return stageToSend;
}

Handle<BroadcastJoinBuildHTJobStage> QuerySchedulerServer::getStageToSend(unsigned long index,
                                                                          Handle<BroadcastJoinBuildHTJobStage> &stage) {

    // do a deep copy of the stage
    Handle<BroadcastJoinBuildHTJobStage> stageToSend =
        deepCopyToCurrentAllocationBlock<BroadcastJoinBuildHTJobStage>(stage);

    // set the reference to the compute plan to null so it's not sent
    stageToSend->nullifyComputePlanPointer();

    // set the memory on the node we want to send it to
    stageToSend->setTotalMemoryOnThisNode((size_t)(*(this->standardResources))[index]->getMemSize());

    return stageToSend;
}

Handle<HashPartitionedJoinBuildHTJobStage> QuerySchedulerServer::getStageToSend(unsigned long index,
                                                                                Handle<HashPartitionedJoinBuildHTJobStage> &stage) {

    // do a deep copy of the stage
    Handle<HashPartitionedJoinBuildHTJobStage> stageToSend =
        deepCopyToCurrentAllocationBlock<HashPartitionedJoinBuildHTJobStage>(stage);

    // set the reference to the compute plan to null so it's not sent
    stageToSend->nullifyComputePlanPointer();

    // figure out the number of partitions on the node we want to send it to
    auto numPartitionsOnThisNode = (int)((double)(standardResources->at(index)->getNumCores()) * partitionToCoreRatio);
    if (numPartitionsOnThisNode == 0) {
        numPartitionsOnThisNode = 1;
    }

    // set the value we just calculated
    stageToSend->setNumNodePartitions(numPartitionsOnThisNode);

    // set the memory on the node we want to send it to
    stageToSend->setTotalMemoryOnThisNode((size_t)(*(this->standardResources))[index]->getMemSize());

    return stageToSend;
}

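// For example (illustrative numbers only): with the default
// partitionToCoreRatio of 0.75, a 16-core node gets (int)(16 * 0.75) = 12
// hash partitions, while a 1-core node would compute 0 and be clamped to the
// minimum of one partition by the check above.
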
void QuerySchedulerServer::collectStats() {

    // we use this variable to sync all the nodes
    int counter = 0;

    // create the buzzer
    PDBBuzzerPtr tempBuzzer = make_shared<PDBBuzzer>([&](PDBAlarm myAlarm, int &cnt) {
        cnt++;
        PDB_COUT << "counter = " << cnt << std::endl;
    });

    // if the standard resources are for some reason not initialized, initialize them
    if (this->standardResources == nullptr) {
        initialize();
    }

    this->statsForOptimization = make_shared<Statistics>();

    // go through each node
    for (int node = 0; node < this->standardResources->size(); node++) {

        // grab one worker
        PDBWorkerPtr myWorker = getWorker();

        // make some work to collect the stats for the current node
        PDBWorkPtr myWork = make_shared<GenericWork>([&, node](PDBBuzzerPtr callerBuzzer) {
            collectStatsForNode(node, counter, callerBuzzer);
        });

        // execute the work
        myWorker->execute(myWork, tempBuzzer);
    }

    // wait until everything is finished
    while (counter < this->standardResources->size()) {
        tempBuzzer->wait();
    }
}

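// NOTE: collectStats reuses the same fan-out/barrier shape as scheduleStages:
// one worker per node, one buzz per node, and the counter as the join point.
// The helper below performs the per-node round trip.
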
void QuerySchedulerServer::collectStatsForNode(int node,
                                               int &counter,
                                               PDBBuzzerPtr &callerBuzzer) {

    bool success;
    std::string errMsg;

    // all the stuff that we create in this method will be stored here
    const UseTemporaryAllocationBlock block(4 * 1024 * 1024);

    // grab the port and the ip of the node
    int port = (*(this->standardResources))[node]->getPort();
    std::string ip = (*(this->standardResources))[node]->getAddress();

    // create PDBCommunicator
    PDBCommunicatorPtr communicator = getCommunicatorToNode(port, ip);

    // if we failed to acquire a communicator to the node, signal a failure and finish
    if (communicator == nullptr) {
        callerBuzzer->buzz(PDBAlarm::GenericError, counter);
        return;
    }

    // make a request to the remote server for the statistics
    PDB_COUT << "About to collect stats on the " << node << "-th node" << std::endl;
    requestStatistics(communicator, success, errMsg);

    // if the request failed, print the reason for the failure and signal an error
    if (!success) {
        std::cout << errMsg << std::endl;
        callerBuzzer->buzz(PDBAlarm::GenericError, counter);
        return;
    }

    // receive StorageCollectStatsResponse from the remote server
    PDB_COUT << "About to receive response from the " << node << "-th remote node" << std::endl;
    Handle<StorageCollectStatsResponse> result = communicator->getNextObject<StorageCollectStatsResponse>(success,
                                                                                                          errMsg);

    // if we failed to receive the result, print out what happened and signal an error
    if (!success || result == nullptr) {
        PDB_COUT << "Can't get results from node with id=" << std::to_string(node) << " and ip=" << ip << std::endl;
        callerBuzzer->buzz(PDBAlarm::GenericError, counter);
        return;
    }

    // update stats
    Handle<Vector<Handle<SetIdentifier>>> stats = result->getStats();
    for (int j = 0; j < stats->size(); j++) {
        this->updateStats((*stats)[j]);
    }

    // lose the reference to the result
    result = nullptr;

    // great! we succeeded in collecting the statistics for this node, signal that
    callerBuzzer->buzz(PDBAlarm::WorkAllDone, counter);
}

void QuerySchedulerServer::requestStatistics(PDBCommunicatorPtr &communicator, bool &success, string &errMsg) const {
    Handle<StorageCollectStats> collectStatsMsg = makeObject<StorageCollectStats>();
    success = communicator->sendObject<StorageCollectStats>(collectStatsMsg, errMsg);
}

void QuerySchedulerServer::updateStats(Handle<SetIdentifier> setToUpdateStats) {

    // grab the database name and the name of the set we are updating
    std::string databaseName = setToUpdateStats->getDatabase();
    std::string setName = setToUpdateStats->getSetName();

    // figure out the values we need to update
    size_t numPages = setToUpdateStats->getNumPages();
    size_t pageSize = setToUpdateStats->getPageSize();
    size_t numBytes = numPages * pageSize;

    // update the statistics
    statsForOptimization->setPageSize(databaseName, setName, pageSize);
    statsForOptimization->incrementNumPages(databaseName, setName, numPages);
    statsForOptimization->incrementNumBytes(databaseName, setName, numBytes);
}

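// For example (illustrative numbers only): a SetIdentifier reporting 128
// pages of 64 MB each contributes 128 * 64 MB = 8 GB to the set's byte count.
// Because the page and byte counts are incremented rather than overwritten,
// the same set's statistics accumulate as responses arrive from more nodes.
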
void QuerySchedulerServer::registerHandlers(PDBServer &forMe) {

    // handler to schedule a Computation-based query graph
    forMe.registerHandler(
        ExecuteComputation_TYPEID,
        make_shared<SimpleRequestHandler<ExecuteComputation>>(
            [&](Handle<ExecuteComputation> request, PDBCommunicatorPtr sendUsingMe) {
                return executeComputation(request, sendUsingMe);
            }));

    // handler to register a replica
    forMe.registerHandler(
        RegisterReplica_TYPEID,
        make_shared<SimpleRequestHandler<RegisterReplica>>(
            [&](Handle<RegisterReplica> request, PDBCommunicatorPtr sendUsingMe) {
                request->print();
                return registerReplica(request, sendUsingMe);
            }));
}

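// NOTE: this is the usual PDB functionality wiring: registerHandlers binds a
// message type id to a SimpleRequestHandler whose lambda receives the already
// deserialized request object together with the communicator for the client
// connection, and returns the (success, error message) pair produced by the
// member function it delegates to.
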
pair<bool, basic_string<char>> QuerySchedulerServer::registerReplica(Handle<RegisterReplica> &request,
                                                                     PDBCommunicatorPtr &sendUsingMe) {

    const UseTemporaryAllocationBlock block{256 * 1024 * 1024};
    long input_data_id = this->statisticsDB->getLatestDataId(std::pair<std::string, std::string>(request->getInputDatabaseName(),
                                                                                                 request->getInputSetName()));
    long output_data_id = this->statisticsDB->getLatestDataId(std::pair<std::string, std::string>(request->getOutputDatabaseName(),
                                                                                                  request->getOutputSetName()));
    long id = -1;

    Handle<Vector<Handle<Computation>>> computations = makeObject<Vector<Handle<Computation>>>();
    computations = request->getComputations();
    std::cout << "there are " << computations->size() << " computations in total" << std::endl;

    bool success = this->statisticsDB->createDataTransformation(input_data_id,
                                                                output_data_id,
                                                                request->getNumPartitions(),
                                                                request->getNumNodes(),
                                                                request->getReplicaType(),
                                                                request->getTCAPString(),
                                                                computations,
                                                                id);
    std::string errMsg = "";
    if (!success) {
        errMsg = "error in register replica";
    } else {
        std::cout << "registered the input-output mapping with statisticsDB for id = " << id << std::endl;
    }

    // notify the client of the outcome
    PDB_COUT << "About to send back response to client" << std::endl;
    Handle<SimpleRequestResult> result = makeObject<SimpleRequestResult>(success, errMsg);
    if (!sendUsingMe->sendObject(result, errMsg)) {
        errMsg = "error in sending object to client";
        return std::make_pair(false, errMsg);
    }
    return std::make_pair(true, errMsg);
}

pair<bool, basic_string<char>> QuerySchedulerServer::executeComputation(Handle<ExecuteComputation> &request,
                                                                        PDBCommunicatorPtr &sendUsingMe) {

    // all the stuff will be allocated here
    const UseTemporaryAllocationBlock block{256 * 1024 * 1024};

    std::string errMsg;
    bool success;

    // parse the query
    PDB_COUT << "Got the ExecuteComputation object" << std::endl;
    Handle<Vector<Handle<Computation>>> computations = sendUsingMe->getNextObject<Vector<Handle<Computation>>>(success,
                                                                                                               errMsg);

    // we create a new jobID
    this->jobId = this->getNextJobId();

    // use that jobID to create a database for the job
    DistributedStorageManagerClient dsmClient(this->port, "localhost", logger);
    if (!dsmClient.createDatabase(this->jobId, errMsg)) {
        PDB_COUT << "Could not create a database for " << this->jobId << ", cleaning up!" << std::endl;
        getFunctionality<QuerySchedulerServer>().cleanup();
        return std::make_pair(false, errMsg);
    }

    // initialize the standard resources from the resource manager
    PDB_COUT << "To get the resource object from the resource manager" << std::endl;
    getFunctionality<QuerySchedulerServer>().initialize();

    // create the shuffle info (just combine the standard resources with the partition to core ratio) TODO ask Jia if this is really necessary
    this->shuffleInfo = std::make_shared<ShuffleInfo>(this->standardResources, this->partitionToCoreRatio);

    // if we don't have the information about the sets, we ask every node to submit it
    if (this->statsForOptimization == nullptr) {
        this->collectStats();
    }

    try {
        // parse the plan and initialize the values we need
        Handle<ComputePlan> computePlan = makeObject<ComputePlan>(String(request->getTCAPString()), *computations);
        LogicalPlanPtr logicalPlan = computePlan->getPlan();
        AtomicComputationList computationGraph = logicalPlan->getComputations();
        auto sourcesComputations = computationGraph.getAllScanSets();

        // this is the TCAP analyzer node factory we want to use to create the graph for the physical analysis
        AbstractPhysicalNodeFactoryPtr analyzerNodeFactory = make_shared<SimplePhysicalNodeFactory>(jobId,
                                                                                                    computePlan,
                                                                                                    conf);

        // generate the analysis graph (it is a list of source nodes for that graph)
        auto graph = analyzerNodeFactory->generateAnalyzerGraph(sourcesComputations);

        // initialize the physical optimizer - used to generate the pipelines and pipeline stages we need to execute
        this->physicalOptimizerPtr = make_shared<PhysicalOptimizer>(graph, this->logger);
    }
    catch (pdb::NotEnoughSpace &n) {

        // cleanup since we failed to parse the plan
        PDB_COUT << "Could not parse the compute plan. About to cleanup" << std::endl;
        getFunctionality<QuerySchedulerServer>().cleanup();
        return std::make_pair(false, "Could not parse the compute plan. About to cleanup");
    }

    int jobStageId = 0;
    while (this->physicalOptimizerPtr->hasSources()) {

        std::vector<Handle<AbstractJobStage>> jobStages;
        std::vector<Handle<SetIdentifier>> intermediateSets;

        PROFILER_START(physicalPlanning)

        extractPipelineStages(jobStageId, jobStages, intermediateSets);

        PROFILER_END(physicalPlanning)

        PROFILER_START(createIntermediateSets)

        createIntermediateSets(dsmClient, intermediateSets);

        PROFILER_END(createIntermediateSets)

        PROFILER_START(scheduleStages)

        PDB_COUT << "To schedule the query to run on the cluster" << std::endl;
        getFunctionality<QuerySchedulerServer>().scheduleStages(jobStages, shuffleInfo);

        PROFILER_END(scheduleStages)

        // remove the intermediate sets we don't need anymore to continue the execution
        removeUnusedIntermediateSets(dsmClient, intermediateSets);
    }

    // remove the rest of the intermediate sets
    PDB_COUT << "About to remove intermediate sets" << endl;
    removeIntermediateSets(dsmClient);

    // notify the client that we succeeded
    PDB_COUT << "About to send back response to client" << std::endl;
    Handle<SimpleRequestResult> result = makeObject<SimpleRequestResult>(success, errMsg);

    if (!sendUsingMe->sendObject(result, errMsg)) {
        PDB_COUT << "About to cleanup" << std::endl;
        getFunctionality<QuerySchedulerServer>().cleanup();
        return std::make_pair(false, errMsg);
    }

    PDB_COUT << "About to cleanup" << std::endl;
    getFunctionality<QuerySchedulerServer>().cleanup();
    return std::make_pair(true, errMsg);
}

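// NOTE: executeComputation is the scheduler's main loop: receive the
// computations, create a per-job database, snapshot the cluster resources,
// build the physical plan, then repeatedly (1) extract the next batch of
// pipeline stages, (2) create their intermediate sets, (3) run the stages on
// every node, and (4) drop intermediate sets with no remaining consumers,
// until the optimizer has no source sets left.
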
void QuerySchedulerServer::removeUnusedIntermediateSets(DistributedStorageManagerClient &dsmClient,
                                                        vector<Handle<SetIdentifier>> &intermediateSets) {

    // remove the intermediate sets that are no longer needed
    for (auto &intermediateSet : intermediateSets) {

        // check whether the intermediate set is a source set that still has consumers
        if (this->physicalOptimizerPtr->hasConsumers(intermediateSet)) {

            // if it does, we need to remember this set and not remove it, because it will be used later
            this->interGlobalSets.push_back(intermediateSet);
            continue;
        }

        // send a request to the DistributedStorageManagerClient to remove the set
        std::string errMsg;
        bool res = dsmClient.removeTempSet(intermediateSet->getDatabase(),
                                           intermediateSet->getSetName(),
                                           "IntermediateData",
                                           errMsg);

        // check if we failed, if we did log it and continue to the next set
        if (!res) {
            std::cout << "can't remove temp set: " << errMsg << std::endl;
            continue;
        }

        // we succeeded in removing the set, log that
        std::cout << "Removed set with database=" << intermediateSet->getDatabase() << ", set="
                  << intermediateSet->getSetName() << std::endl;
    }
}

void QuerySchedulerServer::removeIntermediateSets(DistributedStorageManagerClient &dsmClient) {

    // go through the remaining intermediate sets and remove them
    for (const auto &intermediateSet : interGlobalSets) {

        // send a request to the DistributedStorageManagerClient to remove the set
        string errMsg;
        bool res = dsmClient.removeTempSet(intermediateSet->getDatabase(),
                                           intermediateSet->getSetName(),
                                           "IntermediateData",
                                           errMsg);

        // check if we failed, if we did log it and continue to the next set
        if (!res) {
            cout << "Can not remove temp set: " << errMsg << endl;
            continue;
        }

        // we succeeded in removing the set, log that
        PDB_COUT << "Removed set with database=" << intermediateSet->getDatabase() << ", set="
                 << intermediateSet->getSetName() << endl;
    }
}

void QuerySchedulerServer::createIntermediateSets(DistributedStorageManagerClient &dsmClient,
                                                  vector<Handle<SetIdentifier>> &intermediateSets) {

    // go through the list of intermediate sets and create them
    for (const auto &intermediateSet : intermediateSets) {

        // send a request to the DistributedStorageManagerClient to create the set
        string errMsg;
        bool res = dsmClient.createTempSet(intermediateSet->getDatabase(),
                                           intermediateSet->getSetName(),
                                           "IntermediateData",
                                           errMsg,
                                           intermediateSet->getPageSize());

        // check if we failed, if we did log it and continue to the next set
        if (!res) {
            cout << "Can not create temp set: " << errMsg << endl;
            continue;
        }

        // great, everything went well, log that...
        PDB_COUT << "Created set with database=" << intermediateSet->getDatabase() << ", set="
                 << intermediateSet->getSetName() << endl;
    }
}

void QuerySchedulerServer::extractPipelineStages(int &jobStageId,
                                                 vector<Handle<AbstractJobStage>> &jobStages,
                                                 vector<Handle<SetIdentifier>> &intermediateSets) {

    // try to get a sequence of stages, if we have any sources left
    int idx = 0;
    bool success = false;
    while (this->physicalOptimizerPtr->hasSources() && !success) {

        // get the next sequence of stages; this returns false if it selected the wrong source and needs to retry
        success = this->physicalOptimizerPtr->getNextStagesOptimized(jobStages,
                                                                     intermediateSets,
                                                                     this->statsForOptimization,
                                                                     jobStageId);

        std::cout << idx << std::endl;
    }
}

}

#endif