A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AbstractPhysicalNode.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #include "AbstractPhysicalNode.h"
20 #include "SelectionComp.h"
21 #include "PartitionComp.h"
22 #include "MultiSelectionComp.h"
23 
24 namespace pdb {
25 
27  const Handle<ComputePlan> &computePlan,
28  LogicalPlanPtr &logicalPlan,
29  ConfigurationPtr &conf) : jobId(jobId),
30  computePlan(computePlan),
31  logicalPlan(logicalPlan),
32  conf(conf),
33  handle(nullptr) {}
34 
36 
37  switch (computation->getComputationTypeID()) {
38  case ScanUserSetTypeID :
39  case ScanSetTypeID : {
40 
41  // this is a ScanUserSet, so cast it
42  Handle<ScanUserSet<Object>> scanner = unsafeCast<ScanUserSet<Object>, Computation>(computation);
43 
44  // create a set identifier from it
45  return makeObject<SetIdentifier>(scanner->getDatabaseName(), scanner->getSetName());
46  }
48 
49  // this is an AbstractAggregateComp, so cast it
50  Handle<AbstractAggregateComp> aggregator = unsafeCast<AbstractAggregateComp, Computation>(computation);
51 
52  // create the set identifier from it
53  return makeObject<SetIdentifier>(aggregator->getDatabaseName(), aggregator->getSetName());
54  }
55  case PartitionCompTypeID : {
56 
57  // this is a PartitionComp, so cast it
58  Handle<PartitionComp<Object, Object>> partitioner = unsafeCast<PartitionComp<Object, Object>, Computation>(computation);
59 
60  // create the set identifier from it
61  return makeObject<SetIdentifier>(partitioner->getDatabaseName(), partitioner->getSetName());
62 
63  }
64  case SelectionCompTypeID : {
65 
66  // this is a SelectionComp, so cast it
68  selector = unsafeCast<SelectionComp<Object, Object>, Computation>(computation);
69 
70  // create the set identifier from it
71  return makeObject<SetIdentifier>(selector->getDatabaseName(), selector->getSetName());
72  }
74 
75  // this is a MultiSelectionComp, so cast it
77  selector = unsafeCast<MultiSelectionComp<Object, Object>, Computation>(computation);
78 
79  // create the set identifier from it
80  return makeObject<SetIdentifier>(selector->getDatabaseName(), selector->getSetName());
81  }
82  default: {
83  // this is bad, we can not cast this thing...
84  PDB_COUT << "Source Computation Type: " << computation->getComputationType()
85  << " are not supported as source node right now" << std::endl;
86  PDB_COUT << "Manager exit...Please restart cluster\n";
87  exit(1); // TODO: killing the whole server because of one bad query might not be smart!
88  }
89  }
90 }
91 
92 
94 
95  // if we do not have a handle to this node already
96  if(handle == nullptr) {
97  handle = std::shared_ptr<AbstractPhysicalNode> (this);
98  }
99 
100  return handle;
101 }
102 
104 
105  // set the iterator to the idx-th element
106  auto it = consumers.begin();
107  std::advance(it, idx);
108 
109  // return the consumer
110  return *it;
111 }
112 
114  return consumers.size();
115 }
116 
118 
119  // set the iterator to the idx-th element
120  auto it = producers.begin();
121  std::advance(it, idx);
122 
123  // return the producer
124  return *it;
125 }
126 
128  return producers.size();
129 }
130 
131 }
AbstractPhysicalNodePtr getHandle()
AbstractPhysicalNodePtr getProducer(int idx)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
Definition: ComputePlan.h:36
AbstractPhysicalNodeWeakPtr handle
AbstractPhysicalNode(string &jobId, const Handle< ComputePlan > &computePlan, LogicalPlanPtr &logicalPlan, ConfigurationPtr &conf)
AbstractPhysicalNodePtr getConsumer(int idx)
#define PDB_COUT
Definition: PDBDebug.h:31
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::list< AbstractPhysicalNodePtr > consumers
std::shared_ptr< AbstractPhysicalNode > AbstractPhysicalNodePtr
Handle< SetIdentifier > getSetIdentifierFromComputation(Handle< Computation > computation)
std::list< AbstractPhysicalNodeWeakPtr > producers