35 std::cout << node->getOutputName() << std::endl;
36 Handle<Computation> comp = logicalPlan->getNode(node->getComputationName()).getComputationHandle();
49 jobStageBuilder->setSourceTupleSetName(
node->getOutputName());
58 jobStageBuilder->setJobId(
jobId);
71 if(!result->success) {
94 case 0:
return analyzeOutput(jobStageBuilder, prevNode, stats, nextStageID);
121 activeConsumers.push_back(std::dynamic_pointer_cast<SimplePhysicalNode>(consumer));
127 if (source ==
nullptr) {
128 PDB_COUT <<
"WARNING: the set provided to the get cost is a nullptr\n";
133 if(stats ==
nullptr) {
134 PDB_COUT <<
"WARNING: there are not stats when looking for the set=" << source->toSourceSetName() <<
"\n";
139 double cost = stats->getNumBytes(source->getDatabase(), source->getSetName());
140 return cost / 1000000.0;
149 tupleStageBuilder->addTupleSetToBuildPipeline(
node->getOutputName());
165 if(result->success) {
181 Handle<SetIdentifier> sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
184 tupleStageBuilder->setJobStageId(nextStageID);
185 tupleStageBuilder->setTargetTupleSetName(
node->getInputName());
186 tupleStageBuilder->setTargetComputationName(
node->getComputationName());
187 tupleStageBuilder->setOutputTypeName(curComp->getOutputType());
188 tupleStageBuilder->setSinkContext(sink);
189 tupleStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
198 result->physicalPlanToOutput.emplace_back(jobStage);
199 result->success =
true;
213 std::string outputName =
node->getOutputName();
216 tupleSetJobStageBuilder->addTupleSetToBuildPipeline(outputName);
226 if (!curComp->needsMaterializeOutput()) {
229 curComp->setOutput(
jobId, outputName);
232 sink = makeObject<SetIdentifier>(
jobId, outputName);
233 sink->setPageSize(
conf->getPageSize());
236 result->interGlobalSets.push_back(sink);
239 sink = makeObject<SetIdentifier>(curComp->getDatabaseName(), curComp->getSetName());
243 tupleSetJobStageBuilder->setJobStageId(nextStageID);
244 tupleSetJobStageBuilder->setTargetTupleSetName(outputName);
245 tupleSetJobStageBuilder->setTargetComputationName(
node->getComputationName());
246 tupleSetJobStageBuilder->setOutputTypeName(curComp->getOutputType());
247 tupleSetJobStageBuilder->setSinkContext(sink);
248 tupleSetJobStageBuilder->setAllocatorPolicy(curComp->getAllocatorPolicy());
254 result->physicalPlanToOutput.emplace_back(jobStage);
255 result->success =
true;
271 return node->getOutputName();
AbstractPhysicalNodePtr getHandle()
virtual PhysicalOptimizerResultPtr analyzeOutput(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
SimplePhysicalNode(string jobId, AtomicComputationPtr node, const Handle< ComputePlan > &computePlan, LogicalPlanPtr logicalPlan, ConfigurationPtr conf)
const AtomicComputationPtr & getNode() const
Handle< ComputePlan > computePlan
std::shared_ptr< Statistics > StatisticsPtr
PhysicalOptimizerResultPtr analyze(const StatisticsPtr &stats, int nextStageID) override
void addConsumer(const AbstractPhysicalNodePtr &consumer) override
double getCost(const StatisticsPtr &stats) override
LogicalPlanPtr logicalPlan
bool isConsuming(Handle< SetIdentifier > &set) override
std::shared_ptr< LogicalPlan > LogicalPlanPtr
std::string getNodeIdentifier() override
virtual PhysicalOptimizerResultPtr analyzeMultipleConsumers(TupleSetJobStageBuilderPtr &ptr, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
bool hasConsumers() override
const Handle< SetIdentifier > & getSourceSetIdentifier() const
shared_ptr< Configuration > ConfigurationPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
std::shared_ptr< SimplePhysicalNode > SimplePhysicalNodePtr
virtual PhysicalOptimizerResultPtr analyzeSingleConsumer(TupleSetJobStageBuilderPtr &tupleStageBuilder, SimplePhysicalNodePtr &prevNode, const StatisticsPtr &stats, int nextStageID)
std::list< AbstractPhysicalNodePtr > consumers
std::shared_ptr< AbstractPhysicalNode > AbstractPhysicalNodePtr
Handle< SetIdentifier > getSetIdentifierFromComputation(Handle< Computation > computation)
SimplePhysicalNodePtr getSimpleNodeHandle()
virtual void addConsumer(const pdb::AbstractPhysicalNodePtr &consumer)
AtomicComputationPtr node
Handle< SetIdentifier > sourceSetIdentifier
std::list< SimplePhysicalNodePtr > activeConsumers
std::shared_ptr< TupleSetJobStageBuilder > TupleSetJobStageBuilderPtr
std::shared_ptr< PhysicalOptimizerResult > PhysicalOptimizerResultPtr