A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PDBDistributionManager.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 /*
19  * PDBDistributionManager.cc
20  *
21  * Created on: Nov 30, 2015
22  * Author: kia
23  */
24 #ifndef PDB_DISTRIBUTION_MANAGER_CC
25 #define PDB_DISTRIBUTION_MANAGER_CC
26 
27 #include "PDBDistributionManager.h"
28 
29 #include <stdio.h>
30 #include <time.h>
31 
32 #include "PDBLogger.h"
33 
34 #include <iostream>
35 #include <vector>
36 #include <string>
37 #include <memory>
38 #include <pthread.h>
39 
40 #include <signal.h>
41 #include <stdio.h>
42 #include <sys/socket.h>
43 #include <sys/types.h>
44 #include <map>
45 #include <stdio.h>
46 #include <time.h>
47 #include <chrono>
48 #include <uuid/uuid.h>
49 
51 
52 using namespace std;
53 
54 namespace pdb {
55 
56 PDBDistributionManager::PDBDistributionManager() {
57  pthread_mutex_init(&this->writeLock, nullptr);
58  this->heartBeatCounter = 0;
59 }
60 
61 PDBDistributionManager::~PDBDistributionManager() {}
62 
63 bool PDBDistributionManager::addOrUpdateNodes(PDBLoggerPtr myLoggerIn, string& nodeID) {
64 
65  std::chrono::time_point<std::chrono::system_clock> p;
66  p = std::chrono::system_clock::now();
67  long timeCounter =
68  std::chrono::duration_cast<std::chrono::nanoseconds>(p.time_since_epoch()).count();
69 
70  // Lock for write
71 
72  pthread_mutex_lock(&this->writeLock);
73  this->heartBeatCounter++;
74  // if we have 20 heart beat messages from any servers then we activate the cleaning process.
75  if (this->heartBeatCounter >= 20) {
76 
77  // iterate over the list of nodes and remove old ones.
78  for (auto myPair = nodesOfCluster.begin(); myPair != nodesOfCluster.end();) {
79  // if a node does not send heart beat messages then remove it from the list. If we don't
80  // get heart beat more than 10sec.
81  if ((timeCounter - myPair->second) > 10000000000) {
82  string m_nodehostname = myPair->first;
83  myLoggerIn->debug("PDBDistributionManager: Not responding node to remove " +
84  m_nodehostname + " No. of Nodes in Cluster " +
85  to_string(nodesOfCluster.size()));
86  myPair = nodesOfCluster.erase(myPair);
87  } else
88  ++myPair;
89  }
90  this->heartBeatCounter = 0;
91  }
92  // unlock
93  pthread_mutex_unlock(&this->writeLock);
94 
95  if (this->nodesOfCluster.count(nodeID) == 0) {
96  // insert the nodeID and timestamp
97  pthread_mutex_lock(&this->writeLock);
98  this->nodesOfCluster[nodeID] = timeCounter;
99  pthread_mutex_unlock(&this->writeLock);
100 
101  myLoggerIn->trace("PDBDistributionManager: Node with ID " + nodeID +
102  " added. No. of Nodes in Cluster " + to_string(nodesOfCluster.size()));
103  return false;
104  } else {
105 
106  // insert the nodeID and timestamp
107  pthread_mutex_lock(&this->writeLock);
108  this->nodesOfCluster[nodeID] = timeCounter;
109  pthread_mutex_unlock(&this->writeLock);
110 
111  myLoggerIn->trace("PDBDistributionManager: Node with ID " + nodeID +
112  " updated. No. of Nodes in Cluster " +
113  to_string(nodesOfCluster.size()));
114  return true;
115  }
116 }
117 
123 string PDBDistributionManager::getPermitToRunQuery(PDBLoggerPtr myLoggerIn) {
124 
125  // TODO: we need to know when to admit permission to run queries and when not.
126 
127  uuid_t uuid;
128 
129  // generate
130  uuid_generate(uuid);
131 
132  char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
133  uuid_unparse_lower(uuid, uuid_str);
134 
135  // check the time now and store the time of permitting the query.
136  std::chrono::time_point<std::chrono::system_clock> p;
137  p = std::chrono::system_clock::now();
138  long timeCounter =
139  std::chrono::duration_cast<std::chrono::nanoseconds>(p.time_since_epoch()).count();
140 
141  string uuidString = uuid_str;
142 
143  // store time of running this query.
144  runningQueries[uuidString] = timeCounter;
145 
146  myLoggerIn->trace("PDBDistributionManager: permitted running a new query with GUID " +
147  uuidString + " at time " + to_string(timeCounter) +
148  " No. of Queries: " + to_string(runningQueries.size()));
149 
150  return uuid_str;
151 }
152 
157 int PDBDistributionManager::queryIsDone(string& queryID, PDBLoggerPtr logToMe) {
158 
159  if (runningQueries.find(queryID) != runningQueries.end()) {
160  map<string, long>::iterator tmp = runningQueries.find(queryID);
161  // erase it from the map
162  runningQueries.erase(tmp);
163  logToMe->trace("PDBDistributionManager: running query with " + queryID +
164  " is done and erased from memory." +
165  " No. of Queries: " + to_string(runningQueries.size()));
166  return 1;
167  } else {
168  logToMe->error("PDBDistributionManager: running query with " + queryID +
169  " could not be found." +
170  " No. of Queries: " + to_string(runningQueries.size()));
171  return 0;
172  }
173 }
174 }
175 #endif
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40