A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
WorkerMain.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef WORKER_MAIN_CC
20 #define WORKER_MAIN_CC
21 
22 #include "PDBServer.h"
23 #include "CatalogServer.h"
24 #include "CatalogClient.h"
25 #include "StorageClient.h"
26 #include "PangeaStorageServer.h"
28 #include "HermesExecutionServer.h"
29 
// Entry point of the worker process.
//
// Prints a usage banner, reads up to four optional positional arguments
// (thread count, shared-memory size in MB, manager address, local address),
// fills in a Configuration, creates the shared-memory object and then forks:
// the child process runs the backend pdb::PDBServer on an IPC file, the parent
// runs the frontend pdb::PDBServer on localPort.
//
// NOTE(review): this listing skips original source lines 123, 138, 151, 159 and
// 177, so the opening of several statements below is not visible here; the
// affected places are marked individually.
30 int main(int argc, char* argv[]) {
31 
32  std::cout << "Starting up a PDB server!!\n";
33  std::cout << "[Usage] #numThreads(optional) #sharedMemSize(optional, unit: MB) "
34  "#managerIp(optional) #localIp(optional)"
35  << std::endl;
36 
37  ConfigurationPtr conf = make_shared<Configuration>();
38 
    // Defaults used when the matching argument is absent: one thread, 12 GiB of
    // shared memory, standalone mode, and the address/port reported by conf.
39  int numThreads = 1;
40  size_t sharedMemSize = (size_t)12 * (size_t)1024 * (size_t)1024 * (size_t)1024;
41  bool standalone = true;
42  std::string managerIp;
43  std::string localIp = conf->getServerAddress();
44  int managerPort = conf->getPort();
45  int localPort = conf->getPort();
    // NOTE(review): only argc values 2..5 are handled; with more than four
    // arguments none of the branches below match and the defaults are kept
    // without any warning.
    // One argument: thread count only.
46  if (argc == 2) {
47  numThreads = atoi(argv[1]);
48  }
49 
    // Two arguments: thread count plus shared-memory size, given in MB and
    // converted to bytes here.
    // NOTE(review): atoi reports no errors, so a non-numeric argument is taken as 0.
50  if (argc == 3) {
51  numThreads = atoi(argv[1]);
52  sharedMemSize = (size_t)(atoi(argv[2])) * (size_t)1024 * (size_t)1024;
53  }
54 
    // Three arguments are rejected: the manager and local addresses must be
    // supplied together.
55  if (argc == 4) {
56  std::cout << "You must provide both managerIp and localIp" << std::endl;
57  exit(-1);
58  }
59 
    // Four arguments: leave standalone mode. Both addresses are accepted either
    // as "ip" or as "ip:port"; when no port is given, 8108 is used.
60  if (argc == 5) {
61  numThreads = atoi(argv[1]);
62  sharedMemSize = (size_t)(atoi(argv[2])) * (size_t)1024 * (size_t)1024;
63  standalone = false;
64  string managerAccess(argv[3]);
65  size_t pos = managerAccess.find(":");
66  if (pos != string::npos) {
    // NOTE(review): presumably std::stoi, which throws on a non-numeric port;
    // nothing here catches that.
67  managerPort = stoi(managerAccess.substr(pos + 1, managerAccess.size()));
68 
69  managerIp = managerAccess.substr(0, pos);
70  } else {
71  managerPort = 8108;
72  managerIp = managerAccess;
73  }
74  string workerAccess(argv[4]);
75  pos = workerAccess.find(":");
76  if (pos != string::npos) {
77  localPort = stoi(workerAccess.substr(pos + 1, workerAccess.size()));
78  localIp = workerAccess.substr(0, pos);
79  conf->setPort(localPort);
80  } else {
    // NOTE(review): unlike the branch above, conf->setPort is not called here,
    // so conf keeps whatever port it had -- confirm that this is intended.
81  localPort = 8108;
82  localIp = workerAccess;
83  }
84  }
85  conf->initDirs();
86 
87  std::cout << "Thread number =" << numThreads << std::endl;
88  std::cout << "Shared memory size =" << sharedMemSize << std::endl;
89 
90  if (standalone == true) {
91  std::cout << "We are now running in standalone mode" << std::endl;
92  } else {
    // Distributed mode: mark this node as a non-manager and record where the
    // manager can be reached.
93  std::cout << "We are now running in distribution mode" << std::endl;
94  std::cout << "Manager IP:" << managerIp << std::endl;
95  std::cout << "Manager Port:" << managerPort << std::endl;
96  conf->setIsManager(false);
97  conf->setManagerNodeHostName(managerIp);
98  conf->setManagerNodePort(managerPort);
99  std::cout << "Local IP:" << localIp << std::endl;
100  std::cout << "Local Port:" << localPort << std::endl;
101  }
    // Frontend log file name: frontend_<localIp>_<localPort>.log
102  std::string frontendLoggerFile = std::string("frontend_") + localIp + std::string("_") +
103  std::to_string(localPort) + std::string(".log");
104  pdb::PDBLoggerPtr logger = make_shared<pdb::PDBLogger>(frontendLoggerFile);
105  conf->setNumThreads(numThreads);
106  conf->setShmSize(sharedMemSize);
    // Created before fork(), so both the frontend and the backend branch below
    // use this same shm pointer.
107  SharedMemPtr shm = make_shared<SharedMem>(conf->getShmSize(), logger);
108 
    // IPC file path: /tmp/<localIp>_<localPort>; stored in conf and read back by
    // the backend branch through getBackEndIpcFile().
109  std::string ipcFile =
110  std::string("/tmp/") + localIp + std::string("_") + std::to_string(localPort);
111  std::cout << "ipcFile=" << ipcFile << std::endl;
112  conf->setBackEndIpcFile(ipcFile);
113 
114  string errMsg;
    // NOTE(review): if make_shared above is std::make_shared, shm can never be
    // null here (allocation failure throws instead) -- confirm whether this
    // check is still meaningful.
115  if (shm != nullptr) {
116  pid_t child_pid = fork();
117  if (child_pid == 0) {
118  // I'm the backend server
119  std::string backendLoggerFile = std::string("backend_") + localIp + std::string("_") +
120  std::to_string(localPort) + std::string(".log");
    // This logger shadows the frontend logger declared earlier in main.
121  pdb::PDBLoggerPtr logger = make_shared<pdb::PDBLogger>(backendLoggerFile);
122  pdb::PDBServer backEnd(conf->getBackEndIpcFile(), 100, logger);
    // NOTE(review): source line 123 is missing from this listing; the next line
    // is the argument list of a call whose opening is not shown.
124  shm, backEnd.getWorkerQueue(), logger, conf);
125  bool usePangea = true;
126  std::string clientLoggerFile = std::string("client_") + localIp + std::string("_") +
127  std::to_string(localPort) + std::string(".log");
    // The storage client is registered with localPort and "localhost" and gets
    // its own log file, client_<localIp>_<localPort>.log.
128  backEnd.addFunctionality<pdb::StorageClient>(
129  localPort, "localhost", make_shared<pdb::PDBLogger>(clientLoggerFile), usePangea);
130  backEnd.startServer(nullptr);
131 
132  } else if (child_pid == -1) {
    // fork() failed: the failure is only reported; control then reaches the end
    // of main, which returns 0 implicitly.
133  std::cout << "Fatal Error: fork failed." << std::endl;
134  } else {
135  // I'm the frontend server
136  pdb::PDBServer frontEnd(localPort, 100, logger);
137  // frontEnd.addFunctionality<pdb :: PipelineDummyTestServer>();
    // NOTE(review): source line 138 is missing from this listing; the next line
    // is the argument list of a call whose opening is not shown.
139  shm, frontEnd.getWorkerQueue(), logger, conf, standalone);
140  frontEnd.getFunctionality<pdb::PangeaStorageServer>().startFlushConsumerThreads();
    // createSet ends up with the same value as standalone.
141  bool createSet = true;
142  if (standalone == false) {
143  createSet = false;
144  }
145  frontEnd.addFunctionality<pdb::FrontendQueryTestServer>(standalone, createSet);
146  if (standalone == true) {
    // Standalone: this process registers itself in the catalog under the name
    // "standalone" with node type "manager".
147  string nodeName = "standalone";
148  string nodeType = "manager";
149 
150  pdb::UseTemporaryAllocationBlock tempBlock{1024 * 1024};
    // NOTE(review): source line 151 is missing from this listing; presumably it
    // declares nodeData, which is passed to addNodeMetadata below.
152  pdb::makeObject<pdb::CatalogNodeMetadata>(
153  String("localhost:" + std::to_string(localPort)),
154  String("localhost"),
155  localPort,
156  String(nodeName),
157  String(nodeType),
158  1);
    // NOTE(review): source line 159 is missing from this listing; the next line
    // is the argument list of a call whose opening is not shown.
160  "CatalogDir", true, "localhost", localPort);
161  frontEnd.addFunctionality<pdb::CatalogClient>(localPort, "localhost", logger);
162  std::cout << "to register node metadata in catalog..." << std::endl;
    // A failed registration is reported on stdout only; the server is started
    // afterwards either way.
163  if (!frontEnd.getFunctionality<pdb::CatalogServer>().addNodeMetadata(nodeData,
164  errMsg)) {
165  std::cout << "Not able to register node metadata: " + errMsg << std::endl;
166  std::cout
167  << "Please change the parameters: nodeIP, port, nodeName, nodeType, status."
168  << std::endl;
169  } else {
170  std::cout << "Node metadata successfully added.\n";
171  }
172 
173  } else {
174 
    // Distributed: the catalog name is CatalogDir_<localIp>_<localPort>.
175  std::string catalogFile = std::string("CatalogDir_") + localIp + std::string("_") +
176  std::to_string(localPort);
    // NOTE(review): source line 177 is missing from this listing; the next line
    // is the argument list of a call whose opening is not shown. It receives the
    // manager address rather than "localhost".
178  catalogFile, false, managerIp, managerPort);
179  frontEnd.addFunctionality<pdb::CatalogClient>(localPort, "localhost", logger);
180  }
181 
182  frontEnd.startServer(nullptr);
183  }
184  }
185 }
186 
187 #endif
int main(int argc, char *argv[])
Definition: WorkerMain.cc:30
void startServer(PDBWorkPtr runMeAtStart)
Definition: PDBServer.cc:348
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
bool addNodeMetadata(Handle< CatalogNodeMetadata > &nodeMetadata, std::string &errMsg)
PDBWorkerQueuePtr getWorkerQueue()
Definition: PDBServer.cc:228
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
Functionality & getFunctionality()
void addFunctionality(Args &&...args)