A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ResourceManagerServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef RESOURCE_MANAGER_SERVER_CC
19 #define RESOURCE_MANAGER_SERVER_CC
20 
21 #include "PDBDebug.h"
22 #include "ResourceManagerServer.h"
23 #include "InterfaceFunctions.h"
24 #include "SimpleRequestHandler.h"
25 #include "RequestResources.h"
26 #include "DataTypes.h"
27 #include <stdlib.h>
28 #include <regex>
29 #include <iostream>
30 #include <fstream>
31 #include <sstream>
32 #include <arpa/inet.h>
33 
34 
35 namespace pdb {
36 
37 
39 
40 
42 
43  this->resources = nullptr;
44  this->nodes = nullptr;
45 }
46 
47 
48 ResourceManagerServer::ResourceManagerServer(std::string pathToServerList,
49  int port,
50  bool pseudoClusterMode,
51  std::string pemFile) {
52  this->port = port;
53  this->pseudoClusterMode = pseudoClusterMode;
54  this->pemFile = pemFile;
55  this->initialize(pathToServerList);
56 }
57 
58 
60  return resources;
61 }
62 
64  return nodes;
65 }
66 
67 
68 void ResourceManagerServer::initialize(std::string pathToServerList) {
69 
70  // Allocate a Vector
71  int maxNodeNum = 1000;
72  makeObjectAllocatorBlock(2 * 1024 * 1024, true);
73  this->resources = makeObject<Vector<Handle<ResourceInfo>>>(maxNodeNum);
74  this->nodes = makeObject<Vector<Handle<NodeDispatcherData>>>(maxNodeNum);
75  analyzeNodes(pathToServerList);
76  if (pseudoClusterMode == false) {
77  // to run a script to obtain all system resources
78  std::string command = std::string("scripts/internal/collect_proc.sh ") + this->pemFile;
79  PDB_COUT << command << std::endl;
80  int ret = system(command.c_str());
81  if (ret < 0) {
82  std::cout << "Resource manager: failed to collect cluster information, try to use the "
83  "default one"
84  << std::endl;
85  }
86  analyzeResources("conf/cluster/cluster_info.txt");
87  }
88 }
89 
90 void ResourceManagerServer::analyzeNodes(std::string serverlist) {
91  PDB_COUT << serverlist << std::endl;
92  std::string inputLine;
93  std::string address;
94  int port, sfd;
95  bool connectSuccess = true;
96  NodeID nodeId = 0;
97  std::ifstream nodeFile(serverlist);
98  if (nodeFile.is_open()) {
99  while (!nodeFile.eof()) {
100  std::getline(nodeFile, inputLine);
101  size_t pos = inputLine.find("#");
102  // processes only valid entries, skips commented and empty lines
103  if (inputLine.length() != 0 && pos == string::npos) {
104  pos = inputLine.find(":");
105  if (pos != string::npos) {
106  port = stoi(inputLine.substr(pos + 1, inputLine.size()));
107  address = inputLine.substr(0, pos);
108  } else {
109  // TODO: we should not hardcode 8108
110  port = 8108;
111  address = inputLine;
112  }
113 
114  struct addrinfo hints;
115  struct addrinfo *result, *rp;
116  char portValue[10];
117  sprintf(portValue, "%d", 22);
118 
119  memset(&hints, 0, sizeof(struct addrinfo));
120  hints.ai_family = AF_INET;
121  hints.ai_socktype = SOCK_STREAM;
122  hints.ai_flags = 0;
123  hints.ai_protocol = 0;
124 
125  int s = getaddrinfo(address.c_str(), portValue, &hints, &result);
126  if (s != 0) {
127  continue;
128  } else {
129  for (rp = result; rp != NULL; rp = rp->ai_next) {
130  int count = 0;
131  while (count <= 3) {
132  sfd = socket(result->ai_family, result->ai_socktype,
133  result->ai_protocol);
134  if (sfd == -1) {
135  continue;
136  }
137  int co = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
138  if (co != -1) {
139  connectSuccess = true;
140  const UseTemporaryAllocationBlock block(1024);
141  Handle<NodeDispatcherData> node = makeObject<NodeDispatcherData>(nodeId, port, address);
142  this->nodes->push_back(node);
143  nodeId++;
144  break;
145  } else {
146  connectSuccess = false;
147  continue;
148  }
149  count++;
150  close(sfd);
151  } // while
152  if (connectSuccess == true) {
153  break;
154  }
155  } // for
156  } // else
157  }
158  }
159  if (nodes->size() == 0) {
160  PDB_COUT << "No workers in the cluster, stopping" << std::endl;
161  exit(-1);
162  }
163  nodeFile.close();
164  } else {
165  std::cout << "file can't be open" << std::endl;
166  }
167 }
168 
169 
170 void ResourceManagerServer::analyzeResources(std::string resourceFileName) {
171 
172  // to analyze and obtain resources
173  std::string line;
174  int numCores;
175  int memSize; // in MB
176  int numServers = 0;
177  std::ifstream resourceFile(resourceFileName);
178  if (resourceFile.is_open()) {
179  while (!resourceFile.eof()) {
180 
181  std::getline(resourceFile, line);
182  if (line.find("CPUNumber") != string::npos) {
183  // get the number of cores
184  PDB_COUT << line << std::endl;
185  const std::regex r("[0-9]+");
186  numCores = 0;
187  std::sregex_iterator N(line.begin(), line.end(), r);
188  std::stringstream SS(*N->begin());
189  numCores = 0;
190  SS >> numCores;
191  PDB_COUT << "numCores =" << numCores << std::endl;
192  }
193 
194  if (line.find("MemTotal") != string::npos) {
195  PDB_COUT << line << std::endl;
196  // get the size of memory in MB
197  const std::regex r("[0-9]+");
198  memSize = 0;
199  std::sregex_iterator N(line.begin(), line.end(), r);
200  std::stringstream SS(*N->begin());
201  SS >> memSize;
202  PDB_COUT << "memSize =" << memSize << std::endl;
203  Handle<ResourceInfo> resource = makeObject<ResourceInfo>(
204  numCores, memSize, (*this->nodes)[numServers]->getAddress(), port, numServers);
205  PDB_COUT << numServers << ": address=" << resource->getAddress()
206  << ", numCores=" << resource->getNumCores()
207  << ", memSize=" << resource->getMemSize() << std::endl;
208  this->resources->push_back(resource);
209  numServers++;
210  }
211  }
212 
213  resourceFile.close();
214 
215  } else {
216 
217  std::cout << resourceFileName << "can't be open." << std::endl;
218  }
219 }
220 
221 
223  // Now we use ResourceManager through getFunctionality() at Scheduler and Dispatcher
224 }
225 }
226 
227 #endif
unsigned int NodeID
Definition: DataTypes.h:27
ResourceManagerServer(std::string pathToServerList, int port, bool pseudoClusterMode=false, std::string pemFile="conf/pdb.key")
Handle< Vector< Handle< ResourceInfo > > > resources
void analyzeResources(std::string resourceFileName)
Handle< Vector< Handle< ResourceInfo > > > getAllResources()
#define PDB_COUT
Definition: PDBDebug.h:31
void analyzeNodes(std::string serverList)
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
void initialize(std::string pathToServerList)
void registerHandlers(PDBServer &forMe) override
Handle< Vector< Handle< NodeDispatcherData > > > nodes
Handle< Vector< Handle< NodeDispatcherData > > > getAllNodes()