A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimplePhysicalJoinNode.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #include "AtomicComputation.h"
21 #include "Handle.h"
24 #include "JoinComp.h"
25 
28  const Handle<ComputePlan> &computePlan,
29  LogicalPlanPtr logicalPlan,
 // Constructs a join node of the simple physical planner: all arguments are forwarded to the SimplePhysicalNode base.
 // NOTE(review): jobId, node and conf are moved into the base, but the logicalPlan shared_ptr is copied
 // (an extra refcount increment); std::move(logicalPlan) would be consistent with the other arguments.
30  ConfigurationPtr conf) : SimplePhysicalNode(std::move(jobId),
31  std::move(node),
32  computePlan,
33  logicalPlan,
34  std::move(conf)),
 // transversed: becomes true once the first input of this join has been planned (its hash table is built),
 //              so the next visit from the other input plans the probing side.
 // rollbacked:  becomes true the first time planning of this join is deferred, so it is deferred at most once.
35  transversed(false), rollbacked(false) {}
36 
38  SimplePhysicalNodePtr &prevNode,
39  const StatisticsPtr &stats,
40  int nextStageID) {
41 
 // Invoked when this join would have to act as the sink (output) of a pipeline. That is not supported,
 // so this reports the computation type and terminates the process; it never returns.
42  // grab the computation associated with this node (only used to report its type)
43  Handle<Computation> comp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
44 
45  // TODO exit since this is what has been done in previous PhysicalOptimizer, maybe do something smarter
 // NOTE(review): PDB_COUT is a macro from PDBDebug.h; if it is a debug-only stream, this message is not printed
 // in release builds and the process exits silently -- consider writing to std::cerr instead.
 // NOTE(review): exit(1) tears down the whole process rather than failing just this job; consider returning a
 // failed PhysicalOptimizerResult or reporting an error to the caller.
46  PDB_COUT << "Sink Computation Type: " << comp->getComputationType() << " are not supported as sink node right now\n";
47  exit(1);
48 }
49 
51  SimplePhysicalNodePtr &prevNode,
52  const StatisticsPtr &stats,
53  int nextStageID) {
54 
 // Plans this join when it is reached from one of its input pipelines.
 // - First visit (transversed == false): either defers (returns success == false, at most once) so the other
 //   input can be planned first, or ends the current pipeline in a repartition sink followed by a
 //   HashPartitionedJoinBuildHTJobStage (source cost > BROADCAST_JOIN_COST_THRESHOLD), or in a broadcast sink
 //   followed by a BroadcastJoinBuildHTJobStage (otherwise). The emitted stages and intermediate sets are returned.
 // - Second visit (transversed == true): probes the hash table built on the first visit. If the first side was
 //   hash-partitioned, this side is repartitioned as well and a new probing pipeline is handed to the consumer;
 //   if it was broadcast, probing is appended to the current pipeline and the consumer continues the analysis.
 // nextStageID is taken by value, so increments below are only seen by stages built in this call and by the
 // consumer analyze() call it is forwarded to.
55  // TODO(Jia): unclear why the input name of the current node is used when there is no previous node
56  string targetTupleSetName = prevNode == nullptr ? node->getInputName() : prevNode->getNode()->getOutputName();
57 
58  Handle<SetIdentifier> sink = nullptr;
59 
60  // cast this node to an ApplyJoin
 // NOTE(review): dynamic_pointer_cast yields nullptr if node is not an ApplyJoin, and joinNode is dereferenced
 // below without a check -- this assumes a join node is always an ApplyJoin.
61  shared_ptr<ApplyJoin> joinNode = dynamic_pointer_cast<ApplyJoin>(node);
62 
63  // the computation specifier of this join
64  std::string computationSpecifier = node->getComputationName();
65 
66  // grab the output of the current node
67  std::string outputName = node->getOutputName();
68 
69  // grab the computation associated with this node
70  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
71 
72  // We have two main cases to handle when dealing with a join
73  // 1. This is the first time we are processing this join, so no side of the join has been hashed
74  // and then broadcast or partitioned yet; therefore we cannot probe it
75  // 2. This is the second time we are processing this join, so one side of the join has been hashed and then
76  // broadcast or partitioned; we can therefore probe it!
77  if (!transversed) {
78 
79  // create an analyzer result
80  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
81 
82  // get the cost of the source set
83  double sourceCost = getCost(tupleStageBuilder->getSourceSetIdentifier(), stats);
84 
85  // are we already probing a set in the pipeline, is the cost of the current source at most the broadcast join
86  // threshold, and have we not rolled back yet? if so it might be better to go back and process the other side first
87  if (tupleStageBuilder->isPipelineProbing() && (sourceCost <= BROADCAST_JOIN_COST_THRESHOLD) && !rollbacked) {
88 
89  // remember that we rolled back the planning for this join once; if this happens again we will not roll back
90  rollbacked = true;
91 
92  // we should go back, therefore this analysis is not successful
93  result->success = false;
94  result->physicalPlanToOutput.clear();
95  result->interGlobalSets.clear();
96 
97  //PDB_COUT << "WARNING: met a join sink with probing, to return and put cost " << sourceCost << " " << tupleStageBuilder->getSourceSetIdentifier()->toSourceSetName() << " to penalized list \n";
98 
99  // success == false signals to the caller that we did not extract a pipeline
100  return result;
101  }
102 
103  // does the cost of the current source exceed the broadcast join threshold? if so we need a hash partitioned join,
104  // therefore we need to hash the current table, which makes this a pipeline breaker
105  else if (sourceCost > BROADCAST_JOIN_COST_THRESHOLD) {
106 
107  // set the partitioning flag so that, when probing on the second visit, we know this side was partitioned
108  joinNode->setPartitioningLHS(true);
109 
110  // cast the computation to a JoinComp
112  join = unsafeCast<JoinComp<Object, Object, Object>, Computation>(curComp);
113 
114  // mark it as a hash partition join
115  join->setJoinType(HashPartitionedJoin);
116 
117  // I am a pipeline breaker.
118  // We first need to create a TupleSetJobStage with a repartition sink
 // NOTE(review): the repartition sink uses the broadcast page size -- confirm this is intended.
119  sink = makeObject<SetIdentifier>(jobId, outputName + "_repartitionData");
120  sink->setPageSize(conf->getBroadcastPageSize());
121 
122  // set the parameters
123  tupleStageBuilder->setJobStageId(nextStageID++);
124  tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
125  tupleStageBuilder->setTargetComputationName(computationSpecifier);
126  tupleStageBuilder->setOutputTypeName("IntermediateData");
127  tupleStageBuilder->setSinkContext(sink);
128  tupleStageBuilder->setRepartition(true);
129  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
130  tupleStageBuilder->setRepartitionJoin(true);
131 
132  // create the tuple stage to run a pipeline with a hash partition sink
133  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();
134 
135  // add the stage to the list of stages to be executed
136  result->physicalPlanToOutput.emplace_back(joinPrepStage);
137 
138  // add the sink to the intermediate sets
139  result->interGlobalSets.push_back(sink);
140 
141  // the source set for the HashPartitionedJoinBuildHTJobStage is the sink set of the TupleSetJobStage
142  hashSetName = sink->toSourceSetName();
143 
144  // initialize the builder for the HashPartitionedJoinBuildHTJobStage
146  hashBuilder = make_shared<HashPartitionedJoinBuildHTJobStageBuilder>();
147 
148  // set the parameters
149  hashBuilder->setJobId(jobId);
150  hashBuilder->setJobStageId(nextStageID++);
151  hashBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
152  hashBuilder->setTargetTupleSetName(targetTupleSetName);
153  hashBuilder->setTargetComputationName(computationSpecifier);
154  hashBuilder->setSourceContext(sink);
155  hashBuilder->setHashSetName(hashSetName);
156  hashBuilder->setComputePlan(computePlan);
157 
158  // create the build hash partitioned join hash table job stage to partition and shuffle the source set
159  Handle<HashPartitionedJoinBuildHTJobStage> joinPartitionStage = hashBuilder->build();
160 
161  // add the stage to the list of stages to be executed
162  result->physicalPlanToOutput.emplace_back(joinPartitionStage);
163 
164  } else {
165 
166  // The other input has not been processed and we can do broadcasting because
167  // sourceCost <= BROADCAST_JOIN_COST_THRESHOLD. I am a pipeline breaker.
168  // We first need to create a TupleSetJobStage with a broadcasting sink
169  // then a BroadcastJoinBuildHTJobStage to build a hash table of that data.
170 
171  // the set identifier of the set where we store the output of the TupleSetJobStage
172  sink = makeObject<SetIdentifier>(jobId, outputName + "_broadcastData");
173  sink->setPageSize(conf->getBroadcastPageSize());
174 
175  // set the parameters
 // NOTE(review): unlike the partitioned branch above, nextStageID is not incremented here, so this
 // TupleSetJobStage and the BroadcastJoinBuildHTJobStage below get the same stage id -- confirm this is intended.
176  tupleStageBuilder->setJobStageId(nextStageID);
177  tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
178  tupleStageBuilder->setTargetComputationName(computationSpecifier);
179  tupleStageBuilder->setOutputTypeName("IntermediateData");
180  tupleStageBuilder->setSinkContext(sink);
181  tupleStageBuilder->setBroadcasting(true);
182  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
183 
184  // We are setting isBroadcasting to true so that we run a pipeline with broadcast sink
185  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();
186 
187  // add the stage to the list of stages to be executed
188  result->physicalPlanToOutput.emplace_back(joinPrepStage);
189 
190  // add the sink to the intermediate sets
191  result->interGlobalSets.push_back(sink);
192 
193  // grab the hash set name
194  hashSetName = sink->toSourceSetName();
195 
196  // initialize the builder for the BroadcastJoinBuildHTJobStage
197  BroadcastJoinBuildHTJobStageBuilderPtr broadcastBuilder = make_shared<BroadcastJoinBuildHTJobStageBuilder>();
198 
199  // set the parameters
200  broadcastBuilder->setJobId(jobId);
201  broadcastBuilder->setJobStageId(nextStageID);
202  broadcastBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
203  broadcastBuilder->setTargetTupleSetName(targetTupleSetName);
204  broadcastBuilder->setTargetComputationName(computationSpecifier);
205  broadcastBuilder->setSourceContext(sink);
206  broadcastBuilder->setHashSetName(hashSetName);
207  broadcastBuilder->setComputePlan(computePlan);
208 
209  // We then create a BroadcastJoinBuildHTStage
210  Handle<BroadcastJoinBuildHTJobStage> joinBroadcastStage = broadcastBuilder->build();
211 
212  // add the stage to the list of stages to be executed
213  result->physicalPlanToOutput.emplace_back(joinBroadcastStage);
214  }
215 
216  // We should not go further: mark this join as traversed, leave the rest to
217  // the other join input, and simply return
218  transversed = true;
219 
220  // set the remaining parameters of the result
221  result->success = true;
222 
223  // return to indicate that we succeeded
224  return result;
225  } else {
226 
227  // at this point we know that the other side of the join has been processed; now we need to figure out how.
228  // There are two options:
229  // 1. it has been partitioned, in which case we are probing a partitioned hash table
230  // 2. it has been broadcast, in which case we are probing a broadcast hash table
231 
232  // check if the other side has been partitioned
233  if (joinNode->isPartitioningLHS()) {
234 
235  // we have only one consumer node so we know what the next node in line is
 // NOTE(review): nextNode is never used -- the analyze() call below reads activeConsumers.front() again.
 // Both assume activeConsumers is non-empty here.
236  SimplePhysicalNodePtr nextNode = activeConsumers.front();
237 
238  // first we need to repartition the data of this side of the join (the RHS) and put it in this sink set
 // NOTE(review): this is the same set name as the repartition sink created on the first visit
 // (outputName + "_repartitionData"); confirm the two sets cannot collide.
239  sink = makeObject<SetIdentifier>(jobId, outputName + "_repartitionData");
240  sink->setPageSize(conf->getBroadcastPageSize());
241 
242  // set the parameters
243  tupleStageBuilder->setJobStageId(nextStageID++);
244  tupleStageBuilder->setTargetTupleSetName(targetTupleSetName);
245  tupleStageBuilder->setTargetComputationName(computationSpecifier);
246  tupleStageBuilder->setOutputTypeName("IntermediateData");
247  tupleStageBuilder->setSinkContext(sink);
248  tupleStageBuilder->setRepartition(true);
249  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
250  tupleStageBuilder->setRepartitionJoin(true);
251 
252  // we first create a pipeline breaker to partition RHS by setting
253  // the isRepartitioning=true and isRepartitionJoin=true
254  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();
255 
256  // we have added a tuple stage to shuffle the data; now we need one to probe the hash set,
257  // therefore we create a job stage builder
258  TupleSetJobStageBuilderPtr probingStageBuilder = make_shared<TupleSetJobStageBuilder>();
259 
260  // set the job id
261  probingStageBuilder->setJobId(joinPrepStage->getJobId());
262 
263  // the join source becomes the original source
264  probingStageBuilder->setSourceTupleSetName(tupleStageBuilder->getSourceTupleSetName());
265 
266  // grab the last tuple set that builds the current pipeline (the hashing step that feeds this join)
267  string lastOne = tupleStageBuilder->getLastSetThatBuildsPipeline();
268 
269  // add the last and the current
270  probingStageBuilder->addTupleSetToBuildPipeline(lastOne);
271  probingStageBuilder->addTupleSetToBuildPipeline(node->getOutputName());
272 
273  // add the hash set this pipeline is going to probe
274  probingStageBuilder->addHashSetToProbe(outputName, hashSetName);
275  probingStageBuilder->setProbing(true);
276  probingStageBuilder->setSourceContext(sink);
 // NOTE(review): redundant -- the same source tuple set name was already set above.
277  probingStageBuilder->setSourceTupleSetName(tupleStageBuilder->getSourceTupleSetName());
278  probingStageBuilder->setComputePlan(computePlan);
279 
280  // I am the previous node
281  SimplePhysicalNodePtr newPrevNode = getSimpleNodeHandle();
282 
283  // temporarily alias the source set with the sink set because they are essentially the same and we need the stats
284  stats->addSetAlias(tupleStageBuilder->getSourceSetIdentifier()->getDatabase(),
285  tupleStageBuilder->getSourceSetIdentifier()->getSetName(),
286  sink->getDatabase(),
287  sink->getSetName());
288 
289  // we then create a pipeline stage to probe the partitioned hash table
290  PhysicalOptimizerResultPtr result = activeConsumers.front()->analyze(probingStageBuilder,
291  newPrevNode,
292  stats,
293  nextStageID);
294 
295  // remove the temporary alias
 // NOTE(review): the alias is only removed on a normal return; if analyze() can throw, it is left in stats --
 // consider a scope guard.
296  stats->removeSet(sink->getDatabase(), sink->getSetName());
297 
298  // prepend the shuffle stage so it runs before the probing stages returned by the consumer
 // NOTE(review): this happens even if result->success is false, whereas a failed result elsewhere has its
 // stages cleared -- confirm callers ignore the stages of a failed result.
299  result->physicalPlanToOutput.emplace_front(joinPrepStage);
300 
301  // add the output of this TupleSetJobStage to the list of intermediate sets
302  result->interGlobalSets.emplace_front(sink);
303 
304  return result;
305 
306  } else {
307 
308  // we probe the broadcast hash table
309  // since my other input has been processed, I am not a pipeline breaker, but we
310  // should set the correct hash set names for probing
311  tupleStageBuilder->addTupleSetToBuildPipeline(node->getOutputName());
312  tupleStageBuilder->addHashSetToProbe(outputName, hashSetName);
313  tupleStageBuilder->setProbing(true);
314 
315  // I am the previous node
316  SimplePhysicalNodePtr newPrevNode = getSimpleNodeHandle();
317 
318  // go and analyze further
319  return activeConsumers.front()->analyze(tupleStageBuilder, newPrevNode, stats, nextStageID);
320  }
321  }
322 }
PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
std::string & getComputationName()
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
std::shared_ptr< HashPartitionedJoinBuildHTJobStageBuilder > HashPartitionedJoinBuildHTJobStageBuilderPtr
PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &ptr, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
std::shared_ptr< BroadcastJoinBuildHTJobStageBuilder > BroadcastJoinBuildHTJobStageBuilderPtr
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
SimplePhysicalJoinNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr