A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
RandomPolicy.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef OBJECTQUERYMODEL_RANDOMPOLICY_CC
19 #define OBJECTQUERYMODEL_RANDOMPOLICY_CC
20 
21 #include "PDBDebug.h"
22 #include "RandomPolicy.h"
23 
24 namespace pdb {
25 
27  this->storageNodes = std::vector<NodePartitionDataPtr>();
28  srand(SEED);
29 }
30 
32 
34  Handle<Vector<Handle<NodeDispatcherData>>> activeStorageNodesRaw) {
35 
36  auto oldNodes = storageNodes;
37  auto activeStorageNodes = createNodePartitionData(activeStorageNodesRaw);
38  storageNodes = std::vector<NodePartitionDataPtr>();
39 
40  for (int i = 0; i < activeStorageNodes.size(); i++) {
41  bool alreadyContains = false;
42  for (int j = 0; j < oldNodes.size(); j++) {
43  if ((*activeStorageNodes[i]) == (*oldNodes[j])) {
44  // Update the pre-existing node with the new information
45  auto updatedNode = updateExistingNode(activeStorageNodes[i], oldNodes[j]);
46  storageNodes.push_back(updatedNode);
47  oldNodes.erase(oldNodes.begin() + j);
48  alreadyContains = true;
49  break;
50  }
51  }
52  if (!alreadyContains) {
53  storageNodes.push_back(updateNewNode(activeStorageNodes[i]));
54  }
55  }
56  for (auto oldNode : oldNodes) {
57  handleDeadNode(oldNode);
58  }
59 }
60 
61 std::vector<NodePartitionDataPtr> RandomPolicy::createNodePartitionData(
62  Handle<Vector<Handle<NodeDispatcherData>>> storageNodes) {
63  std::vector<NodePartitionDataPtr> newData = std::vector<NodePartitionDataPtr>();
64  for (int i = 0; i < storageNodes->size(); i++) {
65  auto nodeData = (*storageNodes)[i];
66  auto newNode =
67  std::make_shared<NodePartitionData>(nodeData->getNodeId(),
68  nodeData->getPort(),
69  nodeData->getAddress(),
70  std::pair<std::string, std::string>("", ""));
71  PDB_COUT << newNode->toString() << std::endl;
72  newData.push_back(newNode);
73  }
74  return newData;
75 }
76 
78  NodePartitionDataPtr oldNode) {
79  PDB_COUT << "Updating existing node " << newNode->toString() << std::endl;
80  return oldNode;
81 }
82 
84  PDB_COUT << "Updating new node " << newNode->toString() << std::endl;
85  return newNode;
86 }
87 
89  PDB_COUT << "Deleting node " << deadNode->toString() << std::endl;
90  return deadNode;
91 }
92 
93 std::shared_ptr<std::unordered_map<NodeID, Handle<Vector<Handle<Object>>>>> RandomPolicy::partition(
94  Handle<Vector<Handle<Object>>> toPartition) {
95 
96  auto partitionedData =
97  std::make_shared<std::unordered_map<NodeID, Handle<Vector<Handle<Object>>>>>();
98  if (storageNodes.size() == 0) {
99  std::cout
100  << "FATAL ERROR: there is no storage node in the cluster, please check conf/serverlist"
101  << std::endl;
102  exit(-1);
103  }
104  int indexOfNodeToUse = rand() % storageNodes.size();
105  auto nodeToUse = storageNodes[indexOfNodeToUse];
106  partitionedData->insert(
107  std::pair<NodeID, Handle<Vector<Handle<Object>>>>(nodeToUse->getNodeId(), toPartition));
108  return partitionedData;
109 }
110 }
111 
112 #endif
NodePartitionDataPtr handleDeadNode(NodePartitionDataPtr deadNode)
Definition: RandomPolicy.cc:88
unsigned int NodeID
Definition: DataTypes.h:27
void updateStorageNodes(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
Definition: RandomPolicy.cc:33
std::shared_ptr< NodePartitionData > NodePartitionDataPtr
NodePartitionDataPtr updateNewNode(NodePartitionDataPtr newNode)
Definition: RandomPolicy.cc:83
NodePartitionDataPtr updateExistingNode(NodePartitionDataPtr newNodeData, NodePartitionDataPtr oldNodeData)
Definition: RandomPolicy.cc:77
std::vector< NodePartitionDataPtr > storageNodes
#define PDB_COUT
Definition: PDBDebug.h:31
std::vector< NodePartitionDataPtr > createNodePartitionData(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
Definition: RandomPolicy.cc:61
std::shared_ptr< std::unordered_map< NodeID, Handle< Vector< Handle< Object > > > > > partition(Handle< Vector< Handle< Object >>> toPartition)
Definition: RandomPolicy.cc:93