A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
RoundRobinPolicy.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef OBJECTQUERYMODEL_ROUNDROBINPOLICY_CC
19 #define OBJECTQUERYMODEL_ROUNDROBINPOLICY_CC
20 
21 #include "PDBDebug.h"
22 #include "RoundRobinPolicy.h"
23 
24 namespace pdb {
25 
27  this->storageNodes = std::vector<NodePartitionDataPtr>();
28  pthread_mutex_init(&idMutex, nullptr);
29 }
30 
32 
// Definition of the static cursor into storageNodes. partition() advances it by one
// (modulo the node count) while holding idMutex, then uses it to pick the target node.
33 unsigned int RoundRobinPolicy::curNodeId = 0;
34 
36  Handle<Vector<Handle<NodeDispatcherData>>> activeStorageNodesRaw) {
37 
38  auto oldNodes = storageNodes;
39  auto activeStorageNodes = createNodePartitionData(activeStorageNodesRaw);
40  storageNodes = std::vector<NodePartitionDataPtr>();
41 
42  for (int i = 0; i < activeStorageNodes.size(); i++) {
43  bool alreadyContains = false;
44  for (int j = 0; j < oldNodes.size(); j++) {
45  if ((*activeStorageNodes[i]) == (*oldNodes[j])) {
46  // Update the pre-existing node with the new information
47  auto updatedNode = updateExistingNode(activeStorageNodes[i], oldNodes[j]);
48  storageNodes.push_back(updatedNode);
49  oldNodes.erase(oldNodes.begin() + j);
50  alreadyContains = true;
51  break;
52  }
53  }
54  if (!alreadyContains) {
55  storageNodes.push_back(updateNewNode(activeStorageNodes[i]));
56  }
57  }
58  for (auto oldNode : oldNodes) {
59  handleDeadNode(oldNode);
60  }
61  this->numNodes = storageNodes.size();
62 }
63 
64 std::vector<NodePartitionDataPtr> RoundRobinPolicy::createNodePartitionData(
65  Handle<Vector<Handle<NodeDispatcherData>>> storageNodes) {
66  std::vector<NodePartitionDataPtr> newData = std::vector<NodePartitionDataPtr>();
67  for (int i = 0; i < storageNodes->size(); i++) {
68  auto nodeData = (*storageNodes)[i];
69  auto newNode =
70  std::make_shared<NodePartitionData>(nodeData->getNodeId(),
71  nodeData->getPort(),
72  nodeData->getAddress(),
73  std::pair<std::string, std::string>("", ""));
74  PDB_COUT << newNode->toString() << std::endl;
75  newData.push_back(newNode);
76  }
77  return newData;
78 }
79 
81  NodePartitionDataPtr oldNode) {
82  PDB_COUT << "Updating existing node " << newNode->toString() << std::endl;
83  return oldNode;
84 }
85 
87  PDB_COUT << "Updating new node " << newNode->toString() << std::endl;
88  return newNode;
89 }
90 
92  PDB_COUT << "Deleting node " << deadNode->toString() << std::endl;
93  return deadNode;
94 }
95 
96 std::shared_ptr<std::unordered_map<NodeID, Handle<Vector<Handle<Object>>>>>
98 
99  auto partitionedData =
100  std::make_shared<std::unordered_map<NodeID, Handle<Vector<Handle<Object>>>>>();
101  if (storageNodes.size() == 0) {
102  std::cout
103  << "FATAL ERROR: there is no storage node in the cluster, please check conf/serverlist"
104  << std::endl;
105  exit(-1);
106  }
107  pthread_mutex_lock(&idMutex);
108  curNodeId = (curNodeId + 1) % numNodes;
109  auto nodeToUse = storageNodes[curNodeId];
110  pthread_mutex_unlock(&idMutex);
111  partitionedData->insert(
112  std::pair<NodeID, Handle<Vector<Handle<Object>>>>(nodeToUse->getNodeId(), toPartition));
113  return partitionedData;
114 }
115 }
116 
117 #endif
std::shared_ptr< std::unordered_map< NodeID, Handle< Vector< Handle< Object > > > > > partition(Handle< Vector< Handle< Object >>> toPartition)
std::vector< NodePartitionDataPtr > createNodePartitionData(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
unsigned int NodeID
Definition: DataTypes.h:27
static unsigned int curNodeId
std::shared_ptr< NodePartitionData > NodePartitionDataPtr
void updateStorageNodes(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
NodePartitionDataPtr updateNewNode(NodePartitionDataPtr newNode)
std::vector< NodePartitionDataPtr > storageNodes
#define PDB_COUT
Definition: PDBDebug.h:31
NodePartitionDataPtr handleDeadNode(NodePartitionDataPtr deadNode)
pthread_mutex_t idMutex
NodePartitionDataPtr updateExistingNode(NodePartitionDataPtr newNodeData, NodePartitionDataPtr oldNodeData)