19 #ifndef COMPUTE_PLAN_CC
20 #define COMPUTE_PLAN_CC
47 myLogicalPlan.push_back(
'\0');
57 const int parseFailed{
yyparse(scanner, &myResult)};
63 std::cout <<
"Parse error when compiling TCAP: " << extra.errorMessage;
81 std::vector<AtomicComputationPtr>& listSoFar,
82 std::string& targetTupleSetName) {
85 if (listSoFar.back()->getOutputName() == targetTupleSetName) {
92 std::vector<AtomicComputationPtr>& nextOnes =
93 myPlan->getComputations().getConsumingAtomicComputations(listSoFar.back()->getOutputName());
96 for (
auto& a : nextOnes) {
99 listSoFar.push_back(a);
100 if (
recurse(myPlan, listSoFar, targetTupleSetName)) {
107 listSoFar.pop_back();
132 std::string targetTupleSetName,
133 std::string targetComputationName) {
135 if (targetComputationName.find(
"JoinComp") == std::string::npos) {
151 std::vector<AtomicComputationPtr>& consumers =
157 for (
auto& a : consumers) {
158 if (a->getComputationName() == targetComputationName) {
161 if (targetSpec == a->getInput()) {
162 targetProjection = a->getProjection();
163 targetAttsToOpOn = a->getInput();
169 if (a->getAtomicComputationType() != std::string(
"JoinSets")) {
170 std::cout <<
"This is bad... is the target computation name correct??";
171 std::cout <<
"Didn't find a JoinSets, target was " << targetSpec.
getSetName()
179 std::cout <<
"This is bad... is the target computation name correct??";
180 std::cout <<
"Find a JoinSets, target was " << targetSpec.
getSetName() <<
"\n";
191 myPlan->getNode(targetComputationName)
193 .getSinkMerger(targetSpec, targetAttsToOpOn, targetProjection, *
this);
199 std::string targetTupleSetName,
200 std::string targetComputationName) {
202 if (targetComputationName.find(
"JoinComp") == std::string::npos) {
217 std::vector<AtomicComputationPtr>& consumers =
223 for (
auto& a : consumers) {
224 if (a->getComputationName() == targetComputationName) {
227 if (targetSpec == a->getInput()) {
228 targetProjection = a->getProjection();
229 targetAttsToOpOn = a->getInput();
235 if (a->getAtomicComputationType() != std::string(
"JoinSets")) {
236 std::cout <<
"This is bad... is the target computation name correct??";
237 std::cout <<
"Didn't find a JoinSets, target was " << targetSpec.
getSetName()
245 std::cout <<
"This is bad... is the target computation name correct??";
246 std::cout <<
"Find a JoinSets, target was " << targetSpec.
getSetName() <<
"\n";
257 myPlan->getNode(targetComputationName)
259 .getSinkShuffler(targetSpec, targetAttsToOpOn, targetProjection, *
this);
266 std::string sourceTupleSetName,
267 std::string targetComputationName,
268 std::function<std::pair<void*, size_t>()> getPage,
269 std::function<
void(
void*)> discardTempPage,
270 std::function<
void(
void*)> writeBackPage) {
272 std::map<std::string, ComputeInfoPtr> params;
275 targetComputationName,
284 std::string targetTupleSetName,
285 std::string targetComputationName,
286 std::function<std::pair<void*, size_t>()> getPage,
287 std::function<
void(
void*)> discardTempPage,
288 std::function<
void(
void*)> writeBackPage) {
290 std::map<std::string, ComputeInfoPtr> params;
293 targetComputationName,
303 std::string sourceTupleSetName,
304 std::string targetComputationName,
305 std::function<std::pair<void*, size_t>()> getPage,
306 std::function<
void(
void*)> discardTempPage,
307 std::function<
void(
void*)> writeBackPage,
308 std::map<std::string, ComputeInfoPtr>& params) {
322 size_t numTupleSets = buildTheseTupleSets.size();
323 if (numTupleSets == 0) {
324 std::cout <<
"ERROR: there is no tuple sets to build pipeline" << std::endl;
329 std::string producerName =
331 std::cout <<
"producerName = " << producerName << std::endl;
337 std::cout <<
"origSpec: " << origSpec << std::endl;
341 myPlan->getNode(producerName).getComputation().getComputeSource(origSpec, *
this);
345 std::string targetTupleSetName = buildTheseTupleSets[numTupleSets - 1];
350 if (targetComputationName.find(
"SelectionComp") == std::string::npos) {
354 targetProjection = targetSpec;
355 targetAttsToOpOn = targetSpec;
364 if (targetSpec == a->getInput()) {
365 targetProjection = a->getProjection();
368 if (targetComputationName.find(
"JoinComp") == std::string::npos) {
369 targetSpec = targetProjection;
372 targetAttsToOpOn = a->getInput();
378 else if (a->getAtomicComputationType() != std::string(
"JoinSets")) {
379 std::cout <<
"This is bad... is the target computation name correct??";
380 std::cout <<
"Didn't find a JoinSets, target was " << targetSpec.
getSetName()
387 std::cout <<
"This is bad... is the target computation name correct??";
388 std::cout <<
"Find a JoinSets, target was " << targetSpec.
getSetName() <<
"\n";
397 targetProjection = targetSpec;
398 targetAttsToOpOn = targetSpec;
402 std::cout <<
"to get compute sink for " << targetComputationName <<
" using targetSpec=" <<
403 targetSpec <<
", targetAttsToOpOn=" << targetAttsToOpOn <<
", and targetProjection=" <<
404 targetProjection << std::endl;
406 myPlan->getNode(targetComputationName)
408 .getComputeSink(targetSpec, targetAttsToOpOn, targetProjection, *
this);
411 PipelinePtr returnVal = std::make_shared<Pipeline>(
412 getPage, discardTempPage, writeBackPage, computeSource, computeSink);
416 myPlan->getComputations().getProducingAtomicComputation(buildTheseTupleSets[0]);
418 for (
int i = 1; i < buildTheseTupleSets.size(); i++) {
422 myPlan->getComputations().getProducingAtomicComputation(buildTheseTupleSets[i]);
426 std::cout <<
"ERROR: We can't get producing computation and stop building" << std::endl;
432 if (a->getAtomicComputationType() ==
"Filter") {
433 if (params.count(a->getOutput().getSetName()) == 0) {
434 returnVal->addStage(std::make_shared<FilterExecutor>(
435 lastOne->getOutput(), a->getInput(), a->getProjection()));
439 std::make_shared<FilterExecutor>(lastOne->getOutput(),
442 params[a->getOutput().getSetName()]));
445 }
else if (a->getAtomicComputationType() ==
"Apply") {
448 if (params.count(a->getOutput().getSetName()) == 0) {
450 myPlan->getNode(a->getComputationName())
451 .getLambda(((
ApplyLambda*)a.get())->getLambdaToApply())
452 ->getExecutor(lastOne->getOutput(), a->getInput(), a->getProjection()));
454 returnVal->addStage(
myPlan->getNode(a->getComputationName())
455 .getLambda(((
ApplyLambda*)a.get())->getLambdaToApply())
456 ->getExecutor(lastOne->getOutput(),
459 params[a->getOutput().getSetName()]));
462 }
else if (a->getAtomicComputationType() ==
"HashLeft") {
465 if (params.count(a->getOutput().getSetName()) == 0)
467 myPlan->getNode(a->getComputationName())
468 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
469 ->getLeftHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
471 returnVal->addStage(
myPlan->getNode(a->getComputationName())
472 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
473 ->getLeftHasher(lastOne->getOutput(),
476 params[a->getOutput().getSetName()]));
478 }
else if (a->getAtomicComputationType() ==
"HashRight") {
481 if (params.count(a->getOutput().getSetName()) == 0)
483 myPlan->getNode(a->getComputationName())
484 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
485 ->getRightHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
487 returnVal->addStage(
myPlan->getNode(a->getComputationName())
488 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
489 ->getRightHasher(lastOne->getOutput(),
492 params[a->getOutput().getSetName()]));
494 }
else if (a->getAtomicComputationType() ==
"HashOne") {
495 if (params.count(a->getOutput().getSetName()) == 0) {
496 returnVal->addStage(std::make_shared<HashOneExecutor>(
497 lastOne->getOutput(), a->getInput(), a->getProjection()));
501 std::make_shared<HashOneExecutor>(lastOne->getOutput(),
504 params[a->getOutput().getSetName()]));
506 }
else if (a->getAtomicComputationType() ==
"Flatten") {
507 if (params.count(a->getOutput().getSetName()) == 0) {
508 returnVal->addStage(std::make_shared<FlattenExecutor>(
509 lastOne->getOutput(), a->getInput(), a->getProjection()));
513 std::make_shared<FlattenExecutor>(lastOne->getOutput(),
516 params[a->getOutput().getSetName()]));
519 }
else if (a->getAtomicComputationType() ==
"JoinSets") {
531 if (params.count(a->getOutput().getSetName()) == 0) {
534 lastOne->getOutput(),
540 lastOne->getOutput(),
543 params[a->getOutput().getSetName()]));
552 if (params.count(a->getOutput().getSetName()) == 0) {
555 lastOne->getOutput(),
561 lastOne->getOutput(),
564 params[a->getOutput().getSetName()]));
570 std::cout <<
"This is bad... found an unexpected computation type ("
581 std::string targetTupleSetName,
582 std::string targetComputationName,
583 std::function<std::pair<void*, size_t>()> getPage,
584 std::function<
void(
void*)> discardTempPage,
585 std::function<
void(
void*)> writeBackPage,
586 std::map<std::string, ComputeInfoPtr>& params) {
600 std::string producerName =
610 myPlan->getNode(producerName).getComputation().getComputeSource(origSpec, *
this);
615 std::vector<AtomicComputationPtr> listSoFar;
618 std::vector<AtomicComputationPtr>& nextOnes =
619 myPlan->getComputations().getConsumingAtomicComputations(origSpec.
getSetName());
623 for (
auto& a : nextOnes) {
624 listSoFar.push_back(a);
633 listSoFar.pop_back();
639 <<
"This is bad. Could not find a path from source computation to sink computation.\n";
649 std::vector<AtomicComputationPtr>& consumers =
655 for (
auto& a : consumers) {
656 if (a->getComputationName() == targetComputationName) {
661 if (targetSpec == a->getInput()) {
662 targetProjection = a->getProjection();
665 if (targetComputationName.find(
"JoinComp") == std::string::npos) {
666 targetSpec = targetProjection;
669 targetAttsToOpOn = a->getInput();
675 if (a->getAtomicComputationType() != std::string(
"JoinSets")) {
676 std::cout <<
"This is bad... is the target computation name correct??";
677 std::cout <<
"Didn't find a JoinSets, target was " << targetSpec.
getSetName()
685 std::cout <<
"This is bad... is the target computation name correct??";
686 std::cout <<
"Find a JoinSets, target was " << targetSpec.
getSetName() <<
"\n";
702 myPlan->getNode(targetComputationName)
704 .getComputeSink(targetSpec, targetAttsToOpOn, targetProjection, *
this);
707 PipelinePtr returnVal = std::make_shared<Pipeline>(
708 getPage, discardTempPage, writeBackPage, computeSource, computeSink);
712 myPlan->getComputations().getProducingAtomicComputation(sourceTupleSetName);
713 for (
auto& a : listSoFar) {
716 if (a->getAtomicComputationType() ==
"Filter") {
719 if (params.count(a->getOutput().getSetName()) == 0) {
720 returnVal->addStage(std::make_shared<FilterExecutor>(
721 lastOne->getOutput(), a->getInput(), a->getProjection()));
725 std::make_shared<FilterExecutor>(lastOne->getOutput(),
728 params[a->getOutput().getSetName()]));
731 }
else if (a->getAtomicComputationType() ==
"Apply") {
736 if (params.count(a->getOutput().getSetName()) == 0) {
738 myPlan->getNode(a->getComputationName())
739 .getLambda(((
ApplyLambda*)a.get())->getLambdaToApply())
740 ->getExecutor(lastOne->getOutput(), a->getInput(), a->getProjection()));
742 returnVal->addStage(
myPlan->getNode(a->getComputationName())
743 .getLambda(((
ApplyLambda*)a.get())->getLambdaToApply())
744 ->getExecutor(lastOne->getOutput(),
747 params[a->getOutput().getSetName()]));
750 }
else if (a->getAtomicComputationType() ==
"HashLeft") {
755 if (params.count(a->getOutput().getSetName()) == 0)
757 myPlan->getNode(a->getComputationName())
758 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
759 ->getLeftHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
761 returnVal->addStage(
myPlan->getNode(a->getComputationName())
762 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
763 ->getLeftHasher(lastOne->getOutput(),
766 params[a->getOutput().getSetName()]));
768 }
else if (a->getAtomicComputationType() ==
"HashRight") {
773 if (params.count(a->getOutput().getSetName()) == 0)
775 myPlan->getNode(a->getComputationName())
776 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
777 ->getRightHasher(lastOne->getOutput(), a->getInput(), a->getProjection()));
779 returnVal->addStage(
myPlan->getNode(a->getComputationName())
780 .getLambda(((
HashLeft*)a.get())->getLambdaToApply())
781 ->getRightHasher(lastOne->getOutput(),
784 params[a->getOutput().getSetName()]));
786 }
else if (a->getAtomicComputationType() ==
"HashOne") {
789 if (params.count(a->getOutput().getSetName()) == 0) {
790 returnVal->addStage(std::make_shared<HashOneExecutor>(
791 lastOne->getOutput(), a->getInput(), a->getProjection()));
795 std::make_shared<HashOneExecutor>(lastOne->getOutput(),
798 params[a->getOutput().getSetName()]));
801 }
else if (a->getAtomicComputationType() ==
"Flatten") {
804 if (params.count(a->getOutput().getSetName()) == 0) {
805 returnVal->addStage(std::make_shared<FlattenExecutor>(
806 lastOne->getOutput(), a->getInput(), a->getProjection()));
810 std::make_shared<FlattenExecutor>(lastOne->getOutput(),
813 params[a->getOutput().getSetName()]));
815 }
else if (a->getAtomicComputationType() ==
"JoinSets") {
830 if (params.count(a->getOutput().getSetName()) == 0) {
833 lastOne->getOutput(),
839 lastOne->getOutput(),
842 params[a->getOutput().getSetName()]));
851 if (params.count(a->getOutput().getSetName()) == 0) {
854 lastOne->getOutput(),
860 lastOne->getOutput(),
863 params[a->getOutput().getSetName()]));
869 std::cout <<
"This is bad... found an unexpected computation type ("
int yyparse(yyscan_t, struct AtomicComputationList **)
std::shared_ptr< ComputeSource > ComputeSourcePtr
PipelinePtr buildPipeline(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName, std::function< std::pair< void *, size_t >()> getPage, std::function< void(void *)> discardTempPage, std::function< void(void *)> writeBackPage, std::map< std::string, ComputeInfoPtr > &params)
TupleSpec & getRightInput()
std::string & getComputationName()
std::shared_ptr< SinkShuffler > SinkShufflerPtr
struct yy_buffer_state * YY_BUFFER_STATE
SinkShufflerPtr getShuffler(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName)
std::shared_ptr< SinkMerger > SinkMergerPtr
std::shared_ptr< Pipeline > PipelinePtr
int yylex_init_extra(struct LexerExtra *, yyscan_t *)
std::shared_ptr< LogicalPlan > LogicalPlanPtr
std::string & getSetName()
Vector< Handle< Computation > > allComputations
ENABLE_DEEP_COPY ComputePlan()
AtomicComputationPtr getProducingAtomicComputation(std::string outputName)
std::vector< AtomicComputationPtr > & getConsumingAtomicComputations(std::string inputName)
std::string getProducingComputationName(std::string sourceTupleSetName)
std::shared_ptr< ComputeSink > ComputeSinkPtr
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
TupleSpec & getProjection()
int yylex_destroy(yyscan_t)
virtual ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput, ComputeInfoPtr arg)=0
bool recurse(LogicalPlanPtr myPlan, std::vector< AtomicComputationPtr > &listSoFar, std::string &targetTupleSetName)
void nullifyPlanPointer()
TupleSpec & getRightProjection()
void yy_delete_buffer(YY_BUFFER_STATE, yyscan_t)
YY_BUFFER_STATE yy_scan_string(const char *, yyscan_t)
SinkMergerPtr getMerger(std::string sourceTupleSetName, std::string targetTupleSetName, std::string targetComputationName)