A platform for high-performance distributed tool and library development, written in C++. It can be deployed in two cluster modes: standalone or distributed. API documentation for v0.5.0, released on June 13, 2018.
AdvancedPhysicalJoinBroadcastedHashsetAlgorithm.cc
/*****************************************************************************
 *                                                                           *
 * Copyright 2018 Rice University                                            *
 *                                                                           *
 * Licensed under the Apache License, Version 2.0 (the "License");           *
 * you may not use this file except in compliance with the License.          *
 * You may obtain a copy of the License at                                   *
 *                                                                           *
 *     http://www.apache.org/licenses/LICENSE-2.0                            *
 *                                                                           *
 * Unless required by applicable law or agreed to in writing, software       *
 * distributed under the License is distributed on an "AS IS" BASIS,         *
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  *
 * See the License for the specific language governing permissions and      *
 * limitations under the License.                                            *
 *                                                                           *
 *****************************************************************************/

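// the constructor does no work of its own; it simply forwards the pipeline handle and the
// job parameters to the base algorithm so that they are available when generate() is called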
AdvancedPhysicalJoinBroadcastedHashsetAlgorithm::AdvancedPhysicalJoinBroadcastedHashsetAlgorithm(
    const AdvancedPhysicalPipelineNodePtr &handle,
    const std::string &jobID,
    bool isProbing,
    bool isOutput,
    const Handle<SetIdentifier> &source,
    const Handle<ComputePlan> &computePlan,
    const LogicalPlanPtr &logicalPlan,
    const ConfigurationPtr &conf)
    : AdvancedPhysicalAbstractAlgorithm(handle, jobID, isProbing, isOutput, source, computePlan, logicalPlan, conf) {}

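// generate() lowers this join pipeline into two physical job stages: a TupleSetJobStage that runs
// the build-side pipeline into a broadcast sink, followed by a BroadcastJoinBuildHTJobStage that
// builds the join hash table from the broadcast data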
PhysicalOptimizerResultPtr AdvancedPhysicalJoinBroadcastedHashsetAlgorithm::generate(int nextStageID,
                                                                                     const StatisticsPtr &stats) {

  // extract the atomic computations from the pipes for this algorithm
  extractAtomicComputations();

  // extract the hash sets we might want to probe
  extractHashSetsToProbe();

  // create an analyzer result
  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();

  // get the source atomic computation
  auto sourceAtomicComputation = this->pipelineComputations.front();

  // we get the first atomic computation of the join pipeline that comes after this one.
  // This computation should be the apply join computation
  auto joinAtomicComputation = pipeline.back()->getConsumer(0)->to<AdvancedPhysicalAbstractPipe>()->getPipelineComputationAt(0);

  // get the final atomic computation
  string finalAtomicComputationName = this->pipelineComputations.back()->getOutputName();

  // the computation specifier of this join
  std::string computationSpecifier = joinAtomicComputation->getComputationName();

  // grab the computation associated with this node
  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();

  // grab the output of the current node
  std::string outputName = joinAtomicComputation->getOutputName();

  // the set identifier of the set where we store the output of the TupleSetJobStage
  sink = makeObject<SetIdentifier>(jobID, outputName + "_broadcastData");
  sink->setPageSize(conf->getBroadcastPageSize());

  // create a tuple set job stage builder
  TupleSetJobStageBuilderPtr tupleStageBuilder = make_shared<TupleSetJobStageBuilder>();

  // copy the computation names
  for (const auto &it : this->pipelineComputations) {

    // we don't need the output set name... (that is just the way the pipeline building works)
    if (it->getAtomicComputationTypeID() == WriteSetTypeID) {
      continue;
    }

    // add the set name of the atomic computation to the pipeline
    tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
  }

  // set the parameters
  tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
  tupleStageBuilder->setSourceContext(source);
  tupleStageBuilder->setInputAggHashOut(source->isAggregationResult());
  tupleStageBuilder->setJobId(jobID);
  tupleStageBuilder->setProbing(isProbing);
  tupleStageBuilder->setComputePlan(computePlan);
  tupleStageBuilder->setJobStageId(nextStageID);
  tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
  tupleStageBuilder->setTargetComputationName(computationSpecifier);
  tupleStageBuilder->setOutputTypeName("IntermediateData");
  tupleStageBuilder->setSinkContext(sink);
  tupleStageBuilder->setBroadcasting(true);
  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());

  // add all the probing hash sets
  for (const auto &it : probingHashSets) {
    tupleStageBuilder->addHashSetToProbe(it.first, it.second);
  }

  // we are setting isBroadcasting to true so that we run a pipeline with a broadcast sink
  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();

  // add the stage to the list of stages to be executed
  result->physicalPlanToOutput.emplace_back(joinPrepStage);

  // add the sink to the intermediate sets
  result->interGlobalSets.push_back(sink);

  // grab the hash set name
  std::string hashSetName = sink->toSourceSetName();

  // initialize the build hash partition set builder stage
  BroadcastJoinBuildHTJobStageBuilderPtr broadcastBuilder = make_shared<BroadcastJoinBuildHTJobStageBuilder>();

  // set the parameters
  broadcastBuilder->setJobId(jobID);
  broadcastBuilder->setJobStageId(nextStageID);
  broadcastBuilder->setSourceTupleSetName(joinPrepStage->getSourceTupleSetSpecifier());
  broadcastBuilder->setTargetTupleSetName(finalAtomicComputationName);
  broadcastBuilder->setTargetComputationName(computationSpecifier);
  broadcastBuilder->setSourceContext(sink);
  broadcastBuilder->setHashSetName(hashSetName);
  broadcastBuilder->setComputePlan(computePlan);

  // we then create the BroadcastJoinBuildHTJobStage
  Handle<BroadcastJoinBuildHTJobStage> joinBroadcastStage = broadcastBuilder->build();

  // we set the name of the hash set we just generated on the join side pipe
  pipeline.back()->to<AdvancedPhysicalJoinSidePipe>()->setHashSet(hashSetName);

  // add the stage to the list of stages to be executed
  result->physicalPlanToOutput.emplace_back(joinBroadcastStage);

  // set the remaining parameters of the result
  result->success = true;

  return result;
}

AdvancedPhysicalAbstractAlgorithmTypeID AdvancedPhysicalJoinBroadcastedHashsetAlgorithm::getType() {
  return JOIN_BROADCASTED_HASHSET_ALGORITHM;
}

Referenced declarations:
  StatisticsPtr: std::shared_ptr<Statistics> (Statistics.h:27)
  pipeline: std::list<AdvancedPhysicalPipelineNodePtr>
  LogicalPlanPtr: std::shared_ptr<LogicalPlan> (ComputePlan.h:36)
  BroadcastJoinBuildHTJobStageBuilderPtr: std::shared_ptr<BroadcastJoinBuildHTJobStageBuilder>
  generate: PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
  ConfigurationPtr: shared_ptr<Configuration> (Configuration.h:89)
  probingHashSets: unordered_map<std::string, std::string>
  AdvancedPhysicalPipelineNodePtr: std::shared_ptr<AdvancedPhysicalAbstractPipe>
  TupleSetJobStageBuilderPtr: std::shared_ptr<TupleSetJobStageBuilder>
  PhysicalOptimizerResultPtr: std::shared_ptr<PhysicalOptimizerResult>
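For orientation, here is a minimal sketch of how a physical optimizer might drive this algorithm and consume its result. Only the constructor, generate(), and the PhysicalOptimizerResult fields come from the listing above; the surrounding state (pipeNode, jobID, sourceSet, computePlan, logicalPlan, conf, stats, nextStageID) and the scheduleStage() hook are assumed placeholders, not part of this file.

// a usage sketch; every identifier marked "assumed" is a placeholder for state the optimizer already holds
AdvancedPhysicalJoinBroadcastedHashsetAlgorithm algorithm(pipeNode,     // assumed AdvancedPhysicalPipelineNodePtr for the build-side pipe
                                                          jobID,        // assumed job identifier
                                                          false,        // isProbing
                                                          false,        // isOutput
                                                          sourceSet,    // assumed Handle<SetIdentifier> of the source set
                                                          computePlan,  // assumed Handle<ComputePlan>
                                                          logicalPlan,  // assumed LogicalPlanPtr
                                                          conf);        // assumed ConfigurationPtr

// produces the TupleSetJobStage (broadcast sink) followed by the BroadcastJoinBuildHTJobStage
PhysicalOptimizerResultPtr result = algorithm.generate(nextStageID, stats);

if (result->success) {

  // the stages come back in the order they should be executed
  for (const auto &stage : result->physicalPlanToOutput) {
    scheduleStage(stage);  // assumed scheduling hook in the surrounding optimizer/executor
  }

  // result->interGlobalSets records the intermediate "_broadcastData" set so the caller can clean it up later
}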