A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
FrontendQueryTestServer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef FRONTEND_SERVER_CC
19 #define FRONTEND_SERVER_CC
20 
21 #include <cstddef>
22 #include <iostream>
23 #include <fstream>
24 #include <vector>
25 #include <cstring>
26 #include <unistd.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <string>
31 #include "PDBDebug.h"
33 #include "SimpleRequestHandler.h"
34 #include "BuiltInObjectTypeIDs.h"
35 #include "SimpleRequestResult.h"
36 #include "QueryBase.h"
37 #include "ExecuteQuery.h"
38 #include "InterfaceFunctions.h"
39 #include "GenericWork.h"
40 #include "DeleteSet.h"
41 #include "CatalogServer.h"
42 #include "SetScan.h"
43 #include "KeepGoing.h"
44 #include "DoneWithResult.h"
45 #include "PangeaStorageServer.h"
46 #include "TupleSetJobStage.h"
47 #include "AggregationJobStage.h"
50 #include <snappy.h>
51 
52 namespace pdb {
53 
55 
56  isStandalone = true;
57  createOutputSet = true;
58 }
59 // Two-argument constructor: records the caller-supplied isStandalone and createOutputSet flags on this server.
60 FrontendQueryTestServer::FrontendQueryTestServer(bool isStandalone, bool createOutputSet) {
61  // The parameters shadow the members of the same name, hence the explicit this-> on the left-hand side.
62  this->isStandalone = isStandalone;
63  this->createOutputSet = createOutputSet;
64 }
65 
67 
69 
70  // to handle a request to execute a job stage for building hash tables for hash partition join
71  forMe.registerHandler(
72  HashPartitionedJoinBuildHTJobStage_TYPEID,
75  std::string errMsg;
76  bool success;
77  PDB_COUT << "Frontend got a request for HashPartitionedJoinBuildHTJobStage"
78  << std::endl;
79  request->print();
80 #ifdef ENABLE_LARGE_GRAPH  // same macro spelling as the Broadcast/Aggregation/TupleSet handlers; a misspelled guard never selects this branch
81  makeObjectAllocatorBlock(256 * 1024 * 1024, true);  // 256 * 1024 * 1024 bytes; 'true' = throwExceptionOnFail
82 #else
83  makeObjectAllocatorBlock(32 * 1024 * 1024, true);  // default: 32 * 1024 * 1024 bytes; 'true' = throwExceptionOnFail
84 #endif
85 #ifdef PROFILING
86  std::string out = getAllocator().printInactiveBlocks();
87  std::cout << "HashPartitionedJoinBuildHTJobStage: print inactive blocks:" << std::endl;
88  std::cout << out << std::endl;
89 #endif
90  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
91  if (communicatorToBackend->connectToLocalServer(
92  getFunctionality<PangeaStorageServer>().getLogger(),
93  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
94  errMsg)) {
95  std::cout << errMsg << std::endl;
96  return std::make_pair(false, errMsg);
97  }
98  PDB_COUT << "Frontend connected to backend" << std::endl;
99 
101  deepCopyToCurrentAllocationBlock<HashPartitionedJoinBuildHTJobStage>(request);
102  PDB_COUT << "Created HashPartitionedJoinBuildHTJobStage object for forwarding"
103  << std::endl;
104 
105  // check input set
106  // input set
107  // restructure the input information
108  std::string inDatabaseName = request->getSourceContext()->getDatabase();
109  std::string inSetName = request->getSourceContext()->getSetName();
110  Handle<SetIdentifier> sourceContext =
111  makeObject<SetIdentifier>(inDatabaseName, inSetName);
112  PDB_COUT << "Created SetIdentifier object for input" << std::endl;
113  SetPtr inputSet = getFunctionality<PangeaStorageServer>().getSet(
114  std::pair<std::string, std::string>(inDatabaseName, inSetName));
115  if (inputSet == nullptr) {
116  PDB_COUT << "FrontendQueryTestServer: input set doesn't exist in this machine"
117  << std::endl;
118  // TODO: move data from other servers
119  // temporarily, we simply return;
120  // now, we send back the result
121  Handle<SetIdentifier> result = makeObject<SetIdentifier>(inDatabaseName, inSetName);
122  result->setNumPages(0);
123  result->setPageSize(0);
124  PDB_COUT << "Query is done without data. " << std::endl;
125  // return the results
126  if (!sendUsingMe->sendObject(result, errMsg)) {
127  return std::make_pair(false, errMsg);
128  }
129  return std::make_pair(true, std::string("execution complete"));
130  } else {
131  inputSet->unpinBufferPage();
132  getFunctionality<PangeaStorageServer>().cleanup();
133  }
134  sourceContext->setDatabaseId(inputSet->getDbID());
135  sourceContext->setTypeId(inputSet->getTypeID());
136  sourceContext->setSetId(inputSet->getSetID());
137  sourceContext->setPageSize(inputSet->getPageSize());
138  newRequest->setSourceContext(sourceContext);
139  std::cout << "HashPartitioned data set size: " << inputSet->getNumPages() << " pages"
140  << std::endl;
141  newRequest->setNumPages(inputSet->getNumPages());
142  std::cout << "Input is set with setName=" << inSetName
143  << ", setId=" << inputSet->getSetID() << std::endl;
144 
145 
146  // forward the request
147  newRequest->print();
148 
149  if (inputSet->getNumPages() == 0) {
150  std::cout << "WARNING: repartitioned data size is 0" << std::endl;
151  }
152 
153  if (!communicatorToBackend->sendObject(newRequest, errMsg)) {
154  std::cout << errMsg << std::endl;
155  errMsg = std::string("can't send message to backend: ") + errMsg;
156  success = false;
157  } else {
158  PDB_COUT << "Frontend sent request to backend" << std::endl;
159  // wait for backend to finish.
160  communicatorToBackend->getNextObject<SimpleRequestResult>(success, errMsg);
161  if (!success) {
162  std::cout << "Error waiting for backend to finish this job stage. " << errMsg
163  << std::endl;
164  errMsg = std::string("backend failure: ") + errMsg;
165  }
166  }
167 
168  // forward result
169  // now, we send back the result
170  Handle<SetIdentifier> result = makeObject<SetIdentifier>(inDatabaseName, inSetName);
171  result->setNumPages(inputSet->getNumPages());
172  result->setPageSize(inputSet->getPageSize());
173  if (success == true) {
174  PDB_COUT << "Stage is done. " << std::endl;
175  errMsg = std::string("execution complete");
176  } else {
177  std::cout << "Stage failed at server" << std::endl;
178  }
179  // return the results
180  if (!sendUsingMe->sendObject(result, errMsg)) {
181  return std::make_pair(false, errMsg);
182  }
183  if (success == false) {
184  // TODO:restart backend
185  }
186  return std::make_pair(success, errMsg);
187  }));
188 
189  // to handle a request to execute a job stage for building hash table for broadcast join
190  forMe.registerHandler(
191  BroadcastJoinBuildHTJobStage_TYPEID,
194 
195  std::string errMsg;
196  bool success;
197  PDB_COUT << "Frontend got a request for BroadcastJoinBuildHTJobStage" << std::endl;
198  request->print();
199 #ifdef ENABLE_LARGE_GRAPH
200  makeObjectAllocatorBlock(256 * 1024 * 1024, true);
201 #else
202  makeObjectAllocatorBlock(32 * 1024 * 1024, true);
203 #endif
204 #ifdef PROFILING
205  std::string out = getAllocator().printInactiveBlocks();
206  std::cout << "BroadcastJoinBuildHTJobStage: print inactive blocks:" << std::endl;
207  std::cout << out << std::endl;
208 #endif
209  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
210  if (communicatorToBackend->connectToLocalServer(
211  getFunctionality<PangeaStorageServer>().getLogger(),
212  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
213  errMsg)) {
214  std::cout << errMsg << std::endl;
215  return std::make_pair(false, errMsg);
216  }
217  PDB_COUT << "Frontend connected to backend" << std::endl;
218 
220  deepCopyToCurrentAllocationBlock<BroadcastJoinBuildHTJobStage>(request);
221  PDB_COUT << "Created BroadcastJoinBuildHTJobStage object for forwarding" << std::endl;
222 
223  // check input set
224  // input set
225  // restructure the input information
226  std::string inDatabaseName = request->getSourceContext()->getDatabase();
227  std::string inSetName = request->getSourceContext()->getSetName();
228  Handle<SetIdentifier> sourceContext =
229  makeObject<SetIdentifier>(inDatabaseName, inSetName);
230  PDB_COUT << "Created SetIdentifier object for input" << std::endl;
231  SetPtr inputSet = getFunctionality<PangeaStorageServer>().getSet(
232  std::pair<std::string, std::string>(inDatabaseName, inSetName));
233  if (inputSet == nullptr) {
234  PDB_COUT << "FrontendQueryTestServer: input set doesn't exist in this machine"
235  << std::endl;
236  // TODO: move data from other servers
237  // temporarily, we simply return;
238  // now, we send back the result
239  Handle<SetIdentifier> result = makeObject<SetIdentifier>(inDatabaseName, inSetName);
240  result->setNumPages(0);
241  result->setPageSize(0);
242  PDB_COUT << "Query is done without data. " << std::endl;
243  // return the results
244  if (!sendUsingMe->sendObject(result, errMsg)) {
245  return std::make_pair(false, errMsg);
246  }
247  return std::make_pair(true, std::string("execution complete"));
248 
249  } else {
250  inputSet->unpinBufferPage();
251  getFunctionality<PangeaStorageServer>().cleanup();
252  }
253  sourceContext->setDatabaseId(inputSet->getDbID());
254  sourceContext->setTypeId(inputSet->getTypeID());
255  sourceContext->setSetId(inputSet->getSetID());
256  sourceContext->setPageSize(inputSet->getPageSize());
257  newRequest->setSourceContext(sourceContext);
258  std::cout << "Broadcasted data set size: " << inputSet->getNumPages() << " pages"
259  << std::endl;
260  newRequest->setNumPages(inputSet->getNumPages());
261  PDB_COUT << "Input is set with setName=" << inSetName
262  << ", setId=" << inputSet->getSetID() << std::endl;
263 
264 
265  // forward the request
266  newRequest->print();
267 
268  if (inputSet->getNumPages() != 0) {
269 
270  if (!communicatorToBackend->sendObject(newRequest, errMsg)) {
271  std::cout << errMsg << std::endl;
272  errMsg = std::string("can't send message to backend: ") + errMsg;
273  success = false;
274  } else {
275  PDB_COUT << "Frontend sent request to backend" << std::endl;
276  // wait for backend to finish.
277  communicatorToBackend->getNextObject<SimpleRequestResult>(success, errMsg);
278  if (!success) {
279  std::cout << "Error waiting for backend to finish this job stage. "
280  << errMsg << std::endl;
281  errMsg = std::string("backend failure: ") + errMsg;
282  }
283  }
284  } else {
285 
286  success = false;
287  errMsg = std::string("Error: broadcasted data size is 0");
288  std::cout << errMsg << std::endl;
289  }
290 
291  // forward result
292  // now, we send back the result
293  Handle<SetIdentifier> result = makeObject<SetIdentifier>(inDatabaseName, inSetName);
294  result->setNumPages(inputSet->getNumPages());
295  result->setPageSize(inputSet->getPageSize());
296  if (success == true) {
297  PDB_COUT << "Stage is done. " << std::endl;
298  errMsg = std::string("execution complete");
299  } else {
300  std::cout << "Stage failed at server" << std::endl;
301  }
302  // return the results
303  if (!sendUsingMe->sendObject(result, errMsg)) {
304  return std::make_pair(false, errMsg);
305  }
306  if (success == false) {
307  // TODO:restart backend
308  }
309  return std::make_pair(success, errMsg);
310 
311  }
312 
313  ));
314 
315 
316  // to handle a request to execute an aggregation stage
317  forMe.registerHandler(
318  AggregationJobStage_TYPEID,
320  Handle<AggregationJobStage> request, PDBCommunicatorPtr sendUsingMe) {
321  std::string errMsg;
322  bool success;
323  PDB_COUT << "Frontend got a request for AggregationJobStage" << std::endl;
324  request->print();
325 #ifdef ENABLE_LARGE_GRAPH
326  makeObjectAllocatorBlock(256 * 1024 * 1024, true);
327 #else
328  makeObjectAllocatorBlock(32 * 1024 * 1024, true);
329 #endif
330 #ifdef PROFILING
331  std::string out = getAllocator().printInactiveBlocks();
332  std::cout << "AggregationJobStage: print inactive blocks:" << std::endl;
333  std::cout << out << std::endl;
334 #endif
335  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
336  if (communicatorToBackend->connectToLocalServer(
337  getFunctionality<PangeaStorageServer>().getLogger(),
338  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
339  errMsg)) {
340  std::cout << errMsg << std::endl;
341  return std::make_pair(false, errMsg);
342  }
343  PDB_COUT << "Frontend connected to backend" << std::endl;
344  Handle<AggregationJobStage> newRequest =
345  makeObject<AggregationJobStage>(request->getStageId(),
346  request->needsToMaterializeAggOut(),
347  request->getAggComputation(),
348  request->getNumNodePartitions());
349  PDB_COUT << "Created AggregationJobStage object for forwarding" << std::endl;
350 
351 
352  // input set
353  // restructure the input information
354  std::string inDatabaseName = request->getSourceContext()->getDatabase();
355  std::string inSetName = request->getSourceContext()->getSetName();
356  Handle<SetIdentifier> sourceContext =
357  makeObject<SetIdentifier>(inDatabaseName, inSetName);
358  PDB_COUT << "Created SetIdentifier object for input" << std::endl;
359  SetPtr inputSet = getFunctionality<PangeaStorageServer>().getSet(
360  std::pair<std::string, std::string>(inDatabaseName, inSetName));
361  if (inputSet == nullptr) {
362  PDB_COUT << "FrontendQueryTestServer: input set doesn't exist in this machine"
363  << std::endl;
364  // TODO: move data from other servers
365  // temporarily, we simply return;
366  // now, we send back the result
367  Handle<SetIdentifier> result =
368  makeObject<SetIdentifier>(request->getSinkContext()->getDatabase(),
369  request->getSinkContext()->getSetName());
370  result->setNumPages(0);
371  result->setPageSize(
372  getFunctionality<PangeaStorageServer>().getConf()->getPageSize());
373  PDB_COUT << "Query is done without data. " << std::endl;
374  // return the results
375  if (!sendUsingMe->sendObject(result, errMsg)) {
376  return std::make_pair(false, errMsg);
377  }
378  return std::make_pair(true, std::string("execution complete"));
379 
380  } else {
381  getFunctionality<PangeaStorageServer>().cleanup(false);
382  PDB_COUT << "input set size=" << inputSet->getNumPages() << std::endl;
383  }
384  sourceContext->setDatabaseId(inputSet->getDbID());
385  sourceContext->setTypeId(inputSet->getTypeID());
386  sourceContext->setSetId(inputSet->getSetID());
387  sourceContext->setPageSize(inputSet->getPageSize());
388  newRequest->setSourceContext(sourceContext);
389  PDB_COUT << "Input is set with setName=" << inSetName
390  << ", setId=" << inputSet->getSetID() << std::endl;
391 
392 
393  // output set
394  std::string outDatabaseName = request->getSinkContext()->getDatabase();
395  std::string outSetName = request->getSinkContext()->getSetName();
396  SetType outSetType = request->getSinkContext()->getSetType();
397  bool isAggResult = request->getSinkContext()->isAggregationResult();
398  success = true;
399  // add the output set
400  // check whether output set exists
401  std::pair<std::string, std::string> outDatabaseAndSet =
402  std::make_pair(outDatabaseName, outSetName);
403  SetPtr outputSet = getFunctionality<PangeaStorageServer>().getSet(outDatabaseAndSet);
404  if ((outputSet == nullptr) && (outSetType != PartitionedHashSetType)) {
405  success = getFunctionality<PangeaStorageServer>().addSet(
406  outDatabaseName, request->getOutputTypeName(), outSetName);
407  outputSet = getFunctionality<PangeaStorageServer>().getSet(outDatabaseAndSet);
408  PDB_COUT << "Output set is created in storage" << std::endl;
409  }
410 
411  if (success == true) {
412  newRequest->setOutputTypeName(request->getOutputTypeName());
413  Handle<SetIdentifier> sinkContext =
414  makeObject<SetIdentifier>(outDatabaseName, outSetName, outSetType, isAggResult);
415  if (outSetType != PartitionedHashSetType) {
416  sinkContext->setDatabaseId(outputSet->getDbID());
417  sinkContext->setTypeId(outputSet->getTypeID());
418  sinkContext->setSetId(outputSet->getSetID());
419  sinkContext->setPageSize(outputSet->getPageSize());
420  }
421  newRequest->setSinkContext(sinkContext);
422  } else {
423  Handle<SetIdentifier> result =
424  makeObject<SetIdentifier>(outDatabaseName, outSetName);
425  result->setNumPages(0);
426  result->setPageSize(0);
427  PDB_COUT << "Query failed: not able to create output set. " << std::endl;
428  // return the results
429  if (!sendUsingMe->sendObject(result, errMsg)) {
430  return std::make_pair(false, errMsg);
431  }
432  return std::make_pair(true,
433  std::string("Query failed: not able to create output set"));
434  }
435 
436  newRequest->setJobId(request->getJobId());
437  newRequest->setTotalMemoryOnThisNode(request->getTotalMemoryOnThisNode());
438  // forward the request
439  newRequest->print();
440 
441  if (!communicatorToBackend->sendObject(newRequest, errMsg)) {
442  std::cout << errMsg << std::endl;
443  errMsg = std::string("can't send message to backend: ") + errMsg;
444  success = false;
445  } else {
446  PDB_COUT << "Frontend sent request to backend" << std::endl;
447  // wait for backend to finish.
448  communicatorToBackend->getNextObject<SimpleRequestResult>(success, errMsg);
449  if (!success) {
450  std::cout << "Error waiting for backend to finish this job stage. " << errMsg
451  << std::endl;
452  errMsg = std::string("backend failure: ") + errMsg;
453  }
454  }
455 
456  // forward result
457  // now, we send back the result
458 
459  Handle<SetIdentifier> result = makeObject<SetIdentifier>(outDatabaseName, outSetName);
460  if (outSetType != PartitionedHashSetType) {
461  result->setNumPages(outputSet->getNumPages());
462  result->setPageSize(outputSet->getPageSize());
463  } else {
464  // if output is not materialized to user set, we roughly estimate the output using
465  // the input.
466  result->setNumPages(inputSet->getNumPages());
467  result->setPageSize(inputSet->getPageSize());
468  }
469  if (success == true) {
470  PDB_COUT << "Stage is done. " << std::endl;
471  errMsg = std::string("execution complete");
472  } else {
473  std::cout << "Stage failed at server" << std::endl;
474  }
475  // return the results
476  if (!sendUsingMe->sendObject(result, errMsg)) {
477  return std::make_pair(false, errMsg);
478  }
479  if (success == false) {
480  // TODO:restart backend
481  }
482  return std::make_pair(success, errMsg);
483 
484  }));
485 
486 
487  // to handle a request to execute a tupleset pipeline stage
488  forMe.registerHandler(
489  TupleSetJobStage_TYPEID,
491  PDBCommunicatorPtr sendUsingMe) {
492  std::string errMsg;
493  bool success;
494  PDB_COUT << "Frontend got a request for TupleSetJobStage" << std::endl;
495  request->print();
496 #ifdef ENABLE_LARGE_GRAPH
497  makeObjectAllocatorBlock(256 * 1024 * 1024, true);
498 #else
499  makeObjectAllocatorBlock(32 * 1024 * 1024, true);
500 #endif
501 #ifdef PROFILING
502  std::string out = getAllocator().printInactiveBlocks();
503  std::cout << "TupleSetJobStage: print inactive blocks:" << std::endl;
504  std::cout << out << std::endl;
505 #endif
506  PDBCommunicatorPtr communicatorToBackend = make_shared<PDBCommunicator>();
507  if (communicatorToBackend->connectToLocalServer(
508  getFunctionality<PangeaStorageServer>().getLogger(),
509  getFunctionality<PangeaStorageServer>().getPathToBackEndServer(),
510  errMsg)) {
511  std::cout << errMsg << std::endl;
512  return std::make_pair(false, errMsg);
513  }
514  PDB_COUT << "Frontend connected to backend" << std::endl;
515  Handle<TupleSetJobStage> newRequest =
516  deepCopyToCurrentAllocationBlock<TupleSetJobStage>(request);
517 
518  PDB_COUT << "Created TupleSetJobStage object for forwarding" << std::endl;
519  std::string inDatabaseName = request->getSourceContext()->getDatabase();
520  std::string inSetName = request->getSourceContext()->getSetName();
521  if (request->isInputAggHashOut() == false) {
522  // restructure the input information
523  Handle<SetIdentifier> sourceContext =
524  makeObject<SetIdentifier>(inDatabaseName, inSetName);
525  PDB_COUT << "Created SetIdentifier object for input" << std::endl;
526  SetPtr inputSet = getFunctionality<PangeaStorageServer>().getSet(
527  std::pair<std::string, std::string>(inDatabaseName, inSetName));
528  if (inputSet == nullptr) {
529  PDB_COUT << "FrontendQueryTestServer: input set doesn't exist in this machine"
530  << std::endl;
531  // TODO: move data from other servers
532  // temporarily, we simply return;
533  // now, we send back the result
534  Handle<SetIdentifier> result =
535  makeObject<SetIdentifier>(request->getSinkContext()->getDatabase(),
536  request->getSinkContext()->getSetName());
537  result->setNumPages(0);
538  result->setPageSize(
539  getFunctionality<PangeaStorageServer>().getConf()->getPageSize());
540  PDB_COUT << "Stage is done without input. " << std::endl;
541  // return the results
542  if (!sendUsingMe->sendObject(result, errMsg)) {
543  return std::make_pair(false, errMsg);
544  }
545  return std::make_pair(true, std::string("execution complete"));
546 
547  } else {
548  inputSet->unpinBufferPage();
549  getFunctionality<PangeaStorageServer>().cleanup(false);
550  }
551  std::cout << "number of pages in set " << inSetName << " is "
552  << inputSet->getNumPages() << std::endl;
553  if (inputSet->getNumPages() == 0) {
554  PDB_COUT << "FrontendQueryTestServer: input set doesn't have any pages in this machine"
555  << std::endl;
556  // TODO: move data from other servers
557  // temporarily, we simply return;
558  // now, we send back the result
559  Handle<SetIdentifier> result =
560  makeObject<SetIdentifier>(request->getSinkContext()->getDatabase(),
561  request->getSinkContext()->getSetName());
562  result->setNumPages(0);
563  result->setPageSize(
564  inputSet->getPageSize());
565  PDB_COUT << "Stage is done without data. " << std::endl;
566  // return the results
567  if (!sendUsingMe->sendObject(result, errMsg)) {
568  return std::make_pair(false, errMsg);
569  }
570  return std::make_pair(true, std::string("execution complete"));
571 
572  }
573  sourceContext->setDatabaseId(inputSet->getDbID());
574  sourceContext->setTypeId(inputSet->getTypeID());
575  sourceContext->setSetId(inputSet->getSetID());
576  sourceContext->setPageSize(inputSet->getPageSize());
577  newRequest->setSourceContext(sourceContext);
578  PDB_COUT << "Input is set with setName=" << inSetName
579  << ", setId=" << inputSet->getSetID() << std::endl;
580  } else {
581  PDB_COUT << "Input is hash table output from aggregation" << std::endl;
582  }
583 
584  std::string outDatabaseName = request->getSinkContext()->getDatabase();
585  std::string outSetName = request->getSinkContext()->getSetName();
586  success = true;
587  // add the output set
588  // check whether output set exists
589  std::pair<std::string, std::string> outDatabaseAndSet =
590  std::make_pair(outDatabaseName, outSetName);
591  SetPtr outputSet = getFunctionality<PangeaStorageServer>().getSet(outDatabaseAndSet);
592  if (outputSet == nullptr) {
593  success = getFunctionality<PangeaStorageServer>().addSet(
594  outDatabaseName, request->getOutputTypeName(), outSetName);
595  outputSet = getFunctionality<PangeaStorageServer>().getSet(outDatabaseAndSet);
596  PDB_COUT << "Output set is created in storage with database=" << outDatabaseName
597  << ", set=" << outSetName << ", type=IntermediateData" << std::endl;
598  }
599 
600  if (success == true) {
601  Handle<SetIdentifier> sinkContext =
602  makeObject<SetIdentifier>(outDatabaseName, outSetName);
603  PDB_COUT << "Created SetIdentifier object for output with setName=" << outSetName
604  << ", setId=" << outputSet->getSetID() << std::endl;
605  sinkContext->setDatabaseId(outputSet->getDbID());
606  sinkContext->setTypeId(outputSet->getTypeID());
607  sinkContext->setSetId(outputSet->getSetID());
608  sinkContext->setPageSize(outputSet->getPageSize());
609  newRequest->setSinkContext(sinkContext);
610  newRequest->setOutputTypeName(request->getOutputTypeName());
611 
612  } else {
613  Handle<SetIdentifier> result =
614  makeObject<SetIdentifier>(outDatabaseName, outSetName);
615  result->setNumPages(0);
616  result->setPageSize(
617  getFunctionality<PangeaStorageServer>().getConf()->getPageSize());
618  PDB_COUT << "Stage failed: not able to create output set. " << std::endl;
619  // return the results
620  if (!sendUsingMe->sendObject(result, errMsg)) {
621  return std::make_pair(false, errMsg);
622  }
623  return std::make_pair(true,
624  std::string("Query failed: not able to create output set"));
625  }
626 
627  bool needsRemoveCombinerSet = false;
628  SetPtr combinerSet = nullptr;
629  std::string combinerDatabaseName;
630  std::string combinerSetName;
631  if (request->getCombinerContext() != nullptr) {
632  combinerDatabaseName = request->getCombinerContext()->getDatabase();
633  combinerSetName = request->getCombinerContext()->getSetName();
634  success = true;
635  // add the combiner set
636  // check whether the combiner set exists
637  std::pair<std::string, std::string> combinerDatabaseAndSet =
638  std::make_pair(combinerDatabaseName, combinerSetName);
639  combinerSet =
640  getFunctionality<PangeaStorageServer>().getSet(combinerDatabaseAndSet);
641  if (combinerSet == nullptr) {
642  success = getFunctionality<PangeaStorageServer>().addSet(combinerDatabaseName,
643  combinerSetName);
644  combinerSet =
645  getFunctionality<PangeaStorageServer>().getSet(combinerDatabaseAndSet);
646  needsRemoveCombinerSet = true;
647  PDB_COUT << "Combiner set is created in storage" << std::endl;
648  }
649  }
650 
651  if (success == true) {
652  if (combinerSet != nullptr) {
653  Handle<SetIdentifier> combinerContext =
654  makeObject<SetIdentifier>(combinerDatabaseName, combinerSetName);
655  PDB_COUT << "Created SetIdentifier object for combiner with setName="
656  << combinerSetName << ", setId=" << combinerSet->getSetID()
657  << std::endl;
658  combinerContext->setDatabaseId(combinerSet->getDbID());
659  combinerContext->setTypeId(combinerSet->getTypeID());
660  combinerContext->setSetId(combinerSet->getSetID());
661  combinerContext->setPageSize(combinerSet->getPageSize());
662  newRequest->setCombinerContext(combinerContext);
663  } else {
664  newRequest->setCombinerContext(nullptr);
665  }
666 
667  newRequest->print();
668 
669  if (!communicatorToBackend->sendObject(newRequest, errMsg)) {
670  std::cout << errMsg << std::endl;
671  errMsg = std::string("can't send message to backend: ") + errMsg;
672  success = false;
673  } else {
674  PDB_COUT << "Frontend sent request to backend" << std::endl;
675  // wait for backend to finish.
676  communicatorToBackend->getNextObject<SimpleRequestResult>(success, errMsg);
677  if (!success) {
678  std::cout << "Error waiting for backend to finish this job stage. "
679  << errMsg << std::endl;
680  errMsg = std::string("backend failure: ") + errMsg;
681  }
682  }
683  }
684 
685  if (needsRemoveCombinerSet == true) {
686  // remove combiner set
687  getFunctionality<PangeaStorageServer>().removeSet(combinerDatabaseName,
688  combinerSetName);
689  }
690 
691  // now, we send back the result
692  Handle<SetIdentifier> result = makeObject<SetIdentifier>(outDatabaseName, outSetName);
693  result->setNumPages(outputSet->getNumPages());
694  result->setPageSize(outputSet->getPageSize());
695  if (success == true) {
696  PDB_COUT << "Stage is done. " << std::endl;
697  errMsg = std::string("execution complete");
698  } else {
699  std::cout << "Stage failed at server" << std::endl;
700  }
701  // return the results
702  if (!sendUsingMe->sendObject(result, errMsg)) {
703  return std::make_pair(false, errMsg);
704  }
705  if (success == false) {
706  // TODO:restart backend
707  }
708  return std::make_pair(success, errMsg);
709 
710  }));
711 
712 
713  // handle a DeleteSet request: delete the set via CatalogServer, remove it via PangeaStorageServer, then reply with a SimpleRequestResult
714  forMe.registerHandler(
715  DeleteSet_TYPEID,
716  make_shared<SimpleRequestHandler<DeleteSet>>([&](Handle<DeleteSet> request,
717  PDBCommunicatorPtr sendUsingMe) {
718  // the reply objects below are created while tempBlock is in scope (size argument 1024 * 128 -- presumably bytes; TODO confirm)
719  const UseTemporaryAllocationBlock tempBlock{1024 * 128};
720  {
721  std::string errMsg;
722  if ((!getFunctionality<CatalogServer>().deleteSet(
723  request->whichDatabase(), request->whichSet(), errMsg)) ||  // '||' short-circuits: removeSet is not attempted when the catalog delete fails
724  (!getFunctionality<PangeaStorageServer>().removeSet(request->whichDatabase(),
725  request->whichSet()))) {  // removeSet is not given errMsg, so the text appended below comes only from deleteSet
726  Handle<SimpleRequestResult> result = makeObject<SimpleRequestResult>(
727  false, std::string("error attempting to delete set: " + errMsg));
728  if (!sendUsingMe->sendObject(result, errMsg)) {  // errMsg is reused here for the send error
729  return std::make_pair(false, errMsg);
730  }
731  } else {
732  Handle<SimpleRequestResult> result = makeObject<SimpleRequestResult>(
733  true, std::string("successfully deleted set"));
734  if (!sendUsingMe->sendObject(result, errMsg)) {
735  return std::make_pair(false, errMsg);
736  }
737  }
738  return std::make_pair(true, std::string("delete complete"));  // returned whenever the reply was sent, even if the deletion failed; the outcome is carried in the SimpleRequestResult
739  }
740  }));
741 
742  // handle a request to iterate through a file
743  forMe.registerHandler(
744  SetScan_TYPEID,
745  make_shared<SimpleRequestHandler<SetScan>>([&](Handle<SetScan> request,
746  PDBCommunicatorPtr sendUsingMe) {
747 
748  // for error handling
749  std::string errMsg;
750 
751  // this is the number of pages
752  std::string whichDatabase = request->getDatabase();
753  std::string whichSet = request->getSetName();
754  PDB_COUT << "we are now iterating set:" << whichSet << std::endl;
755  // and keep looping while someone wants to get the output
756  SetPtr loopingSet = getFunctionality<PangeaStorageServer>().getSet(
757  std::make_pair(whichDatabase, whichSet));
758  if (loopingSet == nullptr) {
759  errMsg = "FATAL ERROR in handling SetScan request: set doesn't exist";
760  std::cout << errMsg << std::endl;
761  return std::make_pair(false, errMsg);
762  } else {
763  std::cout << "To scan set " << whichDatabase << ":" << whichSet <<
764  " with " << loopingSet->getNumPages() << " pages." << std::endl;
765  }
766  loopingSet->setPinned(true);
767  vector<PageIteratorPtr> *pageIters = loopingSet->getIterators();
768  // loop through all pages
769  int numIterators = pageIters->size();
770  for (int i = 0; i < numIterators; i++) {
771  PageIteratorPtr iter = pageIters->at(i);
772  while (iter->hasNext()) {
773  PDBPagePtr nextPage = iter->next();
774  // send the relevant page.
775  if (nextPage != nullptr) {
777  (Record<Vector<Handle<Object>>> *) (nextPage->getBytes());
778  Handle<Vector<Handle<Object>>> inputVec = myRec->getRootObject();
779  if (inputVec == nullptr) {
780  std::cout << "no vector in this page" << std::endl;
781  // to evict this page
782  PageCachePtr cache = getFunctionality<PangeaStorageServer>().getCache();
783  CacheKey key;
784  key.dbId = nextPage->getDbID();
785  key.typeId = nextPage->getTypeID();
786  key.setId = nextPage->getSetID();
787  key.pageId = nextPage->getPageID();
788  cache->decPageRefCount(key);
789 #ifndef REMOVE_SET_WITH_EVICTION
790  cache->evictPage(key); // try to modify this to something like
791  // evictPageWithoutFlush() or clear set in the
792  // end.
793 #endif
794  continue;
795  }
796 
797  int vecSize = inputVec->size();
798  if (vecSize != 0) {
799  const UseTemporaryAllocationBlock tempBlock{2048};
800 #ifdef ENABLE_COMPRESSION
801  char *newRecord = (char *) calloc(nextPage->getSize(), 1);
802  myRec = getRecord(inputVec, newRecord, nextPage->getSize());
803  char *compressedBytes =
804  new char[snappy::MaxCompressedLength(myRec->numBytes())];
805  size_t compressedSize;
806  snappy::RawCompress((char *) (myRec),
807  myRec->numBytes(),
808  compressedBytes,
809  &compressedSize);
810  std::cout << "Frontend=>Client: size before compression is "
811  << myRec->numBytes() << " and size after compression is "
812  << compressedSize << std::endl;
813  sendUsingMe->sendBytes(compressedBytes, compressedSize, errMsg);
814 
815  delete[] compressedBytes;
816  free(newRecord);
817 #else
818  if (!sendUsingMe->sendBytes(
819  nextPage->getBytes(), nextPage->getSize(), errMsg)) {
820  return std::make_pair(false, errMsg);
821  }
822 #endif
823  // see whether or not the client wants to see more results
824  bool success;
825  if (sendUsingMe->getObjectTypeID() != DoneWithResult_TYPEID) {
826  Handle<KeepGoing> temp =
827  sendUsingMe->getNextObject<KeepGoing>(success, errMsg);
828  PDB_COUT << "Keep going" << std::endl;
829  if (!success)
830  return std::make_pair(false, errMsg);
831  } else {
833  sendUsingMe->getNextObject<DoneWithResult>(success, errMsg);
834  PDB_COUT << "Done" << std::endl;
835  if (!success)
836  return std::make_pair(false, errMsg);
837  else
838  return std::make_pair(true, std::string("everything OK!"));
839  }
840  }
841  // to evict this page
842  PageCachePtr cache = getFunctionality<PangeaStorageServer>().getCache();
843  CacheKey key;
844  key.dbId = nextPage->getDbID();
845  key.typeId = nextPage->getTypeID();
846  key.setId = nextPage->getSetID();
847  key.pageId = nextPage->getPageID();
848  cache->decPageRefCount(key);
849 #ifndef REMOVE_SET_WITH_EVICTION
850  cache->evictPage(key); // try to modify this to something like
851  // evictPageWithoutFlush() or clear set in the end.
852 #endif
853  } else {
854  PDB_COUT << "We've got a null page!!!" << std::endl;
855  }
856  }
857  }
858  loopingSet->setPinned(false);
859  delete pageIters;
860  // tell the caller we are done
861  const UseTemporaryAllocationBlock tempBlock{1024};
862  Handle<DoneWithResult> temp = makeObject<DoneWithResult>();
863  if (!sendUsingMe->sendObject(temp, errMsg)) {
864  return std::make_pair(false, "could not send done message: " + errMsg);
865  }
866  // we got to here means success!! We processed the query, and got all of the results
867  std::cout << "We have finished scanning this set" << std::endl;
868  return std::make_pair(true, std::string("query completed!!"));
869  }));
870 }
871 
872 }
873 
874 #endif
SetID setId
Definition: DataTypes.h:87
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
SetType
Definition: DataTypes.h:76
Handle< ObjType > getRootObject()
Definition: Record.cc:46
shared_ptr< PageCache > PageCachePtr
Definition: PageCache.h:39
DatabaseID dbId
Definition: DataTypes.h:85
shared_ptr< PageIteratorInterface > PageIteratorPtr
Definition: PageIterator.h:33
Allocator & getAllocator()
Definition: Allocator.cc:943
Record< ObjType > * getRecord(Handle< ObjType > &forMe)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
PageID pageId
Definition: DataTypes.h:88
size_t numBytes()
Definition: Record.cc:36
void registerHandlers(PDBServer &forMe) override
#define PDB_COUT
Definition: PDBDebug.h:31
void registerHandler(int16_t typeID, PDBCommWorkPtr handledBy)
Definition: PDBServer.cc:81
shared_ptr< UserSet > SetPtr
Definition: UserSet.h:36
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
UserTypeID typeId
Definition: DataTypes.h:86
std::string printInactiveBlocks()
Definition: Allocator.cc:888