A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimplePhysicalPartitionNode.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
20 
21 namespace pdb {
22 
// Tail of the SimplePhysicalPartitionNode constructor; the opening line(s) of its signature
// (source lines 23-24) are not part of this listing.
// The constructor body is empty: jobId, node and conf are moved into the SimplePhysicalNode
// base-class constructor, while computePlan and logicalPlan are passed through unchanged
// (logicalPlan is received by value and handed on without std::move).
25  const Handle<ComputePlan> &computePlan,
26  LogicalPlanPtr logicalPlan,
27  ConfigurationPtr conf) : SimplePhysicalNode(std::move(jobId),
28  std::move(node),
29  computePlan,
30  logicalPlan,
31  std::move(conf)) {}
32 
33 
// NOTE(review): the first line of this signature (source line 34) is missing from the listing;
// judging by the member index at the end of this page this is presumably
// SimplePhysicalPartitionNode::analyzeOutput - confirm against the real file.
//
// Configures the given tuple set job stage builder for this node's computation, using as sink
// context a SetIdentifier built from the computation's database name and set name, appends the
// built stage to the result's physicalPlanToOutput and returns the result with success = true.
//
// prevNode and stats are not referenced anywhere in the visible body.
// nextStageID is received by value, so the post-increment below is not visible to the caller.
35  SimplePhysicalNodePtr &prevNode,
36  const StatisticsPtr &stats,
37  int nextStageID) {
38 
39  // create an analyzer result
40  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
41 
42  // the computation specifier of this partition
43  std::string computationSpecifier = node->getComputationName();
44 
45  // grab the computation associated with this node
46  Handle<Computation> comp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
47 
48  // create a SetIdentifier for the output set (database name and set name come from the computation)
49  Handle<SetIdentifier> sink = makeObject<SetIdentifier>(comp->getDatabaseName(), comp->getSetName());
50 
51  // create the tuple set job stage to run the pipeline with a shuffle sink
52  // here is what we are doing:
53  // the input to the stage is either the output of the join or the source node we started with
54  // the repartitioning flag is set to true, so that we run a pipeline with a hash partition sink
55  // the pipeline will apply all the computations to the source set
56  // and put them on a page partitioned into multiple vectors, and each vector on that page will be
57  // sent to an appropriate node.
58  tupleStageBuilder->setJobStageId(nextStageID++);
59  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
60  tupleStageBuilder->setTargetComputationName(computationSpecifier);
61  tupleStageBuilder->setOutputTypeName(comp->getOutputType());
62  tupleStageBuilder->setSinkContext(sink);
63  tupleStageBuilder->setRepartition(true);
64  tupleStageBuilder->setRepartitionVector(true);
65  tupleStageBuilder->setAllocatorPolicy(comp->getAllocatorPolicy());
66 
67 
68  // build the job stage and append it to the physical plan of the result
69  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
70 
71  // we succeeded
72  result->success = true;
73 
74  return result;
75 }
76 
// NOTE(review): the first line of this signature (source line 77) is missing from the listing;
// judging by the member index at the end of this page this is presumably
// SimplePhysicalPartitionNode::analyzeSingleConsumer - confirm against the real file.
//
// Picks the sink set for this node's computation (the computation's own set if it already needs
// materialization, otherwise a set named after jobId and the node's output name, which is also
// recorded in interGlobalSets), configures and builds one tuple set job stage that targets it,
// and finally makes that sink the new sourceSetIdentifier of this node.
//
// prevNode and stats are not referenced anywhere in the visible body.
// nextStageID is received by value, so the post-increment below is not visible to the caller.
78  SimplePhysicalNodePtr &prevNode,
79  const StatisticsPtr &stats,
80  int nextStageID) {
81  // create an analyzer result
82  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
83 
84  // the computation specifier of this partition
85  std::string computationSpecifier = node->getComputationName();
86 
87  // grab the computation associated with this node
88  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
89 
90  // the sink of the consuming job stage for this partition (NOTE(review): `sink` is presumably declared on source line 91, which is absent from this listing)
92 
93  // does the current computation already need materialization
94  if (curComp->needsMaterializeOutput()) {
95  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
96  sink->setPageSize(conf->getPageSize());
97  } else {
98  sink = makeObject<SetIdentifier>(jobId, node->getOutputName());
99  curComp->setOutput(jobId, node->getOutputName());
100  sink->setPageSize(conf->getPageSize());
101  result->interGlobalSets.push_back(sink);
102  }
103 
104  // create the tuple set job stage to run the pipeline with a hash partition sink
105  tupleStageBuilder->setJobStageId(nextStageID++);
106  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
107  tupleStageBuilder->setTargetComputationName(computationSpecifier);
108  tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
109  tupleStageBuilder->setSinkContext(sink);
110  tupleStageBuilder->setRepartition(true);
111  tupleStageBuilder->setRepartitionVector(true);
112  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
113 
114  // build the job stage and append it to the physical plan of the result
115  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
116 
117 
118  // record this node's handle among the created source computations (nothing is removed in this function)
119  result->createdSourceComputations.push_back(getHandle());
120 
121  // we succeeded
122  result->success = true;
123 
124  // the new source is the sink
125  sourceSetIdentifier = sink;
126 
127  return result;
128 }
129 
// NOTE(review): the first line of this signature (source line 130) is missing from the listing;
// judging by the member index at the end of this page this is presumably
// SimplePhysicalPartitionNode::analyzeMultipleConsumers - confirm against the real file.
//
// Picks the sink set for this node's computation, configures and builds one tuple set job stage,
// appends it to the result's physicalPlanToOutput, records this node's handle in
// createdSourceComputations and makes the sink the new sourceSetIdentifier of this node.
//
// NOTE(review): unlike the two sibling analyze* functions in this file, the builder sequence
// below contains no setSinkContext(sink) call, so the sink chosen here is never handed to
// tupleStageBuilder in the visible code - confirm whether that omission is intentional.
// NOTE(review): the already-materialized branch does not call sink->setPageSize(...), whereas the
// matching branch of the single-consumer variant does - confirm whether that is intentional.
//
// prevNode and stats are not referenced anywhere in the visible body.
// nextStageID is received by value, so the post-increment below is not visible to the caller.
131  SimplePhysicalNodePtr &prevNode,
132  const StatisticsPtr &stats,
133  int nextStageID) {
134  // create an analyzer result
135  PhysicalOptimizerResultPtr result = make_shared<PhysicalOptimizerResult>();
136 
137  // the computation specifier of this partition
138  std::string computationSpecifier = node->getComputationName();
139 
140  // grab the computation associated with this node
141  Handle<Computation> curComp = logicalPlan->getNode(computationSpecifier).getComputationHandle();
142 
143  // I am a pipeline breaker because I have more than one consumer
144  Handle<SetIdentifier> sink = nullptr;
145 
146  // in the case that the current computation does not require materialization by default
147  // we have to set an output on it, so that it gets materialized
148  if (!curComp->needsMaterializeOutput()) {
149 
150  // set the output
151  curComp->setOutput(jobId, node->getOutputName());
152 
153  // create the sink and set the page size
154  sink = makeObject<SetIdentifier>(jobId, node->getOutputName());
155  sink->setPageSize(conf->getPageSize());
156 
157  // add this set to the list of intermediate sets
158  result->interGlobalSets.push_back(sink);
159  } else {
160  // this computation needs materialization either way so just create the sink set identifier
161  sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
162  }
163 
164  // create the tuple set job stage to run the pipeline with a hash partition sink
165  tupleStageBuilder->setJobStageId(nextStageID++);
166  tupleStageBuilder->setTargetTupleSetName(node->getInputName());
167  tupleStageBuilder->setTargetComputationName(computationSpecifier);
168  tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
169  tupleStageBuilder->setRepartition(true);
170  tupleStageBuilder->setRepartitionVector(true);
171  tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
172 
173  // add the created tuple set job stage to the physical plan of the result
174  result->physicalPlanToOutput.emplace_back(tupleStageBuilder->build());
175 
176 
177  // record this node's handle among the created source computations
178  result->createdSourceComputations.push_back(getHandle());
179 
180  // we succeeded
181  result->success = true;
182 
183  // the new source is the sink
184  sourceSetIdentifier = sink;
185 
186  return result;
187 }
188 
189 }
AbstractPhysicalNodePtr getHandle()
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
PhysicalOptimizerResultPtr analyzeMultipleConsumers(TupleSetJobStageBuilderPtr &ptr, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID) override
SimplePhysicalPartitionNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
AtomicComputationPtr node
Handle< SetIdentifier > sourceSetIdentifier
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr