A platform for high-performance distributed tool and library development, written in C++. It can be deployed in one of two cluster modes: standalone or distributed. This is the API documentation for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AggregationJobStage.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef AGGREGATION_JOBSTAGE_H
19 #define AGGREGATION_JOBSTAGE_H
20 
21 
22 #include "PDBDebug.h"
23 #include "Object.h"
24 #include "DataTypes.h"
25 #include "Handle.h"
26 #include "PDBVector.h"
27 #include "PDBString.h"
28 #include "SetIdentifier.h"
29 #include "AbstractAggregateComp.h"
30 #include "AbstractJobStage.h"
31 
32 // PRELOAD %AggregationJobStage%
33 
34 namespace pdb {
35 
36 // encapsulates a synchronous aggregation consuming job stage
38 
39 public:
41 
43  bool materializeOrNot,
45  this->id = stageId;
46  this->materializeOrNot = materializeOrNot;
47  this->aggComputation = aggComputation;
48  }
49 
51  bool materializeOrNot,
53  int numNodePartitions) {
54  this->id = stageId;
55  this->materializeOrNot = materializeOrNot;
56  this->aggComputation = aggComputation;
57  this->numNodePartitions = numNodePartitions;
58  }
59 
61 
63  this->numNodePartitions = numNodePartitions;
64  }
65 
67  return this->numNodePartitions;
68  }
69 
70  std::string getJobStageType() override {
71  return "AggregationJobStage";
72  }
73 
74  int16_t getJobStageTypeID() override {
75  return AggregationJobStage_TYPEID;
76  }
77 
78  // to set source set identifier
80  this->sourceContext = sourceContext;
81  }
82 
83  // to return source set identifier
85  return this->sourceContext;
86  }
87 
88  // to set sink set identifier
90  this->sinkContext = sinkContext;
91  }
92 
93  // to return sink set identifier
95  return this->sinkContext;
96  }
97 
98  JobStageID getStageId() override {
99  return this->id;
100  }
101 
103  return materializeOrNot;
104  }
105 
106  void print() override {
107 
108 
109  int numNodePartitions;
110 
111  size_t totalMemoryOnThisNode;
112  std::cout << "[JOB STAGE] JobStageType" << this->getJobStageType() << std::endl;
113  std::cout << "[JOB ID] jobId=" << jobId << std::endl;
114  std::cout << "[STAGE ID] id=" << id << std::endl;
115  std::cout << "[INPUT] databaseName=" << sourceContext->getDatabase()
116  << ", setName=" << sourceContext->getSetName() << std::endl;
117  std::cout << "[OUTPUT] databaseName=" << sinkContext->getDatabase()
118  << ", setName=" << sinkContext->getSetName() << std::endl;
119  std::cout << "[OUTTYPE] typeName=" << getOutputTypeName() << std::endl;
120  std::cout << "[NUMPARTITIONS] numPartitions=" << numNodePartitions << std::endl;
121  std::cout << "[MEM] total memory=" << totalMemoryOnThisNode << std::endl;
122  std::cout << "[MATERIALIZE] materialize=" << materializeOrNot << std::endl;
123  std::cout << "[COMPUTATION] outputTupleSetName=" << (*aggComputation).getOutputTupleSetName() << std::endl;
124  }
125 
126  std::string getOutputTypeName() {
127  return this->outputTypeName;
128  }
129 
130  void setOutputTypeName(std::string outputTypeName) {
131  this->outputTypeName = outputTypeName;
132  }
133 
135  return aggComputation;
136  }
137 
138  void setAggTotalPartitions(int numPartitions) {
139  this->aggComputation->setNumPartitions(numPartitions);
140  }
141 
142  void setAggBatchSize(int batchSize) {
143  this->aggComputation->setBatchSize(batchSize);
144  }
145 
147  this->aggComputation = aggComputation;
148  }
149 
150  void setTotalMemoryOnThisNode(size_t totalMem) {
151  this->totalMemoryOnThisNode = totalMem;
152  }
153 
155  return this->totalMemoryOnThisNode;
156  }
157 
158 
160 
161 
162 private:
164 
166 
167 
169 
171 
173 
175 
177 
179 };
180 }
181 
182 #endif
Handle< SetIdentifier > sinkContext
#define ENABLE_DEEP_COPY
Definition: DeepCopy.h:52
void setAggComputation(Handle< AbstractAggregateComp > aggComputation)
JobStageID getStageId() override
Handle< AbstractAggregateComp > aggComputation
AggregationJobStage(JobStageID stageId, bool materializeOrNot, Handle< AbstractAggregateComp > aggComputation, int numNodePartitions)
std::string getJobStageType() override
void setOutputTypeName(std::string outputTypeName)
void setAggTotalPartitions(int numPartitions)
void setSourceContext(Handle< SetIdentifier > sourceContext)
void setNumNodePartitions(int numNodePartitions)
unsigned int JobStageID
Definition: DataTypes.h:37
int16_t getJobStageTypeID() override
Handle< SetIdentifier > getSourceContext()
Handle< SetIdentifier > getSinkContext()
Handle< AbstractAggregateComp > getAggComputation()
void setSinkContext(Handle< SetIdentifier > sinkContext)
void setAggBatchSize(int batchSize)
AggregationJobStage(JobStageID stageId, bool materializeOrNot, Handle< AbstractAggregateComp > aggComputation)
void setTotalMemoryOnThisNode(size_t totalMem)
Handle< SetIdentifier > sourceContext