A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SimplePhysicalNodeFactory.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
21 
22 namespace pdb {
23 
25  const Handle<ComputePlan> &computePlan,
26  const ConfigurationPtr &conf) : AbstractPhysicalNodeFactory(computePlan),
27  jobId(jobId),
28  conf(conf) {}
29 
31 
32  // check the type of the atomic computation
33  switch (tcapNode->getAtomicComputationTypeID()){
34 
35  // we are dealing with an aggregate
36  case ApplyAggTypeID: {
37  return (new SimplePhysicalAggregationNode(jobId, tcapNode, computePlan, logicalPlan, conf))->getSimpleNodeHandle();
38  }
39 
40  // we are dealing with a join
41  case ApplyJoinTypeID: {
42  return (new SimplePhysicalJoinNode(jobId, tcapNode, computePlan, logicalPlan, conf))->getSimpleNodeHandle();
43  }
44 
45  // we are dealing with node that is not an aggregate or a join (no special treatment needed)
46  default: {
47  return (new SimplePhysicalNode(jobId, tcapNode, computePlan, logicalPlan, conf))->getSimpleNodeHandle();
48  }
49  }
50 }
51 
52 std::vector<AbstractPhysicalNodePtr> SimplePhysicalNodeFactory::generateAnalyzerGraph(std::vector<AtomicComputationPtr> sources) {
53 
54  std::vector<AbstractPhysicalNodePtr> ret;
55 
56  for(const auto &source : sources) {
57 
58  auto analyzerNode = createAnalyzerNode(source);
59 
60  // store this source since we are returning it
61  ret.push_back(analyzerNode);
62 
63  // store it in nodes to avoid duplicates
64  nodes[source->getOutputName()] = analyzerNode;
65 
66  // for each consumer go through them
67  for(const auto &consumer : computationGraph.getConsumingAtomicComputations(source->getOutputName())) {
68  generateConsumerNode(analyzerNode, consumer);
69  }
70  }
71 
72  // clear nodes map
73  nodes.clear();
74 
75  return ret;
76 }
77 
79 
80  AbstractPhysicalNodePtr analyzerNode;
81 
82  // do we already have an AbstractPhysicalNode for this node
83  if(nodes.find(node->getOutputName()) == nodes.end()) {
84 
85  // create the node
86  analyzerNode = createAnalyzerNode(node);
87 
88  // store the node as created
89  nodes[node->getOutputName()] = analyzerNode;
90 
91  // add the current node to this source
92  source->addConsumer(analyzerNode);
93  }
94  else {
95  // grab the already exiting node
96  analyzerNode = nodes[node->getOutputName()];
97 
98  // add the current node to this source
99  source->addConsumer(analyzerNode);
100 
101  // we are done here
102  return;
103  }
104 
105  // for each consumer of the current node do the same
106  for(const auto &consumer : computationGraph.getConsumingAtomicComputations(node->getOutputName())) {
107  generateConsumerNode(analyzerNode, consumer);
108  }
109 }
110 
111 }
SimplePhysicalNodeFactory(const string &jobId, const Handle< ComputePlan > &computePlan, const ConfigurationPtr &conf)
std::vector< AtomicComputationPtr > & getConsumingAtomicComputations(std::string inputName)
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< AbstractPhysicalNode > AbstractPhysicalNodePtr
AbstractPhysicalNodePtr createAnalyzerNode(AtomicComputationPtr tcapNode)
void generateConsumerNode(AbstractPhysicalNodePtr source, AtomicComputationPtr node)
std::vector< AbstractPhysicalNodePtr > generateAnalyzerGraph(std::vector< AtomicComputationPtr > sources) override
std::map< std::string, AbstractPhysicalNodePtr > nodes