A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalNodeFactory.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
23 
24 namespace pdb {
25 
27  const Handle<ComputePlan> &computePlan,
28  const ConfigurationPtr &conf)
29  : AbstractPhysicalNodeFactory(computePlan), jobId(jobId), conf(conf), currentNodeIndex(0) {}
30 
31 
32 vector<AbstractPhysicalNodePtr> AdvancedPhysicalNodeFactory::generateAnalyzerGraph(std::vector<AtomicComputationPtr> sources) {
33 
34  // go through each source in the sources
35  for(const AtomicComputationPtr &source : sources) {
36 
37  std::cout << source->getAtomicComputationType() << std::endl;
38 
39  // go trough each consumer of this node
40  for(const auto &consumer : computationGraph.getConsumingAtomicComputations(source->getOutputName())) {
41 
42  // we start with a source so we push that back
43  currentPipe.push_back(source);
44 
45  // add the consumer to the pipe
46  currentPipe.push_back(consumer);
47 
48  // then we start transversing the graph upwards
49  transverseTCAPGraph(consumer);
50  }
51  }
52 
53  // connect the pipes
55 
56  // return the generated source nodes
57  return this->physicalSourceNodes;
58 }
59 
61 
62  // did we already visit this node
63  if(visitedNodes.find(curNode) != visitedNodes.end()) {
64 
65  // clear the pipe we are done here
66  currentPipe.clear();
67 
68  // we are done here
69  return;
70  }
71 
72  // ok now we visited this node
73  visitedNodes.insert(curNode);
74 
75  // check the type of this node might be a pipeline breaker
76  switch (curNode->getAtomicComputationTypeID()) {
77 
78  case HashOneTypeID:
79  case HashLeftTypeID:
80  case HashRightTypeID: {
81 
82  // we got a hash operation, create a shuffle pipe
83  createPhysicalPipeline<AdvancedPhysicalJoinSidePipe>();
84  currentPipe.clear();
85 
86  break;
87  }
88  case ApplyAggTypeID: {
89 
90  // we got a aggregation so we need to create an aggregation shuffle pipe
91  createPhysicalPipeline<AdvancedPhysicalAggregationPipe>();
92  currentPipe.clear();
93 
94  break;
95  }
96  case WriteSetTypeID: {
97 
98  // do we just have one write set that was after an aggregation in this pipeline we just skip it no pipe is created
99  if(currentPipe.size() == 1) {
100  currentPipe.clear();
101  return;
102  }
103 
104  // write set also breaks the pipe because this is where the pipe ends
105  createPhysicalPipeline<AdvancedPhysicalStraightPipe>();
106  currentPipe.clear();
107  }
108  default: {
109 
110  // we only care about these since they tend to be pipeline breakers
111  break;
112  }
113  }
114 
115  // grab all the consumers
116  auto consumers = computationGraph.getConsumingAtomicComputations(curNode->getOutputName());
117 
118  // if we have multiple consumers and there is still stuff left in the pipe
119  if(consumers.size() > 1 && !currentPipe.empty()) {
120 
121  // this is a pipeline breaker create a pipe
122  //currentPipe.push_back(curNode);
123  createPhysicalPipeline<AdvancedPhysicalStraightPipe>();
124  currentPipe.clear();
125  }
126 
127  // go through each consumer and transverse to get the next pipe
128  for(auto &consumer : consumers) {
129  currentPipe.push_back(consumer);
130  transverseTCAPGraph(consumer);
131  }
132 }
133 
134 void AdvancedPhysicalNodeFactory::setConsumers(shared_ptr<AdvancedPhysicalAbstractPipe> node) {
135 
136  // all the consumers of these pipes
137  std::vector<std::string> consumers;
138 
139  // go trough each consumer of this node
140  for(const auto &consumer : computationGraph.getConsumingAtomicComputations(this->currentPipe.back()->getOutputName())) {
141 
142  // if the next pipe begins with a write set we just ignore it...
143  // this is happening usually when we have an aggregation connected to a write set which is not really necessary
144  if(consumer->getAtomicComputationTypeID() == WriteSetTypeID){
145  std::cout << consumer->getOutputName() << std::endl;
146  continue;
147  }
148 
149  // add them to the consumers
150  consumers.push_back(consumer->getOutputName());
151  }
152 
153  // set the consumers
154  if(!consumers.empty()) {
155  this->consumedBy[node->getNodeIdentifier()] = consumers;
156  }
157 }
158 
160 
161  for(auto node : physicalNodes) {
162 
163  // get all the consumers of this pipe
164  auto consumingAtomicComputation = consumedBy[node.second->getNodeIdentifier()];
165 
166  // go through each at
167  for(const auto &atomicComputation : consumingAtomicComputation) {
168 
169  std::cout << node.second->getPipeComputations().back()->getOutputName() << ":" << atomicComputation << std::endl;
170  // get the consuming pipeline
171  auto consumer = startsWith[atomicComputation];
172 
173  // add the consuming node of this guy
174  node.second->addConsumer(consumer);
175  }
176  }
177 }
178 
179 }
std::vector< AbstractPhysicalNodePtr > physicalSourceNodes
std::map< std::string, std::vector< std::string > > consumedBy
AdvancedPhysicalNodeFactory(const string &jobId, const Handle< ComputePlan > &computePlan, const ConfigurationPtr &conf)
std::map< std::string, AdvancedPhysicalPipelineNodePtr > physicalNodes
std::map< std::string, AdvancedPhysicalPipelineNodePtr > startsWith
std::set< AtomicComputationPtr > visitedNodes
void transverseTCAPGraph(AtomicComputationPtr curNode)
std::vector< AtomicComputationPtr > currentPipe
std::vector< AtomicComputationPtr > & getConsumingAtomicComputations(std::string inputName)
void setConsumers(shared_ptr< AdvancedPhysicalAbstractPipe > node)
vector< AbstractPhysicalNodePtr > generateAnalyzerGraph(std::vector< AtomicComputationPtr > sources) override
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr