A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DispatcherServer.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef OBJECTQUERYMODEL_DISPATCHER_H
20 #define OBJECTQUERYMODEL_DISPATCHER_H
21 
22 #include "ServerFunctionality.h"
23 #include "PDBLogger.h"
24 #include "PDBWork.h"
25 #include "PartitionPolicy.h"
27 #include "PDBVector.h"
28 
29 #include "NodeDispatcherData.h"
30 #include "StorageClient.h"
31 #include "StatisticsDB.h"
32 
33 #include <string>
34 #include <queue>
35 #include <unordered_map>
36 #include <vector>
37 
38 
39 
40 namespace pdb {
41 
42 // The DispatcherServer partitions and then forwards a Vector of pdb::Objects received from a
43 // DispatcherClient to the proper storage servers.
44 // So far, two dispatching policies are supported:
45 // -- Random Policy: each received Vector is sent to a randomly chosen storage node
46 // -- Round-Robin Policy: the first received Vector is sent to the first storage node,
47 // and so on.
48 
49 
51 
52 public:
53  DispatcherServer(PDBLoggerPtr logger, std::shared_ptr<StatisticsDB> statisticsDB);
54 
56 
57  void initialize();
58 
63  void registerHandlers(PDBServer& forMe) override;
64 
71 
78  void registerSet(std::pair<std::string, std::string> setAndDatabase,
79  PartitionPolicyPtr partitionPolicy);
80 
89  bool dispatchData(std::pair<std::string, std::string> setAndDatabase,
90  std::string type,
91  Handle<Vector<Handle<Object>>> toDispatch);
92  bool dispatchBytes(std::pair<std::string, std::string> setAndDatabase,
93  std::string type,
94  char* bytes,
95  size_t numBytes);
96 
97 
99  pthread_mutex_lock(&mutex);
100  while (numRequestsInProcessing > 0) {
101  pthread_mutex_unlock(&mutex);
102  sleep(1);
103  pthread_mutex_lock(&mutex);
104  }
105  pthread_mutex_unlock(&mutex);
106  }
107 
108 private:
110  std::shared_ptr<StatisticsDB> statisticsDB;
112  std::map<std::pair<std::string, std::string>, PartitionPolicyPtr> partitionPolicies;
113 
118  bool validateTypes(const std::string& databaseName,
119  const std::string& setName,
120  const std::string& typeName,
121  std::string& errMsg);
122 
123  bool sendData(std::pair<std::string, std::string> setAndDatabase,
124  std::string type,
125  Handle<NodeDispatcherData> destination,
126  Handle<Vector<Handle<Object>>> toSend);
127 
128  bool sendBytes(std::pair<std::string, std::string> setAndDatabase,
129  std::string type,
130  Handle<NodeDispatcherData> destination,
131  char* bytes,
132  size_t numBytes);
133 
136  pthread_mutex_t mutex;
137 };
138 }
139 
140 
141 #endif // OBJECTQUERYMODEL_DISPATCHER_H
Handle< Vector< Handle< NodeDispatcherData > > > storageNodes
bool sendBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, char *bytes, size_t numBytes)
unsigned int NodeID
Definition: DataTypes.h:27
void registerSet(std::pair< std::string, std::string > setAndDatabase, PartitionPolicyPtr partitionPolicy)
bool dispatchData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< Vector< Handle< Object >>> toDispatch)
void registerStorageNodes(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
std::shared_ptr< PartitionPolicy > PartitionPolicyPtr
void registerHandlers(PDBServer &forMe) override
DispatcherServer(PDBLoggerPtr logger, std::shared_ptr< StatisticsDB > statisticsDB)
std::shared_ptr< StatisticsDB > statisticsDB
bool validateTypes(const std::string &databaseName, const std::string &setName, const std::string &typeName, std::string &errMsg)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
std::map< std::pair< std::string, std::string >, PartitionPolicyPtr > partitionPolicies
bool sendData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, Handle< Vector< Handle< Object >>> toSend)
Handle< NodeDispatcherData > findNode(NodeID nodeId)
bool dispatchBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, char *bytes, size_t numBytes)