A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalAbstractAlgorithm.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
20 
// Constructs the algorithm: stores the job id, plans, configuration, source set and flags
// in the members of the same name and seeds the pipeline with the given node.
//
// handle      - pipeline node that is appended to `pipeline` in the constructor body
// jobID       - copied into the `jobID` member
// isProbing   - copied into the `isProbing` member
// isOutput    - copied into the `isOutput` member
// source      - handle to the source set identifier, copied into the `source` member
// computePlan - handle to the compute plan, copied into the `computePlan` member
// logicalPlan - copied into the `logicalPlan` member
// conf        - copied into the `conf` member
//
// NOTE(review): the initializer list below is not in parameter order; C++ initializes members
// in class declaration order, which is not visible here - confirm the two agree.
21 AdvancedPhysicalAbstractAlgorithm::AdvancedPhysicalAbstractAlgorithm(const AdvancedPhysicalPipelineNodePtr &handle,
22  const std::string &jobID,
23  bool isProbing,
24  bool isOutput,
25  Handle<SetIdentifier> source,
26  Handle<ComputePlan> computePlan,
27  const LogicalPlanPtr &logicalPlan,
28  const ConfigurationPtr &conf)
29  : jobID(jobID),
30  computePlan(computePlan),
31  logicalPlan(logicalPlan),
32  conf(conf),
33  source(source),
34  isProbing(isProbing),
35  isOutput(isOutput) {
36  // the pipeline starts out containing only the node this algorithm was created for
37  pipeline.push_back(handle);
38 }
39 
40 
42  const StatisticsPtr &stats,
43  std::vector<AdvancedPhysicalPipelineNodePtr> &pipesToPipeline) {
44 
45  // get the source set identifier of the first node in the pipeline
46  source = pipesToPipeline.front()->getSourceSetIdentifier();
47 
48  // add the pipesToPipeline to the current pipeline
49  pipeline.insert(pipeline.begin(), pipesToPipeline.begin(), pipesToPipeline.end());
50 
51  // generate the stage
52  return generate(nextStageID, stats);
53 }
54 
56  DataStatistics approxSize,
57  const StatisticsPtr &stats) {
58 
59  // the last node in the pipeline always has to be something other than nullptr
60  assert(pipeline.back() != nullptr);
61 
62  for(auto i = 0; i < pipeline.back()->getNumConsumers(); ++i) {
63  auto consumer = pipeline.back()->getConsumer(i)->to<AdvancedPhysicalAbstractPipe>();
64  consumer->setSourceSetIdentifier(sink);
65  }
66 
67  // update the set name
68  approxSize.databaseName = sink->getDatabase();
69  approxSize.setName = sink->getSetName();
70 
71  // update the stats
72  if(stats != nullptr){
73  stats->addSet(sink->getDatabase(), sink->getSetName(), approxSize);
74  }
75 }
76 
78 
79  // TODO this is a silly approximation; the size is never the same, we need something better...
80 
81  // an algorithm should always have a source set
82  assert(source != nullptr);
83 
84  // temp variables
85  DataStatistics ds;
86 
87  // set the set stats
88  ds.pageSize = stats != nullptr ? stats->getPageSize(source->getDatabase(), source->getSetName()) : 0;
89  ds.numTuples = stats != nullptr ? stats->getNumTuples(source->getDatabase(), source->getSetName()) : 0;
90  ds.numBytes = stats != nullptr ? stats->getNumBytes(source->getDatabase(), source->getSetName()) : 0;
91  ds.avgTupleSize = stats != nullptr ? stats->getAvgTupleSize(source->getDatabase(), source->getSetName()) : 0;
92 
93  // return the statistics taken from the source set
94  return ds;
95 }
96 
98 
99  // if we are calling this we always must have two producers
100  assert(pipeline.front()->getNumProducers() == 2);
101 
102  // grab the left side and the right side
103  auto lhs = pipeline.front()->getProducer(0)->to<AdvancedPhysicalAbstractPipe>();
104  auto rhs = pipeline.front()->getProducer(1)->to<AdvancedPhysicalAbstractPipe>();
105 
106  // TODO for now we are only handling two situations: one side is JOIN_SHUFFLED_HASHSET_ALGORITHM and the other is JOIN_SUFFLE_SET_ALGORITHM
107  if(lhs->getSelectedAlgorithm()->getType() == JOIN_SHUFFLED_HASHSET_ALGORITHM &&
108  rhs->getSelectedAlgorithm()->getType() == JOIN_SUFFLE_SET_ALGORITHM) {
109 
110  // the last computation is the hash computation; grab it and add it to the front of our pipeline
111  pipelineComputations.push_front(rhs->getPipeComputations().back());
112  }
113  else if(rhs->getSelectedAlgorithm()->getType() == JOIN_SHUFFLED_HASHSET_ALGORITHM &&
114  lhs->getSelectedAlgorithm()->getType() == JOIN_SUFFLE_SET_ALGORITHM) {
115 
116  // the last computation is the hash computation; grab it and add it to the front of our pipeline
117  pipelineComputations.push_front(lhs->getPipeComputations().back());
118  }
119 }
120 
122 
123  // start from an empty container; the computations are rebuilt from the pipeline below
124  pipelineComputations.clear();
125 
126  // go through each node in the pipeline and copy its atomic computations
127  for(auto &p : pipeline) {
128 
129  // get the atomic computations of the pipeline
130  auto computations = p->getPipeComputations();
131 
132  // append the pipelined operators
133  pipelineComputations.insert(pipelineComputations.end(), computations.begin(), computations.end());
134  }
135 
136  // if the first node in the pipeline is joining, check if we need to include the hash computation in this pipeline
137  if(pipeline.front()->isJoining()) {
139  }
140  else if(pipeline.front()->getNumProducers() == 1 && pipeline.front()->getProducer(0)->to<AdvancedPhysicalAbstractPipe>()->getPipeComputations().back()->getAtomicComputationTypeID() == ApplyAggTypeID){
141 
142  // otherwise, if the single producer ends with an ApplyAggTypeID computation, include that last atomic computation of the previous pipe
143  auto producer = pipeline.front()->getProducer(0)->to<AdvancedPhysicalAbstractPipe>();
144  pipelineComputations.push_front(producer->getPipeComputations().back());
145  }
146 }
147 
149 
150  // start from an empty map; the probing hash sets are rebuilt from the pipeline below
151  probingHashSets.clear();
152 
153  for(auto &p : pipeline) {
154 
155  if (p->isJoining()) {
156 
157  // set the is probing flag
158  this->isProbing = p->isJoining();
159 
160  // get the probing hash sets
161  auto sets = p->getProbingHashSets();
162 
163  // there should always be at least one hash set we are probing for a join
164  assert(!sets.empty());
165 
166  // add the tuple sets we are probing to the list
167  probingHashSets.insert(sets.begin(), sets.end());
168  }
169  }
170 }
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
void updateConsumers(const Handle< SetIdentifier > &sink, DataStatistics approxSize, const StatisticsPtr &stats)
std::list< AdvancedPhysicalPipelineNodePtr > pipeline
const vector< AtomicComputationPtr > & getPipeComputations() const
std::string databaseName
Definition: Statistics.h:34
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
void setSourceSetIdentifier(const Handle< SetIdentifier > &sourceSetIdentifier)
virtual DataStatistics approximateResultSize(const StatisticsPtr &stats)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::string setName
Definition: Statistics.h:35
virtual PhysicalOptimizerResultPtr generate(int nextStageID, const StatisticsPtr &stats)=0
unordered_map< std::string, std::string > probingHashSets
virtual PhysicalOptimizerResultPtr generatePipelined(int nextStageID, const StatisticsPtr &stats, std::vector< AdvancedPhysicalPipelineNodePtr > &pipesToPipeline)
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr
const AdvancedPhysicalAbstractAlgorithmPtr & getSelectedAlgorithm() const