A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalShuffleSetAlgorithm.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
21 
22 namespace pdb {
23 
25  const std::string &jobID,
26  bool isProbing,
27  bool isOutput,
28  const Handle<SetIdentifier> &source,
29  const Handle<ComputePlan> &computePlan,
30  const LogicalPlanPtr &logicalPlan,
32  jobID,
33  isProbing,
34  isOutput,
35  source,
36  computePlan,
37  logicalPlan,
38  conf) {}
40  const StatisticsPtr &stats) {
41 
42  // extract the atomic computations from the pipes for this algorithm
44 
45  // extract the hash sets we might want to probe
47 
48  // get the source atomic computation
49  auto sourceAtomicComputation = this->pipelineComputations.front();
50 
51  // we get the first atomic computation of the join pipeline; this should be the apply join computation
52  auto joinAtomicComputation = pipeline.back()->getConsumer(0)->to<AdvancedPhysicalAbstractPipe>()->getPipelineComputationAt(0);
53 
54  // get the final atomic computation
55  string finalAtomicComputationName = this->pipelineComputations.back()->getOutputName();
56 
57  // the computation specifier of this join
58  std::string computationSpecifier = joinAtomicComputation->getComputationName();
59 
60  // grab the output of the current node
61  std::string outputName = joinAtomicComputation->getOutputName();
62 
63  // grab the computation associated with this node
64  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
65 
66  // first we need to shuffle our data from the other side and put it in this sink set
67  sink = makeObject<SetIdentifier>(jobID, outputName + "_repartitionData");
68  sink->setPageSize(conf->getBroadcastPageSize());
69 
70  // create a tuple set job stage builder
71  TupleSetJobStageBuilderPtr tupleStageBuilder = make_shared<TupleSetJobStageBuilder>();
72 
73  // copy the computation names
74  for(const auto &it : this->pipelineComputations) {
75 
76  // we don't need the output set name... (that is just the way the pipeline building works)
77  if(it->getAtomicComputationTypeID() == WriteSetTypeID) {
78  continue;
79  }
80 
81  // add the set name of the atomic computation to the pipeline
82  tupleStageBuilder->addTupleSetToBuildPipeline(it->getOutputName());
83  }
84 
85  // set the parameters
86  tupleStageBuilder->setSourceTupleSetName(sourceAtomicComputation->getOutputName());
87  tupleStageBuilder->setSourceContext(source);
88  tupleStageBuilder->setInputAggHashOut(source->isAggregationResult());
89  tupleStageBuilder->setJobId(jobID);
90  tupleStageBuilder->setProbing(isProbing);
91  tupleStageBuilder->setComputePlan(computePlan);
92  tupleStageBuilder->setJobStageId(nextStageID++);
93  tupleStageBuilder->setTargetTupleSetName(finalAtomicComputationName);
94  tupleStageBuilder->setTargetComputationName(computationSpecifier);
95  tupleStageBuilder->setOutputTypeName("IntermediateData");
96  tupleStageBuilder->setSinkContext(sink);
97  tupleStageBuilder->setRepartition(true);
98  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
99  tupleStageBuilder->setRepartitionJoin(true);
100 
101  // add all the probing hash sets
102  for(auto it : probingHashSets) {
103  tupleStageBuilder->addHashSetToProbe(it.first, it.second);
104  }
105 
106  // update the consumers
108 
109  // we first create a pipeline breaker to partition RHS by setting
110  // the isRepartitioning=true and isRepartitionJoin=true
111  Handle<TupleSetJobStage> joinPrepStage = tupleStageBuilder->build();
112 
113  // create an analyzer result
114  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
115 
116  // add the stage to the list of stages to be executed
117  result->physicalPlanToOutput.emplace_front(joinPrepStage);
118 
119  // add the output of this TupleSetJobStage to the list of intermediate sets
120  result->interGlobalSets.emplace_front(sink);
121 
122  // set the remaining parameters of the result
123  result->success = true;
124 
125  // update the source pipes to reflect the state after executing the job stages
126  // if we have a consumer we have a new source pipe since we materialize this result
127  if(pipeline.back()->getNumConsumers() != 0) {
128 
129  // add consumers as new sources
130  for(int i = 0; i < pipeline.back()->getNumConsumers(); ++i) {
131  result->createdSourceComputations.push_back(pipeline.back()->getConsumer(i));
132  }
133  }
134 
135  return result;
136 }
137 
140 }
141 
142 }
143 
144 
PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats) override
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
AdvancedPhysicalShuffleSetAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle, const std::string &jobID, bool isProbing, bool isOutput, const Handle< SetIdentifier > &source, const Handle< ComputePlan > &computePlan, const LogicalPlanPtr &logicalPlan, const ConfigurationPtr &conf)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
AdvancedPhysicalAbstractAlgorithmTypeID getType() override
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
unordered_map< std::string, std::string > probingHashSets
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr