A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AdvancedPhysicalNodeFactory.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef PDB_ADVANCEDPHYSICALNODEFACTORY_H
20 #define PDB_ADVANCEDPHYSICALNODEFACTORY_H
21 
22 #include <set>
25 
26 namespace pdb {
27 
29 
30  public:
31 
32  AdvancedPhysicalNodeFactory(const string &jobId,
34  const ConfigurationPtr &conf);
35 
41  vector<AbstractPhysicalNodePtr> generateAnalyzerGraph(std::vector<AtomicComputationPtr> sources) override;
42 
43  protected:
44 
50 
56  void setConsumers(shared_ptr<AdvancedPhysicalAbstractPipe> node);
57 
61  void connectThePipes();
62 
66  template <class T>
68 
69  // this must never be empty
70  assert(!currentPipe.empty());
71 
72  // create the node
73  auto node = new T(jobId, computePlan, logicalPlan, conf, currentPipe, currentNodeIndex++);
74 
75  // create the node handle
76  auto nodeHandle = node->getAdvancedPhysicalNodeHandle();
77 
78  // update all the node connections
79  setConsumers(nodeHandle);
80 
81  // is this a source node
82  if(nodeHandle->isSource()) {
83 
84  // add the source node
85  physicalSourceNodes.push_back(nodeHandle);
86  }
87 
88  for(auto &c : currentPipe) {
89  std::cout << c->getOutputName() << std::endl;
90  }
91 
92  std::cout << "-----------------------------------------" << std::endl;
93 
94 
95  // add the starts with
96  startsWith[currentPipe.front()->getOutputName()] = nodeHandle;
97 
98  // add the pipe to the physical nodes
99  physicalNodes[nodeHandle->getNodeIdentifier()] = nodeHandle;
100  }
101 
105  std::string jobId;
106 
111 
116 
120  std::set<AtomicComputationPtr> visitedNodes;
121 
125  std::vector<AtomicComputationPtr> currentPipe;
126 
130  std::map<std::string, AdvancedPhysicalPipelineNodePtr> physicalNodes;
131 
135  std::vector<AbstractPhysicalNodePtr> physicalSourceNodes;
136 
141  std::map<std::string, AdvancedPhysicalPipelineNodePtr> startsWith;
142 
146  std::map<std::string, std::vector<std::string>> consumedBy;
147 
151  std::vector<AdvancedPhysicalPipelineNodePtr> sources;
152 };
153 
154 }
155 
156 
157 
158 #endif //PDB_ADVANCEDPHYSICALNODEFACTORY_H
std::vector< AbstractPhysicalNodePtr > physicalSourceNodes
std::map< std::string, std::vector< std::string > > consumedBy
AdvancedPhysicalNodeFactory(const string &jobId, const Handle< ComputePlan > &computePlan, const ConfigurationPtr &conf)
std::map< std::string, AdvancedPhysicalPipelineNodePtr > physicalNodes
std::map< std::string, AdvancedPhysicalPipelineNodePtr > startsWith
std::set< AtomicComputationPtr > visitedNodes
void transverseTCAPGraph(AtomicComputationPtr curNode)
std::vector< AtomicComputationPtr > currentPipe
std::vector< AdvancedPhysicalPipelineNodePtr > sources
void setConsumers(shared_ptr< AdvancedPhysicalAbstractPipe > node)
vector< AbstractPhysicalNodePtr > generateAnalyzerGraph(std::vector< AtomicComputationPtr > sources) override
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr