A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ManagerMain.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef MASTER_MAIN_CC
19 #define MASTER_MAIN_CC
20 
21 #include <iostream>
22 #include <string>
23 
24 #include "PDBServer.h"
25 #include "CatalogServer.h"
26 #include "CatalogClient.h"
27 #include "ResourceManagerServer.h"
28 #include "PangeaStorageServer.h"
30 #include "DispatcherServer.h"
31 #include "QuerySchedulerServer.h"
32 #include "StorageAddDatabase.h"
33 #include "SharedEmployee.h"
34 
35 int main(int argc, char* argv[]) {
36  int port = 8108;
37  std::string managerIp;
38  std::string pemFile = "conf/pdb.key";
39  bool pseudoClusterMode = false;
40  double partitionToCoreRatio = 0.75;
41  if (argc == 3) {
42  managerIp = argv[1];
43  port = atoi(argv[2]);
44  } else if ((argc == 4) || (argc == 5) || (argc == 6)) {
45  managerIp = argv[1];
46  port = atoi(argv[2]);
47  std::string isPseudoStr(argv[3]);
48  if (isPseudoStr.compare(std::string("Y")) == 0) {
49  pseudoClusterMode = true;
50  std::cout << "Running in standalone cluster mode" << std::endl;
51  }
52  if ((argc == 5) || (argc == 6)) {
53  pemFile = argv[4];
54  }
55  if (argc == 6) {
56  partitionToCoreRatio = stod(argv[5]);
57  }
58 
59  } else {
60  std::cout << "[Usage] #managerIp #port #runPseudoClusterOnOneNode (Y for running a "
61  "pseudo-cluster on one node, N for running a real-cluster distributedly, and "
62  "default is N) #pemFile (by default is conf/pdb.key) #partitionToCoreRatio "
63  "(by default is 0.75)"
64  << std::endl;
65  exit(-1);
66  }
67 
68  std::cout << "Starting up a distributed storage manager server\n";
69  pdb::PDBLoggerPtr myLogger = make_shared<pdb::PDBLogger>("frontendLogFile.log");
70  pdb::PDBServer frontEnd(port, 100, myLogger);
71 
72  ConfigurationPtr conf = make_shared<Configuration>();
73 
74  frontEnd.addFunctionality<pdb::CatalogServer>("CatalogDir", true, managerIp, port);
75  frontEnd.addFunctionality<pdb::CatalogClient>(port, "localhost", myLogger);
76 
77  //initialize StatisticsDB
78  std::shared_ptr<StatisticsDB> statisticsDB = std::make_shared<StatisticsDB>(conf);
79  if (statisticsDB == nullptr) {
80  std::cout << "fatal error in initializing statisticsDB" << std::endl;
81  exit(1);
82  }
83 
84  std::string errMsg = " ";
85  int numNodes = 1;
86  string line = "";
87  string nodeName = "";
88  string hostName = "";
89  string serverListFile;
90  int portValue = 8108;
91 
92  if (pseudoClusterMode==true) serverListFile = "conf/serverlist.test";
93  else serverListFile = "conf/serverlist";
94 
95  frontEnd.addFunctionality<pdb::ResourceManagerServer>(
96  serverListFile, port, pseudoClusterMode, pemFile);
97  frontEnd.addFunctionality<pdb::DistributedStorageManagerServer>(myLogger, statisticsDB);
98  auto allNodes = frontEnd.getFunctionality<pdb::ResourceManagerServer>().getAllNodes();
99 
100  // registers metadata for manager node in the catalog
102  pdb::makeObject<pdb::CatalogNodeMetadata>(String("localhost:" + std::to_string(port)),
103  String("localhost"),
104  port,
105  String("manager"),
106  String("manager"),
107  1);
108 
109  if (!frontEnd.getFunctionality<pdb::CatalogServer>().addNodeMetadata(nodeData, errMsg)) {
110  std::cout << "Metadata for manager node was not added because " + errMsg << std::endl;
111  } else {
112  std::cout << "Metadata for manager node successfully added to catalog." << std::endl;
113  }
114 
115  // registers metadata for worker nodes in the catalog
116  makeObjectAllocatorBlock(4 * 1024 * 1024, true);
117 
118  for (int i = 0; i < allNodes->size(); i++) {
119  nodeName = "worker_" + std::to_string(numNodes);
120  hostName = (*allNodes)[i]->getAddress().c_str();
121  portValue = (*allNodes)[i]->getPort();
123  pdb::makeObject<pdb::CatalogNodeMetadata>(String(hostName + ":" + std::to_string(portValue)),
124  String(hostName),
125  portValue,
126  String(nodeName),
127  String("worker"),
128  1);
129  if (!frontEnd.getFunctionality<pdb::CatalogServer>().addNodeMetadata(workerNode, errMsg)) {
130  std::cout << "Metadata for worker node was not added because " + errMsg << std::endl;
131  } else {
132  std::cout << "Metadata for worker node successfully added to catalog. "
133  << hostName << " | " << std::to_string(portValue) << " | " << nodeName << " | "
134  << std::endl;
135  }
136  numNodes++;
137  }
138 
139  frontEnd.addFunctionality<pdb::DispatcherServer>(myLogger, statisticsDB);
140  frontEnd.getFunctionality<pdb::DispatcherServer>().registerStorageNodes(allNodes);
141 
142  frontEnd.addFunctionality<pdb::QuerySchedulerServer>(
143  port, myLogger, conf, statisticsDB, pseudoClusterMode, partitionToCoreRatio);
144  frontEnd.startServer(nullptr);
145 }
146 
147 #endif
Functionality & getFunctionality()
int main(int argc, char *argv[])
Definition: ManagerMain.cc:35
bool addNodeMetadata(Handle< CatalogNodeMetadata > &nodeMetadata, std::string &errMsg)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)