24 #ifndef PDB_DISTRIBUTION_MANAGER_CC
25 #define PDB_DISTRIBUTION_MANAGER_CC
42 #include <sys/socket.h>
43 #include <sys/types.h>
48 #include <uuid/uuid.h>
56 PDBDistributionManager::PDBDistributionManager() {
57 pthread_mutex_init(&this->writeLock,
nullptr);
58 this->heartBeatCounter = 0;
61 PDBDistributionManager::~PDBDistributionManager() {}
63 bool PDBDistributionManager::addOrUpdateNodes(
PDBLoggerPtr myLoggerIn,
string& nodeID) {
65 std::chrono::time_point<std::chrono::system_clock> p;
66 p = std::chrono::system_clock::now();
68 std::chrono::duration_cast<std::chrono::nanoseconds>(p.time_since_epoch()).count();
72 pthread_mutex_lock(&this->writeLock);
73 this->heartBeatCounter++;
75 if (this->heartBeatCounter >= 20) {
78 for (
auto myPair = nodesOfCluster.begin(); myPair != nodesOfCluster.end();) {
81 if ((timeCounter - myPair->second) > 10000000000) {
82 string m_nodehostname = myPair->first;
83 myLoggerIn->debug(
"PDBDistributionManager: Not responding node to remove " +
84 m_nodehostname +
" No. of Nodes in Cluster " +
85 to_string(nodesOfCluster.size()));
86 myPair = nodesOfCluster.erase(myPair);
90 this->heartBeatCounter = 0;
93 pthread_mutex_unlock(&this->writeLock);
95 if (this->nodesOfCluster.count(nodeID) == 0) {
97 pthread_mutex_lock(&this->writeLock);
98 this->nodesOfCluster[nodeID] = timeCounter;
99 pthread_mutex_unlock(&this->writeLock);
101 myLoggerIn->trace(
"PDBDistributionManager: Node with ID " + nodeID +
102 " added. No. of Nodes in Cluster " + to_string(nodesOfCluster.size()));
107 pthread_mutex_lock(&this->writeLock);
108 this->nodesOfCluster[nodeID] = timeCounter;
109 pthread_mutex_unlock(&this->writeLock);
111 myLoggerIn->trace(
"PDBDistributionManager: Node with ID " + nodeID +
112 " updated. No. of Nodes in Cluster " +
113 to_string(nodesOfCluster.size()));
123 string PDBDistributionManager::getPermitToRunQuery(
PDBLoggerPtr myLoggerIn) {
133 uuid_unparse_lower(uuid, uuid_str);
136 std::chrono::time_point<std::chrono::system_clock> p;
137 p = std::chrono::system_clock::now();
139 std::chrono::duration_cast<std::chrono::nanoseconds>(p.time_since_epoch()).count();
141 string uuidString = uuid_str;
144 runningQueries[uuidString] = timeCounter;
146 myLoggerIn->trace(
"PDBDistributionManager: permitted running a new query with GUID " +
147 uuidString +
" at time " + to_string(timeCounter) +
148 " No. of Queries: " + to_string(runningQueries.size()));
157 int PDBDistributionManager::queryIsDone(
string& queryID,
PDBLoggerPtr logToMe) {
159 if (runningQueries.find(queryID) != runningQueries.end()) {
160 map<string, long>::iterator tmp = runningQueries.find(queryID);
162 runningQueries.erase(tmp);
163 logToMe->trace(
"PDBDistributionManager: running query with " + queryID +
164 " is done and erased from memory." +
165 " No. of Queries: " + to_string(runningQueries.size()));
168 logToMe->error(
"PDBDistributionManager: running query with " + queryID +
169 " could not be found." +
170 " No. of Queries: " + to_string(runningQueries.size()));
std::shared_ptr< PDBLogger > PDBLoggerPtr