A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
QueryClient.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef QUERY_CLIENT
20 #define QUERY_CLIENT
21 
22 #include "Set.h"
23 #include "SetIterator.h"
24 #include "Handle.h"
25 #include "PDBLogger.h"
26 #include "PDBVector.h"
27 #include "CatalogClient.h"
28 #include "DeleteSet.h"
29 #include "ExecuteQuery.h"
30 #include "RegisterReplica.h"
31 #include "TupleSetExecuteQuery.h"
32 #include "ExecuteComputation.h"
33 #include "QueryGraphAnalyzer.h"
34 #include "Computation.h"
35 namespace pdb {
36 
37 class QueryClient {
38 
39 public:
41 
42  // connect to the database
43  QueryClient(int portIn,
44  std::string addressIn,
45  PDBLoggerPtr myLoggerIn,
46  bool useScheduler = false)
47  : myHelper(portIn, addressIn, myLoggerIn) {
48  port = portIn;
49  address = addressIn;
50  myLogger = myLoggerIn;
51  runUs = makeObject<Vector<Handle<QueryBase>>>();
52  queryGraph = makeObject<Vector<Handle<Computation>>>();
53  this->useScheduler = useScheduler;
54  }
55 
57  runUs = nullptr;
58  queryGraph = nullptr;
59  }
60 
61  // access a set in the database
62  template <class Type>
63  Handle<Set<Type>> getSet(std::string databaseName, std::string setName) {
64 
65 // verify that the database and set work
66 #ifdef DEBUG_SET_TYPE
67  std::string errMsg;
68 
69  std::string typeName = myHelper.getObjectType(databaseName, setName, errMsg);
70 
71  if (typeName == "") {
72  std::cout << "I was not able to obtain the type for database set " << setName << "\n";
73  myLogger->error("query client: not able to verify type: " + errMsg);
74  Handle<Set<Type>> returnVal = makeObject<Set<Type>>(false);
75  return returnVal;
76  }
77 
78  if (typeName != getTypeName<Type>()) {
79  std::cout << "Wrong type for database set " << setName << "\n";
80  Handle<Set<Type>> returnVal = makeObject<Set<Type>>(false);
81  return returnVal;
82  }
83 #endif
84  Handle<Set<Type>> returnVal = makeObject<Set<Type>>(databaseName, setName);
85  return returnVal;
86  }
87 
88  // get an iterator for a set in the database
89  template <class Type>
90  SetIterator<Type> getSetIterator(std::string databaseName, std::string setName) {
91 
92 // verify that the database and set work
93 #ifdef DEBUG_SET_TYPE
94  std::string errMsg;
95  std::string typeName = myHelper.getObjectType(databaseName, setName, errMsg);
96 
97  if (typeName == "") {
98  myLogger->error("query client: not able to verify type: " + errMsg);
99  SetIterator<Type> returnVal;
100  return returnVal;
101  }
102 #endif
103  // commented by Jia, below type check can not work with complex types such as
104  // Vector<Handle<Foo>>
105  /*
106 if (typeName != getTypeName <Type> ()) {
107  std :: cout << "Wrong type for database set " << setName << "\n";
108  SetIterator <Type> returnVal;
109  return returnVal;
110 }
111  */
112  SetIterator<Type> returnVal(myLogger, port, address, databaseName, setName);
113  return returnVal;
114  }
115 
116  bool deleteSet(std::string databaseName, std::string setName) {
117  // this is for query testing stuff
118  return simpleRequest<DeleteSet, SimpleRequestResult, bool, String, String>(
119  myLogger,
120  port,
121  address,
122  false,
123  124 * 1024,
124  [&](Handle<SimpleRequestResult> result) {
125  std::string errMsg;
126  if (result != nullptr) {
127 
128  // make sure we got the correct number of results
129  if (!result->getRes().first) {
130  errMsg = "Could not remove set: " + result->getRes().second;
131  myLogger->error("QueryErr: " + errMsg);
132  return false;
133  }
134 
135  return true;
136  }
137  errMsg = "Error getting type name: got nothing back from catalog";
138  return false;
139  },
140  databaseName,
141  setName);
142  }
143 
144 
145  //to set query graph
147  this->queryGraph->push_back(querySink);
148  std::cout << "query graph size = " << this->queryGraph->size() << std::endl;
149  }
150 
151 
152 
153  //to return TCAP string
154  std::string getTCAP (std::vector<Handle<Computation>> & computations) {
155 
156  QueryGraphAnalyzer queryAnalyzer(this->queryGraph);
157  std::string tcapString = queryAnalyzer.parseTCAPString();
158  queryAnalyzer.parseComputations(computations);
159  return tcapString;
160  }
161 
162  //to register a replica with statisticsDB
163  bool registerReplica(std::pair<std::string, std::string> inputDatabaseAndSet,
164  std::pair<std::string, std::string> outputDatabaseAndSet,
165  int numPartitions,
166  int numNodes,
167  std::string type,
168  std::string tcap,
169  std::vector<Handle<Computation>> computations) {
170  std::string errMsg;
171  std::cout << "to register Replica at query cient: " << computations.size() << " computations" << std::endl;
172  return simpleRequest<RegisterReplica, SimpleRequestResult, bool>(
173  myLogger,
174  port,
175  address,
176  false,
177  4 * 1024 * 1024,
178  [&](Handle<SimpleRequestResult> result) {
179  if (result != nullptr) {
180  if (!result->getRes().first) {
181  errMsg = "Error in query: " + result->getRes().second;
182  myLogger->error("Error querying data: " + result->getRes().second);
183  return false;
184  }
185  return true;
186  }
187  errMsg = "Error getting type name: got nothing back from server";
188  return false;
189  },
190  inputDatabaseAndSet,
191  outputDatabaseAndSet,
192  numPartitions,
193  numNodes,
194  type,
195  tcap,
196  computations);
197 
198  }
199 
200 
201 
202  //to execute computations
203  template <class... Types>
204  bool executeComputations(std::string& errMsg,
205  Handle<Computation> firstParam,
206  Handle<Types>... args) {
207  queryGraph->push_back(firstParam);
208  return executeComputations(errMsg, args...);
209  }
210 
211  bool executeComputations(std::string& errMsg) {
212 
213  // this is the request
214  const UseTemporaryAllocationBlock myBlock{256 * 1024 * 1024};
215  std::vector<Handle<Computation>> computations;
216  std::string tcapString = getTCAP(computations);
217  return executeComputations (errMsg,
218  tcapString,
219  computations);
220  }
221 
222  bool executeComputations(std::string& errMsg,
223  std::string tcapString,
224  std::vector<Handle<Computation>> computations) {
225 
226  Handle<Vector<Handle<Computation>>> computationsToSend =
227  makeObject<Vector<Handle<Computation>>>();
228  for (size_t i = 0; i < computations.size(); i++) {
229  computationsToSend->push_back(computations[i]);
230  }
231  Handle<ExecuteComputation> executeComputation = makeObject<ExecuteComputation>(tcapString);
232 
233  // this call asks the database to execute the query, and then it inserts the result set name
234  // within each of the results, as well as the database connection information
235 
236  // this is for query scheduling stuff
237  if (useScheduler == true) {
241  bool>(
242  myLogger,
243  port,
244  address,
245  false,
246  124 * 1024,
247  [&](Handle<SimpleRequestResult> result) {
248  if (result != nullptr) {
249  if (!result->getRes().first) {
250  errMsg = "Error in query: " + result->getRes().second;
251  myLogger->error("Error querying data: " + result->getRes().second);
252  return false;
253  }
254  this->queryGraph = makeObject<Vector<Handle<Computation>>>();
255  return true;
256  }
257  errMsg = "Error getting type name: got nothing back from server";
258  this->queryGraph = makeObject<Vector<Handle<Computation>>>();
259  return false;
260 
261 
262  },
263  executeComputation,
264  computationsToSend);
265 
266 
267  } else {
268  errMsg =
269  "This query must be sent to QuerySchedulerServer, but it seems "
270  "QuerySchedulerServer is not supported";
271  this->queryGraph = makeObject<Vector<Handle<Computation>>>();
272  return false;
273  }
274  this->queryGraph = makeObject<Vector<Handle<Computation>>>();
275  }
276 
278  this->useScheduler = useScheduler;
279  }
280 
281 private:
282  // how we connect to the catalog
284 
285  // deprecated
286  // this is the query graph we'll execute
288 
289  // JiaNote: the Computation-based query graph to execute
291 
292 
293  // connection info
294  int port;
295  std::string address;
296 
297  // for logging
299 
300  // JiaNote: whether to run in distributed mode
302 };
303 }
304 
305 #endif
Handle< Vector< Handle< QueryBase > > > runUs
Definition: QueryClient.h:287
Handle< Vector< Handle< Computation > > > queryGraph
Definition: QueryClient.h:290
void setQueryGraph(Handle< Computation > querySink)
Definition: QueryClient.h:146
bool registerReplica(std::pair< std::string, std::string > inputDatabaseAndSet, std::pair< std::string, std::string > outputDatabaseAndSet, int numPartitions, int numNodes, std::string type, std::string tcap, std::vector< Handle< Computation >> computations)
Definition: QueryClient.h:163
void setUseScheduler(bool useScheduler)
Definition: QueryClient.h:277
std::string getObjectType(std::string databaseName, std::string setName, std::string &errMsg)
bool executeComputations(std::string &errMsg, Handle< Computation > firstParam, Handle< Types >...args)
Definition: QueryClient.h:204
bool executeComputations(std::string &errMsg)
Definition: QueryClient.h:211
QueryClient(int portIn, std::string addressIn, PDBLoggerPtr myLoggerIn, bool useScheduler=false)
Definition: QueryClient.h:43
CatalogClient myHelper
Definition: QueryClient.h:283
std::string address
Definition: QueryClient.h:295
std::string getTCAP(std::vector< Handle< Computation >> &computations)
Definition: QueryClient.h:154
bool deleteSet(std::string databaseName, std::string setName)
Definition: QueryClient.h:116
void parseComputations(std::vector< Handle< Computation >> &computations, Handle< Computation > sink)
ReturnType simpleDoubleRequest(PDBLoggerPtr myLogger, int port, std::string address, ReturnType onErr, size_t bytesForRequest, function< ReturnType(Handle< ResponseType >)> processResponse, Handle< RequestType > &firstRequest, Handle< SecondRequestType > &secondRequest)
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
PDBLoggerPtr myLogger
Definition: QueryClient.h:298
bool executeComputations(std::string &errMsg, std::string tcapString, std::vector< Handle< Computation >> computations)
Definition: QueryClient.h:222
SetIterator< Type > getSetIterator(std::string databaseName, std::string setName)
Definition: QueryClient.h:90
Handle< Set< Type > > getSet(std::string databaseName, std::string setName)
Definition: QueryClient.h:63