18 #ifndef QUERY_SCHEDULER_SERVER_CC
19 #define QUERY_SCHEDULER_SERVER_CC
45 std::shared_ptr<StatisticsDB> statisticsDB,
46 bool pseudoClusterMode,
47 double partitionToCoreRatio) {
64 std::shared_ptr<StatisticsDB> statisticsDB,
65 bool pseudoClusterMode,
66 double partitionToCoreRatio) {
87 interGlobalSet =
nullptr;
89 this->interGlobalSets.clear();
112 PDB_COUT <<
"To get the node object from the resource manager" << std::endl;
113 auto nodeObjects = getFunctionality<ResourceManagerServer>().getAllNodes();
116 for (
int i = 0; i < nodeObjects->size(); i++) {
118 PDB_COUT << i <<
": address=" << (*(nodeObjects))[i]->getAddress()
119 <<
", port=" << (*(nodeObjects))[i]->getPort()
120 <<
", node=" << (*(nodeObjects))[i]->getNodeId() << std::endl;
122 std::make_shared<StandardResourceInfo>(
DEFAULT_NUM_CORES / (nodeObjects->size()),
124 (*(nodeObjects))[i]->getAddress().c_str(),
125 (*(nodeObjects))[i]->getPort(),
126 (*(nodeObjects))[i]->getNodeId());
136 PDB_COUT <<
"To get the resource object from the resource manager" << std::endl;
137 auto resourceObjects = getFunctionality<ResourceManagerServer>().getAllResources();
140 for (
int i = 0; i < resourceObjects->size(); i++) {
142 PDB_COUT << i <<
": address=" << (*(resourceObjects))[i]->getAddress()
143 <<
", port=" << (*(resourceObjects))[i]->getPort()
144 <<
", node=" << (*(resourceObjects))[i]->getNodeId()
145 <<
", numCores=" << (*(resourceObjects))[i]->getNumCores()
146 <<
", memSize=" << (*(resourceObjects))[i]->getMemSize() << std::endl;
148 (*(resourceObjects))[i]->getNumCores(),
149 (*(resourceObjects))[i]->getMemSize(),
150 (*(resourceObjects))[i]->getAddress().c_str(),
151 (*(resourceObjects))[i]->getPort(),
152 (*(resourceObjects))[i]->getNodeId());
164 std::shared_ptr<ShuffleInfo> shuffleInfo) {
171 PDB_COUT <<
"counter = " << cnt << std::endl;
175 for (
auto &stage : stagesToSchedule) {
176 for (
unsigned long node = 0; node < shuffleInfo->getNumNodes(); node++) {
187 myWorker->execute(myWork, tempBuzzer);
191 while (counter < shuffleInfo->getNumNodes()) {
217 if(communicator ==
nullptr) {
224 switch (stage->getJobStageTypeID()) {
225 case TupleSetJobStage_TYPEID : {
230 case AggregationJobStage_TYPEID : {
234 aggStage->setAggTotalPartitions(
shuffleInfo->getNumHashPartitions());
239 case BroadcastJoinBuildHTJobStage_TYPEID : {
241 unsafeCast<BroadcastJoinBuildHTJobStage, AbstractJobStage>(stage);
242 success =
scheduleStage(node, broadcastJoinStage, communicator);
245 case HashPartitionedJoinBuildHTJobStage_TYPEID : {
247 unsafeCast<HashPartitionedJoinBuildHTJobStage, AbstractJobStage>(stage);
248 success =
scheduleStage(node, hashPartitionedJoinStage, communicator);
252 PDB_COUT <<
"Unrecognized job stage" << std::endl;
262 PDB_COUT <<
"Can't execute the " << stage->getJobStageType() <<
" with " << stage->getStageId()
263 <<
" on the " << std::to_string(node) <<
"-th node" << std::endl;
279 PDB_COUT <<
"Connecting to remote node connect to the remote node with address : " << ip <<
":" << port << std::endl;
283 bool failure = communicator->connectToInternetServer(
logger, port, ip, errMsg);
294 std::cout << errMsg << std::endl;
307 PDB_COUT <<
"to send the job stage with id="
308 << stage->getStageId() <<
" to the " << node <<
"-th remote node" << std::endl;
317 success = communicator->sendObject<T>(stageToSend, errMsg);
321 std::cout << errMsg << std::endl;
325 PDB_COUT <<
"to receive query response from the " << node <<
"-th remote node" << std::endl;
329 if(result ==
nullptr) {
330 PDB_COUT << stage->getJobStageType() <<
"TupleSetJobStage execute failure: can't get results" << std::endl;
336 PDB_COUT << stage->getJobStageType() <<
" execute: wrote set:" << result->getDatabase()
337 <<
":" << result->getSetName() << std::endl;
349 stageToSend->setNumNodes(this->
shuffleInfo->getNumNodes());
350 stageToSend->setNumTotalPartitions(this->
shuffleInfo->getNumHashPartitions());
353 std::vector<std::vector<HashPartitionID>> standardPartitionIds =
shuffleInfo->getPartitionIds();
357 for (
auto &standardPartitionId : standardPartitionIds) {
359 for (
unsigned int id : standardPartitionId) {
360 nodePartitionIds->push_back(
id);
362 partitionIds->push_back(nodePartitionIds);
366 stageToSend->setTotalMemoryOnThisNode((
size_t)(*(this->
standardResources))[index]->getMemSize());
369 stageToSend->setNumPartitions(partitionIds);
372 std::vector<std::string> standardAddresses =
shuffleInfo->getAddresses();
376 for (
const auto &standardAddress : standardAddresses) {
377 addresses->push_back(
String(standardAddress));
381 stageToSend->setIPAddresses(addresses);
382 stageToSend->setNodeId(static_cast<NodeID>(index));
392 deepCopyToCurrentAllocationBlock<AggregationJobStage>(stage);
396 if (numPartitionsOnThisNode == 0) {
397 numPartitionsOnThisNode = 1;
401 stageToSend->setNumNodePartitions(numPartitionsOnThisNode);
402 stageToSend->setTotalMemoryOnThisNode((
size_t)(*(this->
standardResources))[index]->getMemSize());
405 stageToSend->setAggTotalPartitions(
shuffleInfo->getNumHashPartitions());
416 deepCopyToCurrentAllocationBlock<BroadcastJoinBuildHTJobStage>(stage);
419 stageToSend->nullifyComputePlanPointer();
422 stageToSend->setTotalMemoryOnThisNode((
size_t)(*(this->
standardResources))[index]->getMemSize());
432 deepCopyToCurrentAllocationBlock<HashPartitionedJoinBuildHTJobStage>(stage);
435 stageToSend->nullifyComputePlanPointer();
439 if (numPartitionsOnThisNode == 0) {
440 numPartitionsOnThisNode = 1;
444 stageToSend->setNumNodePartitions(numPartitionsOnThisNode);
447 stageToSend->setTotalMemoryOnThisNode((
size_t)(*(this->
standardResources))[index]->getMemSize());
460 PDB_COUT <<
"counter = " << cnt << std::endl;
482 myWorker->execute(myWork, tempBuzzer);
509 if(communicator ==
nullptr) {
515 PDB_COUT <<
"About to collect stats on the " << node <<
"-th node" << std::endl;
520 std::cout << errMsg << std::endl;
526 PDB_COUT <<
"About to receive response from the " << node <<
"-th remote node" << std::endl;
531 if (!success || result ==
nullptr) {
532 PDB_COUT <<
"Can't get results from node with id=" << std::to_string(node) <<
" and ip=" << ip << std::endl;
539 for (
int j = 0; j < stats->size(); j++) {
558 std::string databaseName = setToUpdateStats->getDatabase();
559 std::string setName = setToUpdateStats->getSetName();
562 size_t numPages = setToUpdateStats->getNumPages();
563 size_t pageSize = setToUpdateStats->getPageSize();
564 size_t numBytes = numPages * pageSize;
577 ExecuteComputation_TYPEID,
585 RegisterReplica_TYPEID,
599 long input_data_id = this->
statisticsDB->getLatestDataId(std::pair<std::string, std::string> (request->getInputDatabaseName(),
600 request->getInputSetName()));
601 long output_data_id = this->
statisticsDB->getLatestDataId(std::pair<std::string, std::string>(request->getOutputDatabaseName(),
602 request->getOutputSetName()));
606 computations = request->getComputations();
607 std::cout <<
"there are " << computations->size() <<
" computations in total" << std::endl;
609 bool success = this->
statisticsDB->createDataTransformation(input_data_id,
611 request->getNumPartitions(),
612 request->getNumNodes(),
613 request->getReplicaType(),
614 request->getTCAPString(),
617 std::string errMsg =
"";
618 if (success ==
false) {
619 errMsg =
"error in register replica";
621 std::cout <<
"registered the input-output mapping with statisticsDB for id = " <<
id << std::endl;
624 PDB_COUT <<
"About to send back response to client" << std::endl;
626 if (!sendUsingMe->sendObject(result, errMsg)) {
627 errMsg =
"error in sending object to client";
628 return std::make_pair(
false, errMsg);
630 return std::make_pair(
true, errMsg);
642 PDB_COUT <<
"Got the ExecuteComputation object" << std::endl;
651 PDB_COUT <<
"Could not crate a database for " << this->jobId <<
", cleaning up!" << std::endl;
652 getFunctionality<QuerySchedulerServer>().
cleanup();
653 return std::make_pair(
false, errMsg);
657 PDB_COUT <<
"To get the resource object from the resource manager" << std::endl;
658 getFunctionality<QuerySchedulerServer>().
initialize();
682 auto graph = analyzerNodeFactory->generateAnalyzerGraph(sourcesComputations);
690 PDB_COUT <<
"Could not parse the compute plan. About to cleanup" << std::endl;
691 getFunctionality<QuerySchedulerServer>().
cleanup();
692 return std::make_pair(
false,
"Could not parse the compute plan. About to cleanup");
698 std::vector<Handle<AbstractJobStage>> jobStages;
699 std::vector<Handle<SetIdentifier>> intermediateSets;
718 PDB_COUT <<
"To schedule the query to run on the cluster" << std::endl;
728 PDB_COUT <<
"About to remove intermediate sets" << endl;
732 PDB_COUT <<
"About to send back response to client" << std::endl;
735 if (!sendUsingMe->sendObject(result, errMsg)) {
736 PDB_COUT <<
"About to cleanup" << std::endl;
737 getFunctionality<QuerySchedulerServer>().
cleanup();
738 return std::make_pair(
false, errMsg);
741 PDB_COUT <<
"About to cleanup" << std::endl;
742 getFunctionality<QuerySchedulerServer>().
cleanup();
743 return std::make_pair(
true, errMsg);
750 for (
auto &intermediateSet : intermediateSets) {
762 bool res = dsmClient.
removeTempSet(intermediateSet->getDatabase(),
763 intermediateSet->getSetName(),
769 std::cout <<
"can't remove temp set: " << errMsg << std::endl;
774 std::cout <<
"Removed set with database=" << intermediateSet->getDatabase() <<
", set="
775 << intermediateSet->getSetName() << std::endl;
786 bool res = dsmClient.
removeTempSet(intermediateSet->getDatabase(),
787 intermediateSet->getSetName(),
793 cout <<
"Can not remove temp set: " << errMsg << endl;
798 PDB_COUT <<
"Removed set with database=" << intermediateSet->getDatabase() <<
", set="
799 << intermediateSet->getSetName() << endl;
807 for (
const auto &intermediateSet : intermediateSets) {
811 bool res = dsmClient.
createTempSet(intermediateSet->getDatabase(),
812 intermediateSet->getSetName(),
815 intermediateSet->getPageSize());
819 cout <<
"Can not create temp set: " << errMsg << endl;
824 PDB_COUT <<
"Created set with database=" << intermediateSet->getDatabase() <<
", set="
825 << intermediateSet->getSetName() << endl;
835 bool success =
false;
844 std::cout << idx << std::endl;
std::shared_ptr< Statistics > StatisticsPtr
void scheduleStages(std::vector< Handle< AbstractJobStage >> &stagesToSchedule, std::shared_ptr< ShuffleInfo > shuffleInfo)
Handle< TupleSetJobStage > getStageToSend(unsigned long index, Handle< TupleSetJobStage > &stage)
std::shared_ptr< PhysicalOptimizer > physicalOptimizerPtr
pthread_mutex_t connection_mutex
void removeUnusedIntermediateSets(DistributedStorageManagerClient &dsmClient, vector< Handle< SetIdentifier >> &intermediateSets)
std::vector< StandardResourceInfoPtr > * standardResources
std::shared_ptr< ShuffleInfo > shuffleInfo
pair< bool, basic_string< char > > executeComputation(Handle< ExecuteComputation > &request, PDBCommunicatorPtr &sendUsingMe)
pair< bool, basic_string< char > > registerReplica(Handle< RegisterReplica > &request, PDBCommunicatorPtr &sendUsingMe)
#define DEFAULT_NUM_CORES
void extractPipelineStages(int &jobStageId, vector< Handle< AbstractJobStage >> &jobStages, vector< Handle< SetIdentifier >> &intermediateSets)
std::string getNextJobId()
shared_ptr< PDBWork > PDBWorkPtr
std::shared_ptr< LogicalPlan > LogicalPlanPtr
bool createTempSet(const std::string &databaseName, const std::string &setName, const std::string &typeName, std::string &errMsg, size_t pageSize=DEFAULT_PAGE_SIZE)
std::shared_ptr< StandardResourceInfo > StandardResourceInfoPtr
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
bool removeTempSet(const std::string &databaseName, const std::string &setName, const std::string &typeName, std::string &errMsg)
bool scheduleStage(unsigned long node, Handle< T > &stage, PDBCommunicatorPtr communicator)
std::vector< Handle< SetIdentifier > > interGlobalSets
void registerHandlers(PDBServer &forMe) override
#define DEFAULT_BATCH_SIZE
shared_ptr< PDBBuzzer > PDBBuzzerPtr
shared_ptr< Configuration > ConfigurationPtr
PDBCommunicatorPtr getCommunicatorToNode(int port, std::string &ip)
shared_ptr< PDBWorker > PDBWorkerPtr
QuerySchedulerServer(PDBLoggerPtr logger, ConfigurationPtr conf, std::shared_ptr< StatisticsDB > statisticsDB, bool pseudoClusterMode=false, double partitionToCoreRatio=0.75)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
void requestStatistics(PDBCommunicatorPtr &communicator, bool &success, string &errMsg) const
std::shared_ptr< AbstractPhysicalNodeFactory > AbstractPhysicalNodeFactoryPtr
void prepareAndScheduleStage(Handle< AbstractJobStage > &stage, unsigned long node, int &counter, PDBBuzzerPtr &callerBuzzer)
void createIntermediateSets(DistributedStorageManagerClient &dsmClient, vector< Handle< SetIdentifier >> &intermediateSets)
#define PROFILER_END_MESSAGE(id, message)
bool createDatabase(const std::string &databaseName, std::string &errMsg)
std::shared_ptr< PDBLogger > PDBLoggerPtr
std::vector< AtomicComputationPtr > & getAllScanSets()
void collectStatsForNode(int node, int &counter, PDBBuzzerPtr &callerBuzzer)
void initializeForPseudoClusterMode()
std::shared_ptr< StatisticsDB > statisticsDB
void initializeForServerMode()
void removeIntermediateSets(DistributedStorageManagerClient &dsmClient)
StatisticsPtr statsForOptimization
double partitionToCoreRatio
#define PROFILER_START(id)
void updateStats(Handle< SetIdentifier > setToUpdateStats)