A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ScanUserSetBase.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_SCANUSERSETBASE_H
20 #define PDB_SCANUSERSETBASE_H
21 
22 #include "TypeName.h"
23 #include "Computation.h"
25 #include "VectorTupleSetIterator.h"
26 #include "PDBString.h"
27 #include "DataTypes.h"
28 #include "DataProxy.h"
29 #include "Configuration.h"
30 #include "mustache.h"
31 
32 namespace pdb {
33 
34 template<class OutputClass>
35 class ScanUserSetBase : public Computation {
36 
37  public:
38 
 /** Default constructor; members keep their in-class initializers (e.g. proxy = nullptr, batchSize value-initialized). */
42  ScanUserSetBase() = default;
43 
49  ScanUserSetBase(std::string dbName, std::string setName) {
50  this->dbName = dbName;
51  this->setName = setName;
52  this->outputType = getTypeName<OutputClass>();
53  this->batchSize = -1;
54  }
55 
 // NOTE(review): the declaration line that opens this body is absent from this listing.
 // The body only resets the iterator and proxy members to nullptr - presumably this is
 // the destructor; confirm against the original header.
57  this->iterator = nullptr;
58  this->proxy = nullptr;
59  }
60 
65  void setUpAndCopyFrom(void *target, void *source) const override {
66  new(target) ScanUserSetBase<OutputClass>();
69  toMe.iterator = fromMe.iterator;
70  toMe.proxy = fromMe.proxy;
71  toMe.batchSize = fromMe.batchSize;
72  toMe.dbName = fromMe.dbName;
73  toMe.setName = fromMe.setName;
74  toMe.outputType = fromMe.outputType;
75 
76  }
77 
78  void deleteObject(void *deleteMe) override {
79  deleter(deleteMe, this);
80  }
81 
82  size_t getSize(void *forMe) override {
83  return sizeof(ScanUserSetBase<OutputClass>);
84  }
85 
87  return std::make_shared<VectorTupleSetIterator>(
88 
89  [&]() -> void * {
90  if (this->iterator == nullptr) {
91  return nullptr;
92  }
93  while (this->iterator->hasNext()) {
94 
95  PDBPagePtr page = this->iterator->next();
96  if (page != nullptr) {
97  return page->getBytes();
98  }
99  }
100 
101  return nullptr;
102 
103  },
104 
105  [&](void *freeMe) -> void {
106  if (this->proxy != nullptr) {
107  char *pageRawBytes = (char *) freeMe -
108  (sizeof(NodeID) + sizeof(DatabaseID) + sizeof(UserTypeID) + sizeof(SetID) +
109  sizeof(PageID) + sizeof(int) + sizeof(size_t));
110 
111  PDBPagePtr page = make_shared<PDBPage>(pageRawBytes, 0, 0);
112  NodeID nodeId = page->getNodeID();
113  DatabaseID dbId = page->getDbID();
114  UserTypeID typeId = page->getTypeID();
115  SetID setId = page->getSetID();
116  try {
117  this->proxy->unpinUserPage(nodeId, dbId, typeId, setId, page, false);
118  } catch (NotEnoughSpace &n) {
119  makeObjectAllocatorBlock(4096, true);
120  this->proxy->unpinUserPage(nodeId, dbId, typeId, setId, page, false);
121  throw n;
122  }
123  }
124  },
125 
126  this->batchSize
127 
128  );
129  }
130 
136  this->iterator = iterator;
137  }
138 
140  this->proxy = proxy;
141  }
142 
143  void setBatchSize(int batchSize) override {
144  this->batchSize = batchSize;
145  }
146 
147  int getBatchSize() {
148  return this->batchSize;
149  }
150 
151  void setOutput(std::string dbName, std::string setName) override {
152  this->dbName = dbName;
153  this->setName = setName;
154  }
155 
156  void setDatabaseName(std::string dbName) {
157  this->dbName = dbName;
158  }
159 
160  void setSetName(std::string setName) {
161  this->setName = setName;
162  }
163 
164  std::string getDatabaseName() override {
165  return dbName;
166  }
167 
168  std::string getSetName() override {
169  return setName;
170  }
171 
172  std::string getComputationType() override {
173  return std::string("ScanUserSet");
174  }
175 
181  return ScanSetTypeID;
182  }
183 
193  std::string toTCAPString(std::vector<InputTupleSetSpecifier> &inputTupleSets,
194  int computationLabel,
195  std::string &outputTupleSetName,
196  std::vector<std::string> &outputColumnNames,
197  std::string &addedOutputColumnName) override {
198 
199  InputTupleSetSpecifier inputTupleSet;
200  if (!inputTupleSets.empty()) {
201  inputTupleSet = inputTupleSets[0];
202  }
203  return toTCAPString(inputTupleSet.getTupleSetName(),
204  inputTupleSet.getColumnNamesToKeep(),
205  inputTupleSet.getColumnNamesToApply(),
206  computationLabel,
208  outputColumnNames,
209  addedOutputColumnName);
210  }
211 
223  std::string toTCAPString(std::string inputTupleSetName,
224  std::vector<std::string> &inputColumnNames,
225  std::vector<std::string> &inputColumnsToApply,
226  int computationLabel,
227  std::string &outputTupleSetName,
228  std::vector<std::string> &outputColumnNames,
229  std::string &addedOutputColumnName) {
230 
231  // the template we are going to use to create the TCAP string for this ScanUserSet
232  mustache::mustache scanSetTemplate{"inputDataFor{{computationType}}_{{computationLabel}}(in{{computationLabel}})"
233  " <= SCAN ('{{setName}}', '{{dbName}}', '{{computationType}}_{{computationLabel}}')\n"};
234 
235  // the data required to fill in the template
236  mustache::data scanSetData;
237  scanSetData.set("computationType", getComputationType());
238  scanSetData.set("computationLabel", std::to_string(computationLabel));
239  scanSetData.set("setName", std::string(setName));
240  scanSetData.set("dbName", std::string(dbName));
241 
242  // output column name
243  mustache::mustache outputColumnNameTemplate{"in{{computationLabel}}"};
244 
245  // set the output column name
246  addedOutputColumnName = outputColumnNameTemplate.render(scanSetData);
247  outputColumnNames.push_back(addedOutputColumnName);
248 
249  // output tuple set name template
250  mustache::mustache outputTupleSetTemplate{"inputDataFor{{computationType}}_{{computationLabel}}"};
251  outputTupleSetName = outputTupleSetTemplate.render(scanSetData);
252 
253  // update the state of the computation
254  this->setTraversed(true);
255  this->setOutputTupleSetName(outputTupleSetName);
256  this->setOutputColumnToApply(addedOutputColumnName);
257 
258  // return the TCAP string
259  return scanSetTemplate.render(scanSetData);
260  }
261 
262  int getNumInputs() override {
263  return 0;
264  }
265 
266  std::string getIthInputType(int i) override {
267  return "";
268  }
269 
270  std::string getOutputType() override {
271  if (outputType == "") {
272  outputType = getTypeName<OutputClass>();
273  }
274  return this->outputType;
275  }
276 
277  bool needsMaterializeOutput() override {
278  return false;
279  }
280 
281  protected:
282 
287 
 // the proxy used by the page-release callback in getComputeSource to unpin pages; nullptr until one is set
288  DataProxyPtr proxy = nullptr;
289 
291 
293 
 // the batch size handed to the VectorTupleSetIterator; the two-argument constructor sets it to -1
294  int batchSize{};
295 
297 };
298 
299 }
300 
301 
302 
303 #endif //PDB_SCANUSERSETBASE_H
ComputationTypeID
Definition: Computation.h:39
void setOutput(std::string dbName, std::string setName) override
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
std::shared_ptr< ComputeSource > ComputeSourcePtr
Definition: ComputeSource.h:26
std::vector< std::string > & getColumnNamesToKeep()
PageCircularBufferIteratorPtr iterator
void setUpAndCopyFrom(void *target, void *source) const override
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
unsigned int NodeID
Definition: DataTypes.h:27
std::string getOutputType() override
std::string getComputationType() override
void deleter(void *deleteMe, ObjType *dummy)
Definition: DeepCopy.h:48
ScanUserSetBase(std::string dbName, std::string setName)
ComputationTypeID getComputationTypeID() override
unsigned int DatabaseID
Definition: DataTypes.h:29
void setDatabaseName(std::string dbName)
void setSetName(std::string setName)
unsigned int PageID
Definition: DataTypes.h:26
ScanUserSetBase()=default
void setProxy(DataProxyPtr proxy)
int getNumInputs() override
ComputeSourcePtr getComputeSource(TupleSpec &schema, ComputePlan &plan) override
std::string getDatabaseName() override
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName)
void setOutputColumnToApply(std::string outputColumnToApply)
Definition: Computation.h:294
void setBatchSize(int batchSize) override
void setTraversed(bool traversed)
Definition: Computation.h:254
bool needsMaterializeOutput() override
std::vector< std::string > & getColumnNamesToApply()
void deleteObject(void *deleteMe) override
void setOutputTupleSetName(std::string outputTupleSetName)
Definition: Computation.h:275
std::string getIthInputType(int i) override
std::string getSetName() override
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
void setIterator(PageCircularBufferIteratorPtr iterator)
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
size_t getSize(void *forMe) override
unsigned int UserTypeID
Definition: DataTypes.h:25
String outputTupleSetName
Definition: Computation.h:379