19 #ifndef PDB_JOINCOMPBASE_H
20 #define PDB_JOINCOMPBASE_H
58 template <
typename Out,
typename In1,
typename In2,
typename... Rest>
93 this->
proxy =
nullptr;
177 for (
int i = 0; i < inputNames.size(); i++) {
183 std::vector<std::string> inputsInPredicates =
185 for (
auto &inputsInPredicate : inputsInPredicates) {
190 std::vector<std::string> inputsInProjection =
192 for (
auto &i : inputsInProjection) {
209 void extractLambdas(std::map<std::string, GenericLambdaObjectPtr>& returnVal)
override {
213 selectionLambda.
toMap(returnVal, suffix);
214 projectionLambda.
toMap(returnVal, suffix);
219 return getTypeName<Out>();
224 const int extras =
sizeof...(Rest);
228 template <
typename First,
typename... Others>
229 typename std::enable_if<
sizeof...(Others) == 0, std::string>::type
getIthInputType(
int i) {
231 return getTypeName<First>();
233 std::cout <<
"Asked for an input type that didn't exist!";
239 template <
typename First,
typename... Others>
240 typename std::enable_if<
sizeof...(Others) != 0, std::string>::type
getIthInputType(
int i) {
242 return getTypeName<First>();
263 std::vector<std::string> typeList;
266 for (
auto& a : projection.
getAtts()) {
269 std::pair<std::string, std::string> res = producer->findSource(a, plan.
getPlan()->getComputations());
272 if (res.second.empty()) {
273 typeList.push_back(
"pdb::Handle<" + plan.
getPlan()->getNode(res.first).getComputation().getOutputType() +
">");
276 plan.
getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
279 if (myType.find_first_of(
"pdb::Handle<") == 0) {
280 typeList.push_back(myType);
282 typeList.push_back(
"pdb::Handle<" + myType +
">");
289 std::vector<int> whereEveryoneGoes;
292 return correctJoinTuple->getMerger();
303 std::vector<std::string> typeList;
306 for (
auto& a : projection.
getAtts()) {
309 std::pair<std::string, std::string> res = producer->findSource(a, plan.
getPlan()->getComputations());
312 if (res.second.empty()) {
313 typeList.push_back(
"pdb::Handle<" + plan.
getPlan()->getNode(res.first).getComputation().getOutputType() +
">");
315 std::string myType = plan.
getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
317 if (myType.find_first_of(
"pdb::Handle<") == 0) {
318 typeList.push_back(myType);
320 typeList.push_back(
"pdb::Handle<" + myType +
">");
326 std::vector<int> whereEveryoneGoes;
329 return correctJoinTuple->getShuffler();
340 std::vector<std::string> typeList;
343 for (
auto& a : projection.
getAtts()) {
347 std::pair<std::string, std::string> res = producer->findSource(a, plan.
getPlan()->getComputations());
350 if (res.second.empty()) {
353 plan.
getPlan()->getNode(res.first).getComputation().getOutputType() +
">");
356 std::string myType = plan.
getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
358 if (myType.find_first_of(
"pdb::Handle<") == 0) {
359 typeList.push_back(myType);
361 typeList.push_back(
"pdb::Handle<" + myType +
">");
367 std::vector<int> whereEveryoneGoes;
371 return correctJoinTuple->getSink(consumeMe, attsToOpOn, projection, whereEveryoneGoes);
391 std::vector<std::string> typeList;
394 for (
auto& a : outputScheme.
getAtts()) {
395 if (a.find(
"hash") != std::string::npos) {
400 std::pair<std::string, std::string> res = producer->findSource(a, plan.
getPlan()->getComputations());
403 if (res.second.empty()) {
404 typeList.push_back(
"pdb::Handle<" + plan.
getPlan()->getNode(res.first).getComputation().getOutputType() +
">");
407 std::string myType = plan.
getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
409 if (myType.find_first_of(
"pdb::Handle<") == 0) {
410 typeList.push_back(myType);
412 typeList.push_back(
"pdb::Handle<" + myType +
">");
419 std::vector<int> whereEveryoneGoes;
422 return correctJoinTuple->getPartitionedSource(
428 std::cout <<
"Error in JoinComp: partitioned join source has a null iterator"
436 if (page !=
nullptr) {
446 if (this->
proxy !=
nullptr) {
447 char* curBytes = freeMe->getRawBytes();
449 curBytes = curBytes +
sizeof(
NodeID);
455 freeMe->decRefCount();
456 if (freeMe->getRefCount() == 0) {
457 #ifdef PROFILING_CACHE
458 std::cout <<
"To unpin Join source page with DatabaseID=" << dbId
459 <<
", UserTypeID=" << typeId <<
", SetID=" << setId
460 <<
", PageID=" << freeMe->getPageID() << std::endl;
463 this->
proxy->unpinUserPage(nodeId, dbId, typeId, setId, freeMe,
false);
466 this->
proxy->unpinUserPage(nodeId, dbId, typeId, setId, freeMe,
false);
470 #ifdef PROFILING_CACHE
472 std::cout <<
"Can't unpin Join source page with DatabaseID=" << dbId
473 <<
", UserTypeID=" << typeId <<
", SetID=" << setId
474 <<
", PageID=" << freeMe->getPageID()
475 <<
", reference count=" << freeMe->getRefCount() << std::endl;
491 return std::string(
"JoinComp");
500 std::string
toTCAPString(std::vector<InputTupleSetSpecifier>& inputTupleSets,
501 int computationLabel,
503 std::vector<std::string>& outputColumnNames,
504 std::string& addedOutputColumnName)
override {
507 std::string tcapString;
512 std::vector<std::string> inputNames;
515 for (
unsigned int i = 0; i < inputTupleSets.size(); i++) {
519 inputNames.push_back(inputTupleSets[i].getColumnNamesToApply()[0]);
524 std::string inputTupleSetName;
525 std::vector<std::string> inputColumnNames;
526 std::vector<std::string> inputColumnsToApply;
527 std::vector<std::string> childrenLambdaNames;
529 std::string myLambdaName;
531 tcapString += selectionLambda.
toTCAPString(inputTupleSetName,
540 addedOutputColumnName,
547 tcapString +=
"\n/* run Join projection on ( " + inputsInProjection[0];
548 for (
unsigned int i = 1; i < inputsInProjection.size(); i++) {
549 tcapString +=
" " + inputsInProjection[i];
551 tcapString +=
" )*/\n";
554 inputColumnNames.clear();
555 inputColumnsToApply.clear();
556 childrenLambdaNames.clear();
557 for (
unsigned int index = 0; index < multiInputsComp->
getNumInputs(); index++) {
561 tcapString += projectionLambda.
toTCAPString(inputTupleSetName,
570 addedOutputColumnName,
582 std::cout <<
"ERROR: inputTupleSet size is " << inputTupleSets.size()
583 <<
" and not equivalent with Join's inputs " <<
getNumInputs() << std::endl;
602 TupleSpec& pipelinedAttsToIncludeInOutput,
610 std::vector<std::string> typeList;
612 for (
auto& a : (hashedInputSchema.
getAtts())) {
615 std::pair<std::string, std::string> res = producer->findSource(a, joinArg.
plan.
getPlan()->getComputations());
618 if (res.second.empty()) {
619 typeList.push_back(
"pdb::Handle<" + joinArg.
plan.
getPlan()->getNode(res.first).getComputation().getOutputType() +
">");
622 std::string myType = joinArg.
plan.
getPlan()->getNode(res.first).getLambda(res.second)->getOutputType();
624 if (myType.find_first_of(
"pdb::Handle<") == 0) {
625 typeList.push_back(myType);
627 typeList.push_back(
"pdb::Handle<" + myType +
">");
633 std::vector<int> whereEveryoneGoes;
639 pipelinedInputSchema,
640 pipelinedAttsToOperateOn,
641 pipelinedAttsToIncludeInOutput,
649 TupleSpec& pipelinedAttsToIncludeInOutput)
override {
650 std::cout <<
"Currently, no pipelined version of the join doesn't take an arg.\n";
658 #endif //PDB_JOINCOMPBASE_H
ComputeSourcePtr getComputeSource(TupleSpec &outputScheme, ComputePlan &plan) override
shared_ptr< PDBPage > PDBPagePtr
std::shared_ptr< ComputeSource > ComputeSourcePtr
ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput, ComputeInfoPtr arg) override
void * pageWhereHashTableIs
SinkMergerPtr getSinkMerger(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
std::shared_ptr< JoinTupleSingleton > JoinTuplePtr
std::vector< std::string > & getAtts()
JoinArg(ComputePlan &plan, void *pageWhereHashTableIs)
shared_ptr< DataProxy > DataProxyPtr
std::shared_ptr< SinkShuffler > SinkShufflerPtr
ComputationTypeID getComputationTypeID() override
void analyzeInputSets(std::vector< std::string > &inputNames)
std::enable_if< sizeof...(Others)!=0, std::string >::type getIthInputType(int i)
std::shared_ptr< SinkMerger > SinkMergerPtr
PageCircularBufferIteratorPtr iterator
void toMap(std::map< std::string, GenericLambdaObjectPtr > &returnVal, int &suffix)
std::shared_ptr< ComputeInfo > ComputeInfoPtr
MultiInputsBase * getMultiInputsBase()
std::string & getSetName()
std::string getOutputType() override
void setBatchSize(int batchSize) override
ComputeSinkPtr getComputeSink(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
std::string toTCAPString(std::vector< InputTupleSetSpecifier > &inputTupleSets, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName) override
MultiInputsBase * multiInputsBase
ComputeExecutorPtr getExecutor(bool needToSwapAtts, TupleSpec &hashedInputSchema, TupleSpec &pipelinedInputSchema, TupleSpec &pipelinedAttsToOperateOn, TupleSpec &pipelinedAttsToIncludeInOutput) override
SinkShufflerPtr getSinkShuffler(TupleSpec &consumeMe, TupleSpec &attsToOpOn, TupleSpec &projection, ComputePlan &plan) override
void setPartitionId(size_t myPartitionId)
std::enable_if< std::is_base_of< JoinTupleBase, In1 >::value, JoinTuplePtr >::type findCorrectJoinTuple(std::vector< std::string > &typeList, std::vector< int > &whereEveryoneGoes)
std::shared_ptr< ComputeSink > ComputeSinkPtr
auto callGetProjection(TypeToCallMethodOn &a, decltype(HasTwoArgs::test(&a))*arg=nullptr)
void setOutputColumnToApply(std::string outputColumnToApply)
std::string getIthInputType(int i) final
std::shared_ptr< struct AtomicComputation > AtomicComputationPtr
virtual Lambda< bool > getSelection(Handle< In1 > in1, Handle< In2 > in2, Handle< Rest >...otherArgs)=0
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
auto callGetSelection(TypeToCallMethodOn &a, decltype(HasTwoArgs::test(&a))*arg=nullptr)
void setMultiInputsBaseToNull()
void setJoinType(JoinType joinType)
std::vector< std::string > getAllInputs(MultiInputsBase *multiInputsBase)
void setNumPartitions(int numPartitions)
std::enable_if< sizeof...(Others)==0, std::string >::type getIthInputType(int i)
std::string getComputationType() override
void setOutputTupleSetName(std::string outputTupleSetName)
~JoinArg() override=default
void setIterator(PageCircularBufferIteratorPtr iterator)
void setNumNodes(int numNodes)
void makeObjectAllocatorBlock(size_t numBytesIn, bool throwExceptionOnFail)
virtual Lambda< Handle< Out > > getProjection(Handle< In1 > in1, Handle< In2 > in2, Handle< Rest >...otherArgs)=0
std::string toTCAPString(std::string inputTupleSetName, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int &lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumnNames, std::string &addedOutputColumnName, std::string &myLambdaName, bool whetherToRemoveUnusedOutputColumns, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false)
shared_ptr< PageCircularBufferIterator > PageCircularBufferIteratorPtr
void setProxy(DataProxyPtr proxy)
void extractLambdas(std::map< std::string, GenericLambdaObjectPtr > &returnVal) override
String outputTupleSetName