A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PhysicalOptimizer.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
20 #include "PhysicalOptimizer.h"
21 
22 
23 namespace pdb {
24 
25 PhysicalOptimizer::PhysicalOptimizer(std::vector<AbstractPhysicalNodePtr> &sources, PDBLoggerPtr &logger) {
26 
27  // this is the logger
28  this->logger = logger;
29 
30  // form the map of source nodes
31  for(const auto &i : sources) {
32  sourceNodes[i->getNodeIdentifier()] = i;
33  }
34 }
35 
37  vector<Handle<SetIdentifier>> &interGlobalSets,
38  StatisticsPtr &stats,
39  int &jobStageId) {
40  // grab the best node
41  auto source = getBestNode(stats);
42 
43  // analyze this source node and do physical planning
44  auto result = source->analyze(stats, jobStageId);
45 
46  // copy the physical plan
47  physicalPlanToOutput.insert(physicalPlanToOutput.end(),
48  result->physicalPlanToOutput.begin(),
49  result->physicalPlanToOutput.end());
50 
51  // copy the intermediate sets
52  interGlobalSets.insert(interGlobalSets.end(),
53  result->interGlobalSets.begin(),
54  result->interGlobalSets.end());
55 
56  // did we succeed
57  if(result->success) {
58 
59  // increase the job stage
60  jobStageId += result->physicalPlanToOutput.size();
61 
62  // did we create a new source set add them
63  for(auto &it : result->createdSourceComputations) {
64  sourceNodes[it->getNodeIdentifier()] = it;
65  }
66 
67  // does this source have any consumers
68  if(!source->hasConsumers()) {
69  sourceNodes.erase(source->getNodeIdentifier());
70  }
71  }
72  // if we did not penalize this set
73  else {
74  penalizedSets.insert(source->getNodeIdentifier());
75  }
76 
77  return result->success;
78 }
79 
// NOTE(review): the signature line of this definition (listing line 80) is missing from this extract;
// presumably a PhysicalOptimizer member -- confirm its name against PhysicalOptimizer.h.
// Returns true while at least one source node remains in sourceNodes.
81  return !sourceNodes.empty();
82 }
83 
85 
86  // go through each consumer
87  for(auto &source: sourceNodes) {
88 
89  // check
90  if(source.second->isConsuming(set)) {
91  return true;
92  }
93  }
94 
95  // ok we do not have it
96  return false;
97 }
98 
100 
101  // the default is to just use the first node
102  AbstractPhysicalNodePtr ret = sourceNodes.begin()->second;
103  double cost = std::numeric_limits<double>::max();
104 
105  // go through all source nodes
106  for(const auto &it : sourceNodes) {
107 
108  // grab the cost of the source
109  double sourceCost = it.second->getCost(ptr);
110 
111  // is this set in the penalized sets, increase the cost by a factor of 1000
112  if(penalizedSets.find(it.second->getNodeIdentifier()) != penalizedSets.end()){
113  sourceCost *= SOURCE_PENALIZE_FACTOR;
114  }
115 
116  // if the cost is less the the current cost this is our new best source
117  if(sourceCost < cost) {
118  cost = sourceCost;
119  ret = it.second;
120  }
121  }
122 
123  return ret;
124 }
125 
// NOTE(review): the signature line of this definition (listing line 126) is missing from this extract;
// presumably a PhysicalOptimizer member -- confirm its name against PhysicalOptimizer.h.
// Resets the optimizer's state: forgets every penalized set and every source node.
127  penalizedSets.clear();
128  sourceNodes.clear();
129 }
130 
131 }
bool hasConsumers(Handle< SetIdentifier > &set)
std::shared_ptr< Statistics > StatisticsPtr
Definition: Statistics.h:27
static constexpr double SOURCE_PENALIZE_FACTOR
std::set< std::string > penalizedSets
bool getNextStagesOptimized(std::vector< pdb::Handle< AbstractJobStage >> &physicalPlanToOutput, std::vector< pdb::Handle< SetIdentifier >> &interGlobalSets, StatisticsPtr &stats, int &jobStageId)
std::map< std::string, AbstractPhysicalNodePtr > sourceNodes
std::shared_ptr< AbstractPhysicalNode > AbstractPhysicalNodePtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
PhysicalOptimizer(std::vector< AbstractPhysicalNodePtr > &sources, PDBLoggerPtr &logger)
AbstractPhysicalNodePtr getBestNode(StatisticsPtr &ptr)