19 #ifndef WORKER_MAIN_CC
20 #define WORKER_MAIN_CC
30 int main(
int argc,
char* argv[]) {
32 std::cout <<
"Starting up a PDB server!!\n";
33 std::cout <<
"[Usage] #numThreads(optional) #sharedMemSize(optional, unit: MB) "
34 "#managerIp(optional) #localIp(optional)"
40 size_t sharedMemSize = (size_t)12 * (
size_t)1024 * (size_t)1024 * (
size_t)1024;
41 bool standalone =
true;
42 std::string managerIp;
43 std::string localIp = conf->getServerAddress();
44 int managerPort = conf->getPort();
45 int localPort = conf->getPort();
47 numThreads = atoi(argv[1]);
51 numThreads = atoi(argv[1]);
52 sharedMemSize = (size_t)(atoi(argv[2])) * (
size_t)1024 * (size_t)1024;
56 std::cout <<
"You must provide both managerIp and localIp" << std::endl;
61 numThreads = atoi(argv[1]);
62 sharedMemSize = (size_t)(atoi(argv[2])) * (
size_t)1024 * (size_t)1024;
64 string managerAccess(argv[3]);
65 size_t pos = managerAccess.find(
":");
66 if (pos != string::npos) {
67 managerPort = stoi(managerAccess.substr(pos + 1, managerAccess.size()));
69 managerIp = managerAccess.substr(0, pos);
72 managerIp = managerAccess;
74 string workerAccess(argv[4]);
75 pos = workerAccess.find(
":");
76 if (pos != string::npos) {
77 localPort = stoi(workerAccess.substr(pos + 1, workerAccess.size()));
78 localIp = workerAccess.substr(0, pos);
79 conf->setPort(localPort);
82 localIp = workerAccess;
87 std::cout <<
"Thread number =" << numThreads << std::endl;
88 std::cout <<
"Shared memory size =" << sharedMemSize << std::endl;
90 if (standalone ==
true) {
91 std::cout <<
"We are now running in standalone mode" << std::endl;
93 std::cout <<
"We are now running in distribution mode" << std::endl;
94 std::cout <<
"Manager IP:" << managerIp << std::endl;
95 std::cout <<
"Manager Port:" << managerPort << std::endl;
96 conf->setIsManager(
false);
97 conf->setManagerNodeHostName(managerIp);
98 conf->setManagerNodePort(managerPort);
99 std::cout <<
"Local IP:" << localIp << std::endl;
100 std::cout <<
"Local Port:" << localPort << std::endl;
102 std::string frontendLoggerFile = std::string(
"frontend_") + localIp + std::string(
"_") +
103 std::to_string(localPort) + std::string(
".log");
105 conf->setNumThreads(numThreads);
106 conf->setShmSize(sharedMemSize);
107 SharedMemPtr shm = make_shared<SharedMem>(conf->getShmSize(), logger);
109 std::string ipcFile =
110 std::string(
"/tmp/") + localIp + std::string(
"_") + std::to_string(localPort);
111 std::cout <<
"ipcFile=" << ipcFile << std::endl;
112 conf->setBackEndIpcFile(ipcFile);
115 if (shm !=
nullptr) {
116 pid_t child_pid = fork();
117 if (child_pid == 0) {
119 std::string backendLoggerFile = std::string(
"backend_") + localIp + std::string(
"_") +
120 std::to_string(localPort) + std::string(
".log");
124 shm, backEnd.getWorkerQueue(), logger, conf);
125 bool usePangea =
true;
126 std::string clientLoggerFile = std::string(
"client_") + localIp + std::string(
"_") +
127 std::to_string(localPort) + std::string(
".log");
129 localPort,
"localhost", make_shared<pdb::PDBLogger>(clientLoggerFile), usePangea);
130 backEnd.startServer(
nullptr);
132 }
else if (child_pid == -1) {
133 std::cout <<
"Fatal Error: fork failed." << std::endl;
141 bool createSet =
true;
142 if (standalone ==
false) {
146 if (standalone ==
true) {
147 string nodeName =
"standalone";
148 string nodeType =
"manager";
152 pdb::makeObject<pdb::CatalogNodeMetadata>(
153 String(
"localhost:" + std::to_string(localPort)),
160 "CatalogDir",
true,
"localhost", localPort);
162 std::cout <<
"to register node metadata in catalog..." << std::endl;
165 std::cout <<
"Not able to register node metadata: " + errMsg << std::endl;
167 <<
"Please change the parameters: nodeIP, port, nodeName, nodeType, status."
170 std::cout <<
"Node metadata successfully added.\n";
175 std::string catalogFile = std::string(
"CatalogDir_") + localIp + std::string(
"_") +
176 std::to_string(localPort);
178 catalogFile,
false, managerIp, managerPort);
int main(int argc, char *argv[])
void startServer(PDBWorkPtr runMeAtStart)
shared_ptr< SharedMem > SharedMemPtr
bool addNodeMetadata(Handle< CatalogNodeMetadata > &nodeMetadata, std::string &errMsg)
PDBWorkerQueuePtr getWorkerQueue()
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
Functionality & getFunctionality()
void addFunctionality(Args &&...args)