A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. API documentation for v0.5.0, released June 13, 2018.
AdvancedPhysicalShuffledHashsetPipelineAlgorithm.cc
This file implements the shuffled-hashset pipeline algorithm of the advanced physical optimizer: it plans a hash-partitioned join as two job stages, a TupleSetJobStage that repartitions the source set and a HashPartitionedJoinBuildHTJobStage that builds the partitioned hash table from the repartitioned data.
/*****************************************************************************
 *
 *  Copyright 2018 Rice University
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *****************************************************************************/

// the listing elides its other #include lines; the headers below are restored
// from the symbols used in this file, so their exact names are assumptions
#include <AdvancedPhysicalShuffledHashsetPipelineAlgorithm.h>
#include <JoinComp.h>
#include <TupleSetJobStageBuilder.h>
#include <HashPartitionedJoinBuildHTJobStageBuilder.h>

AdvancedPhysicalShuffledHashsetPipelineAlgorithm::AdvancedPhysicalShuffledHashsetPipelineAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle,
                                                                                                   const std::string &jobID,
                                                                                                   bool isProbing,
                                                                                                   bool isOutput,
                                                                                                   const Handle<SetIdentifier> &source,
                                                                                                   const Handle<ComputePlan> &computePlan,
                                                                                                   const LogicalPlanPtr &logicalPlan,
                                                                                                   const ConfigurationPtr &conf)
    // the initializer line is elided in the listing; forwarding everything to
    // the base AdvancedPhysicalAbstractAlgorithm is the likely original
    : AdvancedPhysicalAbstractAlgorithm(handle,
                                        jobID,
                                        isProbing,
                                        isOutput,
                                        source,
                                        computePlan,
                                        logicalPlan,
                                        conf) {}

PhysicalOptimizerResultPtr AdvancedPhysicalShuffledHashsetPipelineAlgorithm::generate(int nextStageID,
                                                                                      const StatisticsPtr &stats) {

  // extract the atomic computations from the pipes for this algorithm
  // (the extraction call is elided in this listing; it populates this->pipelineComputations)

  // extract the hash sets we might want to probe
  // (the extraction call is elided in this listing; it populates this->probingHashSets)

  // create an analyzer result
  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();

  // get the source atomic computation
  auto sourceAtomicComputation = this->pipelineComputations.front();

  // we get the first atomic computation of the join pipeline; this should be the apply join computation
  auto joinAtomicComputation = pipeline.back()->getConsumer(0)->to<AdvancedPhysicalAbstractPipe>()->getPipelineComputationAt(0);

  // get the final atomic computation
  string finalAtomicComputationName = this->pipelineComputations.back()->getOutputName();

  Handle<SetIdentifier> sink = nullptr;

  // the computation specifier of this join
  std::string computationSpecifier = joinAtomicComputation->getComputationName();

  // grab the output of the current node
  std::string outputName = joinAtomicComputation->getOutputName();

  // grab the computation associated with this node
  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();

  // cast the computation to a JoinComp
  Handle<JoinComp<Object, Object, Object>> join = unsafeCast<JoinComp<Object, Object, Object>, Computation>(curComp);

  // mark it as a hash partition join
  join->setJoinType(HashPartitionedJoin);

  // I am a pipeline breaker.
  // We first need to create a TupleSetJobStage with a repartition sink.
  sink = makeObject<SetIdentifier>(jobID, outputName + "_repartitionData");
  sink->setPageSize(conf->getBroadcastPageSize());

  // create a tuple set job stage builder
  TupleSetJobStageBuilderPtr tupleStageBuilder = make_shared<TupleSetJobStageBuilder>();

  // copy the computation names
  for(const auto &it : this->pipelineComputations) {

    // we don't need the output set name... (that is just the way the pipeline building works)
    if(it->getAtomicComputationTypeID() == WriteSetTypeID) {
      continue;
    }

    // add the set name of the atomic computation to the pipeline
    tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
  }

  // set the parameters
  tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
  tupleStageBuilder->setSourceContext(source);
  tupleStageBuilder->setInputAggHashOut(source->isAggregationResult());
  tupleStageBuilder->setJobId(jobID);
  tupleStageBuilder->setProbing(isProbing);
  tupleStageBuilder->setComputePlan(computePlan);
  tupleStageBuilder->setJobStageId(nextStageID++);
  tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
  tupleStageBuilder->setTargetComputationName(computationSpecifier);
  tupleStageBuilder->setOutputTypeName("IntermediateData");
  tupleStageBuilder->setSinkContext(sink);
  tupleStageBuilder->setRepartition(true);
  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
  tupleStageBuilder->setRepartitionJoin(true);
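  // note: setRepartition(true) together with setRepartitionJoin(true) is what
  // turns this stage into the shuffle step of the join; its output goes to the
  // intermediate "_repartitionData" sink set rather than to a user-visible set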

  // add all the probing hash sets
  for(const auto &it : probingHashSets) {
    tupleStageBuilder->addHashSetToProbe(it.first, it.second);
  }

  // create the tuple stage to run a pipeline with a hash partition sink
  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();

  // add the stage to the list of stages to be executed
  result->physicalPlanToOutput.emplace_back(joinPrepStage);

  // add the sink to the intermediate sets
  result->interGlobalSets.push_back(sink);

  // the source set for the HashPartitionedJoinBuildHTJobStage is the sink set of the TupleSetJobStage
  std::string hashSetName = sink->toSourceSetName();

  // initialize the build hash partition set builder stage
  HashPartitionedJoinBuildHTJobStageBuilderPtr hashBuilder = make_shared<HashPartitionedJoinBuildHTJobStageBuilder>();

  // set the parameters
  hashBuilder->setJobId(jobID);
  hashBuilder->setJobStageId(nextStageID);
  hashBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
  hashBuilder->setTargetTupleSetName(finalAtomicComputationName);
  hashBuilder->setTargetComputationName(computationSpecifier);
  hashBuilder->setSourceContext(sink);
  hashBuilder->setHashSetName(hashSetName);
  hashBuilder->setComputePlan(computePlan);

  // we set the name of the hash set we just generated
  pipeline.back()->to<AdvancedPhysicalJoinSidePipe>()->setHashSet(hashSetName);

  // create the build hash partitioned join hash table job stage to partition and shuffle the source set
  Handle<HashPartitionedJoinBuildHTJobStage> joinPartitionStage = hashBuilder->build();

  // add the stage to the list of stages to be executed
  result->physicalPlanToOutput.emplace_back(joinPartitionStage);

  // set the remaining parameters of the result
  result->success = true;

  return result;
}

// (one further short member definition is elided in the listing)
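
For context, here is a caller-side sketch of how the PhysicalOptimizerResult produced above might be consumed. It is a sketch only: it relies solely on the members visible in this listing (success, physicalPlanToOutput, interGlobalSets), and the dispatch function itself is hypothetical, not PlinyCompute's actual scheduler code.

#include <iostream>

// hypothetical consumer of the two-stage plan built by generate()
void dispatchShuffledHashsetPlan(const PhysicalOptimizerResultPtr &result) {

  // generate() only sets success once both stages have been built
  if (!result->success) {
    std::cerr << "physical planning failed" << std::endl;
    return;
  }

  // the stages arrive in execution order: first the TupleSetJobStage that
  // repartitions the join side, then the HashPartitionedJoinBuildHTJobStage
  // that builds the partitioned hash table from the repartitioned set
  for (const auto &stage : result->physicalPlanToOutput) {
    (void) stage; // hand each stage to the execution engine here (elided)
  }

  // sets recorded in interGlobalSets (the "_repartitionData" sink above) are
  // intermediate and can be removed once the join has been executed
  std::cout << result->interGlobalSets.size()
            << " intermediate set(s) to clean up" << std::endl;
}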

Referenced declarations (from the Doxygen cross-reference):

  std::shared_ptr<Statistics> StatisticsPtr  (Statistics.h:27)
  std::list<AdvancedPhysicalPipelineNodePtr> pipeline
  std::shared_ptr<LogicalPlan> LogicalPlanPtr  (ComputePlan.h:36)
  std::shared_ptr<HashPartitionedJoinBuildHTJobStageBuilder> HashPartitionedJoinBuildHTJobStageBuilderPtr
  PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
  shared_ptr<Configuration> ConfigurationPtr  (Configuration.h:89)
  unordered_map<std::string, std::string> probingHashSets
  std::shared_ptr<AdvancedPhysicalAbstractPipe> AdvancedPhysicalPipelineNodePtr
  std::shared_ptr<TupleSetJobStageBuilder> TupleSetJobStageBuilderPtr
  std::shared_ptr<PhysicalOptimizerResult> PhysicalOptimizerResultPtr
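
As a usage note, the handoff between the two stages rests entirely on one SetIdentifier: the prep stage's sink doubles as the build stage's source, and its toSourceSetName() string becomes the hash-set name. A minimal sketch of that wiring, using only calls that appear in the listing above; the function and its parameters are hypothetical, and it assumes the PlinyCompute headers used in this file are available.

// hypothetical illustration of how the two stages share one SetIdentifier
void wireStagesTogether(const std::string &jobID,
                        const std::string &outputName,
                        const ConfigurationPtr &conf,
                        const TupleSetJobStageBuilderPtr &tupleStageBuilder,
                        const HashPartitionedJoinBuildHTJobStageBuilderPtr &hashBuilder) {

  // one SetIdentifier names the repartitioned data
  Handle<SetIdentifier> sink = makeObject<SetIdentifier>(jobID, outputName + "_repartitionData");
  sink->setPageSize(conf->getBroadcastPageSize());

  // the prep stage writes into it ...
  tupleStageBuilder->setSinkContext(sink);

  // ... and the build-HT stage reads from it, registering the hash table
  // under a name derived from the same identifier
  hashBuilder->setSourceContext(sink);
  hashBuilder->setHashSetName(sink->toSourceSetName());
}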