A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TupleSetJobStageBuilder.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
19 
20 
21 namespace pdb {
22 
24 
25  hashSetsToProbe = makeObject<Map<String, String>>();
27 
28  // the default for all types of sinks is false
29  isProbing = false;
30  isRepartitionJoin = false;
31  isRepartitionVector = false;
32  isRepartitioning = false;
33  isBroadcasting = false;
34  isCollectAsMap = false;
35 
36  // set the set identifiers to null
37  sourceContext = nullptr;
38  sinkContext = nullptr;
39  combinerContext = nullptr;
40 
42 }
43 
44 void TupleSetJobStageBuilder::setJobId(const std::string &jobId) {
45  this->jobId = jobId;
46 }
47 
49  this->jobStageId = jobStageId;
50 }
51 
53  this->computePlan = plan;
54 }
55 
56 void TupleSetJobStageBuilder::setSourceTupleSetName(const std::string &sourceTupleSetName) {
57  this->sourceTupleSetName = sourceTupleSetName;
58 }
59 
60 void TupleSetJobStageBuilder::setTargetTupleSetName(const std::string &targetTupleSetName) {
61  this->targetTupleSetName = targetTupleSetName;
62 }
63 
64 void TupleSetJobStageBuilder::setTargetComputationName(const std::string &targetComputationSpecifier) {
65  this->targetComputationName = targetComputationSpecifier;
66 }
67 
68 void TupleSetJobStageBuilder::addTupleSetToBuildPipeline(const std::string &buildMe) {
69  this->buildTheseTupleSets.push_back(buildMe);
70 }
71 
72 void TupleSetJobStageBuilder::addHashSetToProbe(const std::string &outputName, const std::string &hashSetName) {
73  (*hashSetsToProbe)[outputName] = hashSetName;
74 }
75 
77  this->sourceContext = sourceContext;
78 }
79 
81  this->sinkContext = sinkContext;
82 }
83 
85  this->combinerContext = combinerContext;
86 }
87 
88 void TupleSetJobStageBuilder::setOutputTypeName(const std::string &outputTypeName) {
89  this->outputTypeName = outputTypeName;
90 }
91 
93  this->isProbing = isProbing;
94 }
95 
97  this->policy = policy;
98 }
99 
100 void TupleSetJobStageBuilder::setRepartitionJoin(bool repartitionJoinOrNot) {
101  this->isRepartitionJoin = repartitionJoinOrNot;
102 }
103 
104 void TupleSetJobStageBuilder::setRepartitionVector(bool repartitionVectorOrNot) {
105  this->isRepartitionVector = repartitionVectorOrNot;
106 }
107 
108 void TupleSetJobStageBuilder::setBroadcasting(bool broadcastOrNot) {
109  this->isBroadcasting = broadcastOrNot;
110 }
111 
112 void TupleSetJobStageBuilder::setRepartition(bool repartitionOrNot) {
113  this->isRepartitioning = repartitionOrNot;
114 }
115 
118 }
119 
120 void TupleSetJobStageBuilder::setCollectAsMap(bool collectAsMapOrNot) {
121  this->isCollectAsMap = collectAsMapOrNot;
122 }
123 
125  this->numNodesToCollect = numNodesToCollect;
126 }
127 
129  return isProbing;
130 }
131 
133  return sourceContext;
134 }
135 
137  return sourceTupleSetName;
138 }
139 
141  return buildTheseTupleSets.back();
142 }
143 
145 
146  // create an instance of the tuple set job stage
147  Handle<TupleSetJobStage> jobStage = makeObject<TupleSetJobStage>(jobStageId);
148 
149  // set the parameters
151  jobStage->setTupleSetsToBuildPipeline(buildTheseTupleSets);
152  jobStage->setSourceContext(sourceContext);
153  jobStage->setSinkContext(sinkContext);
154  jobStage->setOutputTypeName(outputTypeName);
155  jobStage->setAllocatorPolicy(policy);
156  jobStage->setRepartitionJoin(isRepartitionJoin);
157  jobStage->setRepartitionVector(isRepartitionVector);
158  jobStage->setBroadcasting(isBroadcasting);
159  jobStage->setRepartition(isRepartitioning);
160  jobStage->setJobId(this->jobId);
161  jobStage->setCollectAsMap(isCollectAsMap);
162  jobStage->setNumNodesToCollect(numNodesToCollect);
163 
164  // do we have all the parameters set to do probing
165  if (hashSetsToProbe != nullptr && isProbing) {
166 
167  // set probing to true
168  jobStage->setProbing(true);
169 
170  // set them
171  jobStage->setHashSetsToProbe(hashSetsToProbe);
172  }
173 
174  // are we using a combiner
175  if (combinerContext != nullptr) {
176 
177  // set the parameters for the combiner
178  jobStage->setCombinerContext(combinerContext);
179  jobStage->setCombining(true);
180  }
181 
182  // aggregation output should not be kept across
183  // stages; if an aggregation has more than one
184  // consumers, we need materialize aggregation
185  // results.
186  if (sourceContext->isAggregationResult()) {
187  jobStage->setInputAggHashOut(true);
188  }
189 
190  PDB_COUT << "PhysicalOptimizer generates tupleSetJobStage:" << "\n";
191  return jobStage;
192 }
193 
194 }
195 
void setSourceContext(const Handle< SetIdentifier > &sourceContext)
Handle< SetIdentifier > sinkContext
void setComputePlan(const Handle< ComputePlan > &plan)
AllocatorPolicy
Definition: Allocator.h:130
void setBroadcasting(bool broadcastOrNot)
void setRepartitionVector(bool repartitionVectorOrNot)
void addTupleSetToBuildPipeline(const std::string &buildMe)
void setJobId(const std::string &jobId)
Handle< TupleSetJobStage > build()
std::vector< std::string > buildTheseTupleSets
void setSinkContext(const Handle< SetIdentifier > &sinkContext)
Handle< Map< String, String > > hashSetsToProbe
void setAllocatorPolicy(AllocatorPolicy policy)
void setNumNodesToCollect(int numNodesToCollect)
void setInputAggHashOut(bool inputAggHashOut)
const std::string & getLastSetThatBuildsPipeline() const
void setOutputTypeName(const std::string &outputTypeName)
void setTargetTupleSetName(const std::string &targetTupleSetName)
#define PDB_COUT
Definition: PDBDebug.h:31
Handle< SetIdentifier > getSourceSetIdentifier()
void addHashSetToProbe(const std::string &outputName, const std::string &hashSetName)
void setCollectAsMap(bool collectAsMapOrNot)
void setSourceTupleSetName(const std::string &sourceTupleSetSpecifier)
void setTargetComputationName(const std::string &targetComputationSpecifier)
void setRepartition(bool repartitionOrNot)
Handle< SetIdentifier > combinerContext
void setRepartitionJoin(bool repartitionJoinOrNot)
void setCombiner(Handle< SetIdentifier > combinerContext)
Handle< SetIdentifier > sourceContext
const std::string & getSourceTupleSetName() const