A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
QuerySchedulerServer.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef QUERY_SCHEDULER_SERVER_H
19 #define QUERY_SCHEDULER_SERVER_H
20 
21 
22 #include "ServerFunctionality.h"
23 #include "ResourceInfo.h"
24 #include "StandardResourceInfo.h"
25 #include "Handle.h"
26 #include "PDBVector.h"
27 #include "QueryBase.h"
28 #include "ResourceInfo.h"
30 #include "PDBLogger.h"
31 #include "TupleSetJobStage.h"
32 #include "AggregationJobStage.h"
35 #include "SequenceID.h"
36 #include "PhysicalOptimizer.h"
37 #include "ShuffleInfo.h"
39 #include "StatisticsDB.h"
40 #include "RegisterReplica.h"
41 #include <vector>
42 #include <ExecuteComputation.h>
43 
44 namespace pdb {
45 
63 
64 public:
65 
75  std::shared_ptr<StatisticsDB> statisticsDB,
76  bool pseudoClusterMode = false,
77  double partitionToCoreRatio = 0.75);
78 
88  PDBLoggerPtr logger,
89  ConfigurationPtr conf,
90  std::shared_ptr<StatisticsDB> statisticsDB,
91  bool pseudoClusterMode = false,
92  double partitionToCoreRatio = 0.75);
93 
98 
103  void registerHandlers(PDBServer& forMe) override;
104 
109  void collectStats();
110 
114  void cleanup() override;
115 
121 
127  std::string getNextJobId() {
128  time_t currentTime = time(nullptr);
129  struct tm* local = localtime(&currentTime);
130  this->jobId = "Job-" + std::to_string(local->tm_year + 1900) + "_" +
131  std::to_string(local->tm_mon + 1) + "_" + std::to_string(local->tm_mday) + "_" +
132  std::to_string(local->tm_hour) + "_" + std::to_string(local->tm_min) + "_" +
133  std::to_string(local->tm_sec) + "_" + std::to_string(seqId.getNextSequenceID());
134  return this->jobId;
135  }
136 
137 protected:
138 
143  void initialize();
144 
149 
154 
161  void scheduleStages(std::vector<Handle<AbstractJobStage>>& stagesToSchedule,
162  std::shared_ptr<ShuffleInfo> shuffleInfo);
163 
164 
174  unsigned long node,
175  int &counter,
176  PDBBuzzerPtr &callerBuzzer);
177 
184  template<typename T>
185  bool scheduleStage(unsigned long node,
186  Handle<T>& stage,
187  PDBCommunicatorPtr communicator);
188 
197  std::string &ip);
198 
205  Handle <TupleSetJobStage> getStageToSend(unsigned long index,
207 
214  Handle <AggregationJobStage> getStageToSend(unsigned long index,
216 
225 
235 
242  void collectStatsForNode(int node,
243  int &counter,
244  PDBBuzzerPtr &callerBuzzer);
245 
250  void updateStats(Handle<SetIdentifier> setToUpdateStats);
251 
257  pair<bool, basic_string<char>> executeComputation(Handle<ExecuteComputation> &request,
258  PDBCommunicatorPtr &sendUsingMe);
259 
265  pair<bool, basic_string<char>> registerReplica(Handle<RegisterReplica> &request,
266  PDBCommunicatorPtr &sendUsingMe);
267 
268 
277  void extractPipelineStages(int &jobStageId,
278  vector<Handle<AbstractJobStage>> &jobStages,
279  vector<Handle<SetIdentifier>> &intermediateSets);
280 
287  vector<Handle<SetIdentifier>> &intermediateSets);
288 
295 
304  vector<Handle<SetIdentifier>> &intermediateSets);
305 
312  void requestStatistics(PDBCommunicatorPtr &communicator, bool &success, string &errMsg) const;
313 
314 
318  std::vector<StandardResourceInfoPtr>* standardResources;
319 
323  int port;
324 
329  std::vector<Handle<SetIdentifier>> interGlobalSets;
330 
335 
340 
344  std::shared_ptr<StatisticsDB> statisticsDB;
345 
350 
356  pthread_mutex_t connection_mutex;
357 
363 
367  std::string jobId;
368 
374 
380  std::shared_ptr<PhysicalOptimizer> physicalOptimizerPtr;
381 
390 
394  std::shared_ptr<ShuffleInfo> shuffleInfo;
395 };
396 }
397 
398 
399 #endif
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
void scheduleStages(std::vector< Handle< AbstractJobStage >> &stagesToSchedule, std::shared_ptr< ShuffleInfo > shuffleInfo)
Handle< TupleSetJobStage > getStageToSend(unsigned long index, Handle< TupleSetJobStage > &stage)
std::shared_ptr< PhysicalOptimizer > physicalOptimizerPtr
void removeUnusedIntermediateSets(DistributedStorageManagerClient &dsmClient, vector< Handle< SetIdentifier >> &intermediateSets)
std::vector< StandardResourceInfoPtr > * standardResources
std::shared_ptr< ShuffleInfo > shuffleInfo
pair< bool, basic_string< char > > executeComputation(Handle< ExecuteComputation > &request, PDBCommunicatorPtr &sendUsingMe)
pair< bool, basic_string< char > > registerReplica(Handle< RegisterReplica > &request, PDBCommunicatorPtr &sendUsingMe)
void extractPipelineStages(int &jobStageId, vector< Handle< AbstractJobStage >> &jobStages, vector< Handle< SetIdentifier >> &intermediateSets)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
bool scheduleStage(unsigned long node, Handle< T > &stage, PDBCommunicatorPtr communicator)
std::vector< Handle< SetIdentifier > > interGlobalSets
void registerHandlers(PDBServer &forMe) override
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
PDBCommunicatorPtr getCommunicatorToNode(int port, std::string &ip)
QuerySchedulerServer(PDBLoggerPtr logger, ConfigurationPtr conf, std::shared_ptr< StatisticsDB > statisticsDB, bool pseudoClusterMode=false, double partitionToCoreRatio=0.75)
void requestStatistics(PDBCommunicatorPtr &communicator, bool &success, string &errMsg) const
unsigned int getNextSequenceID()
Definition: SequenceID.h:43
void prepareAndScheduleStage(Handle< AbstractJobStage > &stage, unsigned long node, int &counter, PDBBuzzerPtr &callerBuzzer)
void createIntermediateSets(DistributedStorageManagerClient &dsmClient, vector< Handle< SetIdentifier >> &intermediateSets)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
void collectStatsForNode(int node, int &counter, PDBBuzzerPtr &callerBuzzer)
std::shared_ptr< StatisticsDB > statisticsDB
void removeIntermediateSets(DistributedStorageManagerClient &dsmClient)
void updateStats(Handle< SetIdentifier > setToUpdateStats)