A platform for high-performance distributed tool and library development, written in C++. It can be deployed in either of two cluster modes: standalone or distributed. API documentation for v0.5.0, released on June 13, 2018.
TupleSetJobStage.h
/*****************************************************************************
 *                                                                           *
 * Copyright 2018 Rice University                                            *
 *                                                                           *
 * Licensed under the Apache License, Version 2.0 (the "License");           *
 * you may not use this file except in compliance with the License.          *
 * You may obtain a copy of the License at                                   *
 *                                                                           *
 *     http://www.apache.org/licenses/LICENSE-2.0                            *
 *                                                                           *
 * Unless required by applicable law or agreed to in writing, software       *
 * distributed under the License is distributed on an "AS IS" BASIS,         *
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  *
 * See the License for the specific language governing permissions and       *
 * limitations under the License.                                            *
 *                                                                           *
 *****************************************************************************/
#ifndef TUPLESET_JOBSTAGE_H
#define TUPLESET_JOBSTAGE_H


#include "PDBDebug.h"
#include "Object.h"
#include "DataTypes.h"
#include "Handle.h"
#include "PDBVector.h"
#include "PDBString.h"
#include "SetIdentifier.h"
#include "ComputePlan.h"
#include "AbstractJobStage.h"
#include <stdlib.h>

// PRELOAD %TupleSetJobStage%

namespace pdb {

class TupleSetJobStage : public AbstractJobStage {

public:
    // constructor
    TupleSetJobStage() {}

    // constructor
    TupleSetJobStage(JobStageID stageId) {
        this->id = stageId;
        this->sharedPlan = nullptr;
        this->sourceContext = nullptr;
        this->hashContext = nullptr;
        this->sinkContext = nullptr;
        this->probeOrNot = false;
        this->repartitionOrNot = false;
        this->combineOrNot = false;
        this->broadcastOrNot = false;
        this->inputAggHashOutOrNot = false;
        this->numNodes = 0;
        this->numPartitions = nullptr;
        this->ipAddresses = nullptr;
    }

    // constructor
    TupleSetJobStage(JobStageID stageId, int numNodes) {
        this->id = stageId;
        this->sharedPlan = nullptr;
        this->sourceContext = nullptr;
        this->hashContext = nullptr;
        this->sinkContext = nullptr;
        this->probeOrNot = false;
        this->repartitionOrNot = false;
        this->broadcastOrNot = false;
        this->combineOrNot = false;
        this->inputAggHashOutOrNot = false;
        this->numNodes = numNodes;
        this->numPartitions = makeObject<Vector<Handle<Vector<HashPartitionID>>>>(numNodes);
        this->ipAddresses = makeObject<Vector<String>>(numNodes);
    }

    // destructor
    ~TupleSetJobStage() {}

    std::string getJobStageType() override {
        return "TupleSetJobStage";
    }

    int16_t getJobStageTypeID() override {
        return TupleSetJobStage_TYPEID;
    }

    // to set the compute plan
    void setComputePlan(const Handle<ComputePlan>& plan,
                        const std::string& sourceTupleSetSpecifier,
                        const std::string& targetTupleSetSpecifier,
                        const std::string& targetComputationSpecifier) {
        this->sharedPlan = plan;
        this->sourceTupleSetSpecifier = sourceTupleSetSpecifier;
        this->targetTupleSetSpecifier = targetTupleSetSpecifier;
        this->targetComputationSpecifier = targetComputationSpecifier;
    }

    // to get the source tuple set name for this pipeline stage
    std::string getSourceTupleSetSpecifier() {
        return this->sourceTupleSetSpecifier;
    }

    // to get the target tuple set name for this pipeline stage
    std::string getTargetTupleSetSpecifier() {
        return this->targetTupleSetSpecifier;
    }

    // to get the target computation name for this pipeline stage
    std::string getTargetComputationSpecifier() {
        return this->targetComputationSpecifier;
    }

    // to get the vector of tuple sets to build the pipeline
    void getTupleSetsToBuildPipeline(std::vector<std::string>& buildMe) {
        buildMe.clear();
        for (int i = 0; i < (*buildTheseTupleSets).size(); i++) {
            buildMe.push_back((*buildTheseTupleSets)[i]);
        }
    }

    // to set the tuple sets for building the pipeline
    void setTupleSetsToBuildPipeline(const std::vector<std::string>& buildMe) {
        buildTheseTupleSets = makeObject<Vector<String>>();
        for (const auto& i : buildMe) {
            buildTheseTupleSets->push_back(i);
        }
    }

    // nullify the compute plan shared pointer
    void nullifyComputePlanPointer() {
        this->sharedPlan->nullifyPlanPointer();
    }

    // to get the compute plan
    Handle<ComputePlan> getComputePlan() {
        return this->sharedPlan;
    }

    // to set the source set identifier
    void setSourceContext(Handle<SetIdentifier> sourceContext) {
        this->sourceContext = sourceContext;
    }

    // to return the source set identifier
    Handle<SetIdentifier> getSourceContext() {
        return this->sourceContext;
    }

    // to set the sink set identifier
    void setSinkContext(Handle<SetIdentifier> sinkContext) {
        this->sinkContext = sinkContext;
    }

    // to return the sink set identifier
    Handle<SetIdentifier> getSinkContext() {
        return this->sinkContext;
    }

    // to set the hash set identifier for probing
    void setHashContext(Handle<SetIdentifier> hashContext) {
        this->hashContext = hashContext;
    }

    // to return the hash set identifier for probing
    Handle<SetIdentifier> getHashContext() {
        return this->hashContext;
    }

    // to set the combiner set identifier for combining data
    void setCombinerContext(Handle<SetIdentifier> combinerContext) {
        this->combinerContext = combinerContext;
    }

    // to return the combiner set identifier for combining data
    Handle<SetIdentifier> getCombinerContext() {
        return this->combinerContext;
    }

    // to set whether to probe a hash table
    void setProbing(bool probeOrNot) {
        this->probeOrNot = probeOrNot;
    }

    // to return whether this stage requires probing a hash table
    bool isProbing() {
        return this->probeOrNot;
    }

    // to set whether the input is aggregation hash output
    void setInputAggHashOut(bool inputAggHashOutOrNot) {
        this->inputAggHashOutOrNot = inputAggHashOutOrNot;
    }

    // to return whether this stage consumes aggregation hash output
    bool isInputAggHashOut() {
        return this->inputAggHashOutOrNot;
    }

    // to set whether to broadcast the hash table
    void setBroadcasting(bool broadcastOrNot) {
        this->broadcastOrNot = broadcastOrNot;
    }

    // to return whether to broadcast the hash table
    bool isBroadcasting() {
        return this->broadcastOrNot;
    }

    // to set whether to repartition the output
    void setRepartition(bool repartitionOrNot) {
        this->repartitionOrNot = repartitionOrNot;
    }

    // to return whether this stage requires repartitioning the output
    bool isRepartition() {
        return this->repartitionOrNot;
    }

    // to set whether to repartition the output for a join
    void setRepartitionJoin(bool repartitionJoinOrNot) {
        this->repartitionJoinOrNot = repartitionJoinOrNot;
    }

    // to return whether this stage requires repartitioning the output for a join
    bool isRepartitionJoin() {
        return this->repartitionJoinOrNot;
    }

    // to set whether to repartition the output into vectors
    void setRepartitionVector(bool repartitionVectorOrNot) {
        this->repartitionVectorOrNot = repartitionVectorOrNot;
    }

    // to return whether this stage requires repartitioning the output into vectors
    bool isRepartitionVector() {
        return this->repartitionVectorOrNot;
    }

    // to set whether to combine the repartitioned output
    void setCombining(bool combineOrNot) {
        this->combineOrNot = combineOrNot;
    }

    // to return whether this stage requires combining the repartitioned output
    bool isCombining() {
        return this->combineOrNot;
    }

    JobStageID getStageId() override {
        return this->id;
    }

    void setCollectAsMap(bool collectAsMapOrNot) {
        this->collectAsMapOrNot = collectAsMapOrNot;
    }

    bool isCollectAsMap() {
        return this->collectAsMapOrNot;
    }

    int getNumNodesToCollect() {
        return this->numNodesToCollect;
    }

    void setNumNodesToCollect(int numNodesToCollect) {
        this->numNodesToCollect = numNodesToCollect;
    }

    void print() override {
        std::cout << "[JOB ID] jobId=" << jobId << std::endl;
        std::cout << "[STAGE ID] id=" << id << std::endl;
        std::cout << "[INPUT] databaseName=" << sourceContext->getDatabase()
                  << ", setName=" << sourceContext->getSetName() << std::endl;
        std::cout << "[OUTPUT] databaseName=" << sinkContext->getDatabase()
                  << ", setName=" << sinkContext->getSetName() << std::endl;
        std::cout << "[OUTTYPE] typeName=" << getOutputTypeName() << std::endl;
        std::cout << "[SOURCE TUPLESET] sourceTupleSetSpecifier=" << this->sourceTupleSetSpecifier
                  << std::endl;
        std::cout << "[TARGET TUPLESET] targetTupleSetSpecifier=" << this->targetTupleSetSpecifier
                  << std::endl;
        std::cout << "[TARGET COMPUTATION] targetComputationSpecifier="
                  << this->targetComputationSpecifier << std::endl;
        std::cout << "[ALLOCATOR POLICY] allocatorPolicy=" << this->allocatorPolicy << std::endl;
        if (buildTheseTupleSets != nullptr) {
            std::cout << "[PIPELINE]" << std::endl;
            size_t mySize = buildTheseTupleSets->size();
            for (size_t i = 0; i < mySize; i++) {
                std::cout << i << ": " << (*buildTheseTupleSets)[i] << std::endl;
            }
        }
        std::cout << "[Probing] isProbing=" << this->probeOrNot << std::endl;
        std::cout << "Number of cluster nodes=" << getNumNodes() << std::endl;
        std::cout << "Total memory on this node is " << totalMemoryOnThisNode << std::endl;
        std::cout << "Number of total partitions=" << getNumTotalPartitions() << std::endl;
        int i;
        for (i = 0; i < numNodes; i++) {
            Handle<Vector<HashPartitionID>>& partitions = getNumPartitions(i);
            std::cout << "Number of partitions on node-" << i << " is " << partitions->size()
                      << std::endl;
            std::cout << "IP address on node-" << i << " is " << getIPAddress(i) << std::endl;
            int port;
            if ((port = getPort(i)) > 0) {
                std::cout << "Port on node-" << i << " is " << port << std::endl;
            }
        }

        // do we have hash sets to probe
        if (hashSetsToProbe != nullptr) {

            // go through each entry in the map and print it
            std::cout << "Hash sets to probe" << std::endl;
            for (auto it = hashSetsToProbe->begin(); it != hashSetsToProbe->end(); ++it) {
                std::cout << "Atomic computation : \"" << (*it).key << "\" : Hash set \""
                          << (*it).value << "\"" << std::endl;
            }
        }
    }

    std::string getOutputTypeName() {
        return this->outputTypeName;
    }

    void setOutputTypeName(const std::string& outputTypeName) {
        this->outputTypeName = outputTypeName;
    }

    void setNumNodes(int numNodes) {
        this->numNodes = numNodes;
    }

    int getNumNodes() {
        return this->numNodes;
    }

    void addNumPartitions(Handle<Vector<HashPartitionID>> numPartitions) {
        this->numPartitions->push_back(numPartitions);
    }

    Handle<Vector<HashPartitionID>>& getNumPartitions(int nodeId) {
        return (*numPartitions)[nodeId];
    }

    Handle<Map<String, String>>& getHashSets() {
        return this->hashSetsToProbe;
    }

    void addHashSetToProbe(String targetTupleSetName, String hashSetName) {
        if (hashSetsToProbe == nullptr) {
            hashSetsToProbe = makeObject<Map<String, String>>();
        }
        (*hashSetsToProbe)[targetTupleSetName] = hashSetName;
    }

    void setHashSetsToProbe(Handle<Map<String, String>> hashSetsToProbe) {
        this->hashSetsToProbe = hashSetsToProbe;
    }

    String getIPAddress(int nodeId) {
        if ((unsigned int)nodeId < numPartitions->size()) {
            std::string ipStr = (*ipAddresses)[nodeId];
            if (ipStr.find(':') != std::string::npos) {
                std::string ip = ipStr.substr(0, ipStr.find(':'));
                return String(ip);
            } else {
                return String(ipStr);
            }
        } else {
            return nullptr;
        }
    }

    int getPort(int nodeId) {
        if ((unsigned int)nodeId < numPartitions->size()) {
            std::string ipStr = (*ipAddresses)[nodeId];
            if (ipStr.find(':') != std::string::npos) {
                int port = atoi((ipStr.substr(ipStr.find(':') + 1)).c_str());
                return port;
            } else {
                return -1;
            }
        } else {
            return -1;
        }
    }

    // each address could be simply an IP address like 10.134.96.50, or an IP address and port
    // pair like localhost:8109
    void setIPAddresses(Handle<Vector<String>> addresses) {
        this->ipAddresses = addresses;
    }

    void setNumPartitions(Handle<Vector<Handle<Vector<HashPartitionID>>>> numPartitions) {
        this->numPartitions = numPartitions;
    }

    int getNumTotalPartitions() {
        return this->numTotalPartitions;
    }

    void setNumTotalPartitions(int numTotalPartitions) {
        this->numTotalPartitions = numTotalPartitions;
    }

    NodeID getNodeId() {
        return this->myNodeId;
    }

    void setNodeId(NodeID nodeId) {
        this->myNodeId = nodeId;
    }

    void setTotalMemoryOnThisNode(size_t totalMem) {
        this->totalMemoryOnThisNode = totalMem;
    }

    size_t getTotalMemoryOnThisNode() {
        return this->totalMemoryOnThisNode;
    }

    void setAllocatorPolicy(AllocatorPolicy myPolicy) {
        this->allocatorPolicy = myPolicy;
    }

    AllocatorPolicy getAllocatorPolicy() {
        return this->allocatorPolicy;
    }

    ENABLE_DEEP_COPY

private:
    // Input set information
    Handle<SetIdentifier> sourceContext;

    // Hash set information for probing the aggregation set
    Handle<SetIdentifier> hashContext;

    // Combiner set information
    Handle<SetIdentifier> combinerContext;

    // Output set information
    Handle<SetIdentifier> sinkContext;

    // Output type name
    String outputTypeName;

    // logical plan information
    Handle<ComputePlan> sharedPlan;

    // source tuple set
    String sourceTupleSetSpecifier;

    // target tuple set
    String targetTupleSetSpecifier;

    // target computation
    String targetComputationSpecifier;

    // tuple sets to build the pipeline
    Handle<Vector<String>> buildTheseTupleSets;

    // Does this stage have a PartitionedJoinSink that partitions JoinMaps?
    bool repartitionJoinOrNot = false;

    // Does this stage have a HashPartitionSink that partitions Vectors?
    bool repartitionVectorOrNot = false;

    // Does this stage require probing a hash table?
    bool probeOrNot = false;

    // Does this stage require repartitioning output?
    bool repartitionOrNot = false;

    // Does this stage require combining repartitioned results?
    bool combineOrNot = false;

    // Does this stage require broadcasting results?
    bool broadcastOrNot = false;

    // Does this stage consume aggregation hash output?
    bool inputAggHashOutOrNot = false;

    // Does this stage collect results to one partition?
    bool collectAsMapOrNot = false;

    // Number of nodes to collect aggregation results
    int numNodesToCollect;

    // hash set names to probe for join
    Handle<Map<String, String>> hashSetsToProbe;

    // the id to identify this job stage
    JobStageID id;

    // repartitioning scheme
    int numNodes;

    // number of partitions on each node
    Handle<Vector<Handle<Vector<HashPartitionID>>>> numPartitions;

    // IP address for each node
    Handle<Vector<String>> ipAddresses;

    // total number of partitions; should be consistent with numPartitions
    int numTotalPartitions;

    // node id of the receiver of this message
    NodeID myNodeId;

    // memory size on this node
    size_t totalMemoryOnThisNode;

    // allocator policy
    AllocatorPolicy allocatorPolicy;
};
}

#endif
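
Usage note: the sketch below shows how a physical planner might populate one of these stages before dispatching it to workers. It is a minimal sketch, assuming only the API in this header plus a PDB allocation block (UseTemporaryAllocationBlock) and a SetIdentifier constructor taking a (database, set) pair; the plan handle, database/set names, addresses, and the helper sketchConfigureStage itself are illustrative placeholders, not part of this API.

#include "TupleSetJobStage.h"
#include "UseTemporaryAllocationBlock.h"

using namespace pdb;

// Illustrative only: the plan handle, database/set names, and addresses below
// are placeholders, not values defined by this header.
void sketchConfigureStage(Handle<ComputePlan> myPlan) {
    // PDB objects are created inside an allocation block, so set one up first
    const UseTemporaryAllocationBlock tempBlock{4 * 1024 * 1024};

    // stage 1 of a job that spans 2 worker nodes
    Handle<TupleSetJobStage> stage = makeObject<TupleSetJobStage>(1, 2);

    // bind the stage to a slice of the logical plan
    stage->setComputePlan(myPlan,
                          "inputTupleSet",   // source tuple set
                          "aggOutTupleSet",  // target tuple set
                          "Aggregation_0");  // target computation

    // input and output sets (assumes SetIdentifier's (database, set) constructor)
    stage->setSourceContext(makeObject<SetIdentifier>("testDB", "inputSet"));
    stage->setSinkContext(makeObject<SetIdentifier>("testDB", "outputSet"));
    stage->setOutputTypeName("MyAggregationResult");

    // cluster topology: each address is either a bare IP or an ip:port pair
    Handle<Vector<String>> addresses = makeObject<Vector<String>>();
    addresses->push_back(String("10.134.96.50"));
    addresses->push_back(String("localhost:8109"));
    stage->setIPAddresses(addresses);

    // one partition list per node
    for (int node = 0; node < 2; node++) {
        Handle<Vector<HashPartitionID>> parts = makeObject<Vector<HashPartitionID>>();
        parts->push_back(node);
        stage->addNumPartitions(parts);
    }
    stage->setNumTotalPartitions(2);

    // shuffle the output and pre-combine it before sending
    stage->setRepartition(true);
    stage->setCombining(true);

    // getPort() returns -1 when an address carries no explicit port
    int portOnNode0 = stage->getPort(0);  // -1, since "10.134.96.50" has no port
    int portOnNode1 = stage->getPort(1);  // 8109, parsed from "localhost:8109"
    (void)portOnNode0;
    (void)portOnNode1;
}

Because getPort() falls back to -1 when no port is present, bare IPs and ip:port pairs can be mixed freely in the same address vector, as the comment above setIPAddresses notes.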