19 #ifndef DISPATCHER_SERVER_CC
20 #define DISPATCHER_SERVER_CC
29 #include "BuiltInObjectTypeIDs.h"
35 #define MAX_CONCURRENT_REQUESTS 10
42 this->
storageNodes = pdb::makeObject<Vector<Handle<NodeDispatcherData>>>();
44 pthread_mutex_init(&
mutex,
nullptr);
51 pthread_mutex_destroy(&
mutex);
56 DispatcherAddData_TYPEID,
59 pthread_mutex_lock(&
mutex);
61 pthread_mutex_unlock(&
mutex);
63 pthread_mutex_lock(&
mutex);
66 pthread_mutex_unlock(&
mutex);
69 PDB_COUT <<
"DispatcherAddData handler running" << std::endl;
71 size_t numBytes = sendUsingMe->getSizeOfNextObject();
72 std::cout <<
"Dispacher received numBytes = " << numBytes << std::endl;
74 char* tempPage =
nullptr;
75 char* readToHere =
nullptr;
76 if (request->isShallowCopy() ==
false) {
80 #ifdef ENABLE_COMPRESSION
81 tempPage =
new char[numBytes];
82 sendUsingMe->receiveBytes(tempPage, errMsg);
84 readToHere = malloc(numBytes);
85 sendUsingMe->receiveBytes(readToHere, errMsg);
88 #ifdef ENABLE_COMPRESSION
89 size_t uncompressedSize = 0;
90 snappy::GetUncompressedLength(tempPage, numBytes, &uncompressedSize);
91 readToHere = (
char*)malloc(uncompressedSize);
92 snappy::RawUncompress(tempPage, numBytes, (
char*)(readToHere));
98 if (dataToSend->size() == 0) {
99 errMsg =
"Warning: client attemps to store zero object vector";
101 makeObject<SimpleRequestResult>(
false, errMsg);
102 res = sendUsingMe->sendObject(response, errMsg);
103 std::cout << errMsg << std::endl;
104 return make_pair(
false, errMsg);
107 std::cout <<
"Dispatch to send vector size = " << dataToSend->size() << std::endl;
111 request->getSetName(),
112 request->getTypeName(),
115 makeObject<SimpleRequestResult>(
false, errMsg);
116 res = sendUsingMe->sendObject(response, errMsg);
117 std::cout << errMsg << std::endl;
118 return make_pair(
false, errMsg);
121 res = sendUsingMe->sendObject(response, errMsg);
123 if (request->isShallowCopy() ==
false) {
124 dispatchData(std::pair<std::string, std::string>(request->getSetName(),
125 request->getDatabaseName()),
126 request->getTypeName(),
130 #ifdef ENABLE_COMPRESSION
131 dispatchBytes(std::pair<std::string, std::string>(request->getSetName(),
132 request->getDatabaseName()),
133 request->getTypeName(),
138 dispatchBytes(std::pair<std::string, std::string>(request->getSetName(),
139 request->getDatabaseName()),
140 request->getTypeName(),
148 pthread_mutex_lock(&
mutex);
149 StatisticsPtr stats = getFunctionality<QuerySchedulerServer>().getStats();
150 if (stats ==
nullptr) {
151 getFunctionality<QuerySchedulerServer>().collectStats();
152 stats = getFunctionality<QuerySchedulerServer>().getStats();
155 stats->getNumBytes(request->getDatabaseName(), request->getSetName());
156 size_t newNumBytes = oldNumBytes + numBytes;
157 stats->setNumBytes(request->getDatabaseName(), request->getSetName(), newNumBytes);
159 pthread_mutex_unlock(&
mutex);
160 return make_pair(res, errMsg);
164 DispatcherRegisterPartitionPolicy_TYPEID,
168 PDB_COUT <<
"Registering partition policy for set " << request->getSetName() <<
":"
169 << request->getDatabaseName() << std::endl;
174 registerSet(std::pair<std::string, std::string>(request->getSetName(),
175 request->getDatabaseName()),
179 res = sendUsingMe->sendObject(response, errMsg);
181 return make_pair(res, errMsg);
189 auto node = (*storageNodes)[i];
190 PDB_COUT <<
"Dispatcher register node: " << node->getAddress() <<
" : " << node->getPort()
195 partitionPolicy.second->updateStorageNodes(
storageNodes);
202 PDB_COUT <<
"Updating old set" << setAndDatabase.first <<
":" << setAndDatabase.second
205 PDB_COUT <<
"Found new set: " << setAndDatabase.first <<
":" << setAndDatabase.second
209 setAndDatabase, partitionPolicy));
220 PDB_COUT <<
"No partition policy was found for set: " << setAndDatabase.first <<
":"
221 << setAndDatabase.second << std::endl;
222 PDB_COUT <<
"Defaulting to random policy" << std::endl;
226 auto mappedPartitions =
partitionPolicies[setAndDatabase]->partition(toDispatch);
227 PDB_COUT <<
"mappedPartitions size = " << mappedPartitions->size() << std::endl;
228 for (
auto const& pair : (*mappedPartitions)) {
245 PDB_COUT <<
"No partition policy was found for set: " << setAndDatabase.first <<
":"
246 << setAndDatabase.second << std::endl;
247 PDB_COUT <<
"Defaulting to random policy" << std::endl;
252 PDB_COUT <<
"mappedPartitions size = " << mappedPartitions->size() << std::endl;
253 for (
auto const& pair : (*mappedPartitions)) {
264 const std::string& setName,
265 const std::string& typeName,
266 std::string& errMsg) {
267 PDB_COUT <<
"running validateTypes with typeName" << typeName << std::endl;
302 PDB_COUT <<
"Sending data to " << destination->getPort() <<
" : " << destination->getAddress()
307 if (!storageClient.
storeData(toSend, setAndDatabase.second, setAndDatabase.first, type, err)) {
308 PDB_COUT <<
"Not able to store data: " << err << std::endl;
319 #ifndef ENABLE_COMPRESSION
320 std::cout <<
"Now only objects or compressed bytes can be dispatched!!" << std::endl;
322 int port = destination->getPort();
323 std::string address = destination->getAddress();
324 std::string databaseName = setAndDatabase.second;
325 std::string setName = setAndDatabase.first;
327 std::cout <<
"store compressed bytes to address=" << address <<
" and port=" << port
328 <<
", with compressed byte size = " << numBytes <<
" to database=" << databaseName
329 <<
" and set=" << setName <<
" and type = IntermediateData" << std::endl;
330 return simpleSendBytesRequest<StorageAddData, SimpleRequestResult, bool>(
337 if (result !=
nullptr)
338 if (!result->getRes().first) {
339 logger->error(
"Error sending data: " + result->getRes().second);
340 errMsg =
"Error sending data: " + result->getRes().second;
358 auto storageNode = (*storageNodes)[i];
359 if (storageNode->getNodeId() == nodeId) {
Handle< Vector< Handle< NodeDispatcherData > > > storageNodes
bool sendBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, char *bytes, size_t numBytes)
std::shared_ptr< Statistics > StatisticsPtr
Handle< ObjType > getRootObject()
void registerSet(std::pair< std::string, std::string > setAndDatabase, PartitionPolicyPtr partitionPolicy)
bool dispatchData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< Vector< Handle< Object >>> toDispatch)
static PartitionPolicyPtr buildPartitionPolicy(PartitionPolicy::Policy policy)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void registerStorageNodes(Handle< Vector< Handle< NodeDispatcherData >>> storageNodes)
std::shared_ptr< PartitionPolicy > PartitionPolicyPtr
void registerHandlers(PDBServer &forMe) override
DispatcherServer(PDBLoggerPtr logger, std::shared_ptr< StatisticsDB > statisticsDB)
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
std::shared_ptr< StatisticsDB > statisticsDB
bool validateTypes(const std::string &databaseName, const std::string &setName, const std::string &typeName, std::string &errMsg)
int numRequestsInProcessing
std::shared_ptr< PDBLogger > PDBLoggerPtr
static PartitionPolicyPtr buildDefaultPartitionPolicy()
std::map< std::pair< std::string, std::string >, PartitionPolicyPtr > partitionPolicies
bool sendData(std::pair< std::string, std::string > setAndDatabase, std::string type, Handle< NodeDispatcherData > destination, Handle< Vector< Handle< Object >>> toSend)
bool storeData(Handle< Vector< Handle< DataType >>> data, std::string databaseName, std::string setName, std::string &errMsg, bool typeCheck=true)
#define MAX_CONCURRENT_REQUESTS
Handle< NodeDispatcherData > findNode(NodeID nodeId)
bool dispatchBytes(std::pair< std::string, std::string > setAndDatabase, std::string type, char *bytes, size_t numBytes)