29 vector<AtomicComputationPtr> &pipeComputations,
34 pipeComputations(pipeComputations),
35 selectedAlgorithm(nullptr),
42 auto computationName = pipeComputations.front()->getComputationName();
99 assert(std::dynamic_pointer_cast<AdvancedPhysicalAbstractPipe>(producer)->
getType() ==
JOIN_SIDE);
126 std::vector<AdvancedPhysicalPipelineNodePtr> pipeline,
166 auto current =
analyze(stats, nextStageID + (
int) previous->physicalPlanToOutput.size());
169 current->physicalPlanToOutput.insert(current->physicalPlanToOutput.begin(),
170 previous->physicalPlanToOutput.begin(),
171 previous->physicalPlanToOutput.end());
180 current->success = current->success && previous->success;
217 if(stats ==
nullptr) {
231 return double((
size_t) cost / 1000000);
253 return "node_" + to_string(
id);
293 if(!lhs->isExecuted() && !rhs->
isExecuted()) {
296 for (
const auto &algorithm : algorithms) {
307 assert(best !=
nullptr);
315 auto otherAlgorithm = lhs->isExecuted() ? lhs->getSelectedAlgorithm() : rhs->
getSelectedAlgorithm();
318 assert(otherAlgorithm !=
nullptr);
324 for (
const auto &algorithm : algorithms) {
338 return algorithms.front();
348 std::unordered_map<std::string, std::string> ret;
356 if(joinSide->isExecuted() && joinSide->hasHashSet()) {
AbstractPhysicalNodePtr getHandle()
virtual AdvancedPhysicalAbstractAlgorithmPtr selectOutputAlgorithm()=0
std::shared_ptr< Statistics > StatisticsPtr
virtual PhysicalOptimizerResultPtr chainMe(int nextStageID, const StatisticsPtr &stats, PhysicalOptimizerResultPtr previous)
const vector< AtomicComputationPtr > & getPipeComputations() const
vector< AtomicComputationPtr > pipeComputations
string getNodeIdentifier() override
AdvancedPhysicalPipelineNodePtr getAdvancedPhysicalNodeHandle()
AdvancedPhysicalAbstractAlgorithmPtr selectedAlgorithm
Handle< SetIdentifier > sourceSetIdentifier
virtual AdvancedPhysicalAbstractAlgorithmPtr propose(std::vector< AdvancedPhysicalAbstractAlgorithmPtr > algorithms)
virtual PhysicalOptimizerResultPtr pipelineMe(int nextStageID, std::vector< AdvancedPhysicalPipelineNodePtr > pipeline, const StatisticsPtr &stats)
AdvancedPhysicalAbstractPipe(string &jobId, const Handle< ComputePlan > &computePlan, LogicalPlanPtr &logicalPlan, ConfigurationPtr &conf, vector< AtomicComputationPtr > &pipeComputations, size_t id)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
void setSourceSetIdentifier(const Handle< SetIdentifier > &sourceSetIdentifier)
PhysicalOptimizerResultPtr analyze(const StatisticsPtr &stats, int nextStageID) override
double getCost(const StatisticsPtr &stats) override
std::unordered_map< std::string, std::string > getProbingHashSets()
AtomicComputationPtr getPipelineComputationAt(size_t idx)
virtual bool isExecuted()
bool isConsuming(Handle< SetIdentifier > &set) override
bool hasConsumers() override
virtual AdvancedPhysicalPipelineTypeID getType()=0
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::list< AbstractPhysicalNodePtr > consumers
virtual std::vector< AdvancedPhysicalAbstractAlgorithmPtr > getPossibleAlgorithms(const StatisticsPtr &stats)=0
Handle< SetIdentifier > getSetIdentifierFromComputation(Handle< Computation > computation)
std::shared_ptr< AdvancedPhysicalAbstractPipe > AdvancedPhysicalPipelineNodePtr
const Handle< SetIdentifier > & getSourceSetIdentifier() const
virtual bool isPipelinable(AdvancedPhysicalPipelineNodePtr node)
const bool isAggregating()
virtual bool isChainable()
std::list< AbstractPhysicalNodeWeakPtr > producers
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr
const AdvancedPhysicalAbstractAlgorithmPtr & getSelectedAlgorithm() const
std::shared_ptr< AdvancedPhysicalAbstractAlgorithm > AdvancedPhysicalAbstractAlgorithmPtr