37 std::cout << source->getAtomicComputationType() << std::endl;
76 switch (curNode->getAtomicComputationTypeID()) {
83 createPhysicalPipeline<AdvancedPhysicalJoinSidePipe>();
91 createPhysicalPipeline<AdvancedPhysicalAggregationPipe>();
105 createPhysicalPipeline<AdvancedPhysicalStraightPipe>();
123 createPhysicalPipeline<AdvancedPhysicalStraightPipe>();
128 for(
auto &consumer : consumers) {
137 std::vector<std::string> consumers;
145 std::cout << consumer->getOutputName() << std::endl;
150 consumers.push_back(consumer->getOutputName());
154 if(!consumers.empty()) {
155 this->
consumedBy[node->getNodeIdentifier()] = consumers;
164 auto consumingAtomicComputation =
consumedBy[node.second->getNodeIdentifier()];
167 for(
const auto &atomicComputation : consumingAtomicComputation) {
169 std::cout << node.second->getPipeComputations().back()->getOutputName() <<
":" << atomicComputation << std::endl;
171 auto consumer =
startsWith[atomicComputation];
174 node.second->addConsumer(consumer);
std::vector< AbstractPhysicalNodePtr > physicalSourceNodes
std::map< std::string, std::vector< std::string > > consumedBy
AdvancedPhysicalNodeFactory(const string &jobId, const Handle< ComputePlan > &computePlan, const ConfigurationPtr &conf)
std::map< std::string, AdvancedPhysicalPipelineNodePtr > physicalNodes
std::map< std::string, AdvancedPhysicalPipelineNodePtr > startsWith
std::set< AtomicComputationPtr > visitedNodes
void transverseTCAPGraph(AtomicComputationPtr curNode)
std::vector< AtomicComputationPtr > currentPipe
std::vector< AtomicComputationPtr > & getConsumingAtomicComputations(std::string inputName)
void setConsumers(shared_ptr< AdvancedPhysicalAbstractPipe > node)
vector< AbstractPhysicalNodePtr > generateAnalyzerGraph(std::vector< AtomicComputationPtr > sources) override
AtomicComputationList computationGraph
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr