18 #ifndef TUPLESET_JOBSTAGE_H
19 #define TUPLESET_JOBSTAGE_H
80 return "TupleSetJobStage";
84 return TupleSetJobStage_TYPEID;
113 for (
int i = 0; i < (*buildTheseTupleSets).size(); i++) {
121 for (
const auto &i : buildMe) {
272 std::cout <<
"[JOB ID] jobId=" <<
jobId << std::endl;
273 std::cout <<
"[STAGE ID] id=" <<
id << std::endl;
274 std::cout <<
"[INPUT] databaseName=" <<
sourceContext->getDatabase()
276 std::cout <<
"[OUTPUT] databaseName=" <<
sinkContext->getDatabase()
277 <<
", setName=" <<
sinkContext->getSetName() << std::endl;
283 std::cout <<
"[TARGET COMPUTATION] targetComputationSpecifier="
285 std::cout <<
"[ALLOCATOR POLICY] allocatorPolicy=" << this->
allocatorPolicy << std::endl;
287 std::cout <<
"[PIPELINE]" << std::endl;
289 for (
size_t i = 0; i < mySize; i++) {
290 std::cout << i <<
": " << (*buildTheseTupleSets)[i] << std::endl;
293 std::cout <<
"[Probing] isProbing=" << this->
probeOrNot << std::endl;
294 std::cout <<
"Number of cluster nodes=" <<
getNumNodes() << std::endl;
300 std::cout <<
"Number of partitions on node-" << i <<
" is " << partitions->size()
302 std::cout <<
"IP address on node-" << i <<
" is " <<
getIPAddress(i) << std::endl;
305 std::cout <<
"Port on node-" << i <<
" is " << port << std::endl;
313 std::cout <<
"Hash sets to probe" << std::endl;
315 std::cout <<
"Atomic computation : \"" << (*it).key <<
"\" : Hash set \"" << (*it).value <<
"\"" << std::endl;
352 (*hashSetsToProbe)[targetTupleSetName] = hashSetName;
360 if ((
unsigned int)nodeId < numPartitions->size()) {
361 std::string ipStr = (*ipAddresses)[nodeId];
362 if (ipStr.find(
':') != std::string::npos) {
363 std::string ip = ipStr.substr(0, ipStr.find(
':'));
374 if ((
unsigned int)nodeId < numPartitions->size()) {
375 std::string ipStr = (*ipAddresses)[nodeId];
376 if (ipStr.find(
':') != std::string::npos) {
377 int port = atoi((ipStr.substr(ipStr.find(
':') + 1)).c_str());
bool inputAggHashOutOrNot
size_t totalMemoryOnThisNode
void setProbing(bool probeOrNot)
void setSinkContext(Handle< SetIdentifier > sinkContext)
Handle< SetIdentifier > sinkContext
int16_t getJobStageTypeID() override
Handle< Vector< String > > ipAddresses
Handle< SetIdentifier > getCombinerContext()
bool repartitionVectorOrNot
String targetComputationSpecifier
void setAllocatorPolicy(AllocatorPolicy myPolicy)
AllocatorPolicy allocatorPolicy
void setRepartition(bool repartitionOrNot)
TupleSetJobStage(JobStageID stageId)
std::string getSourceTupleSetSpecifier()
bool repartitionJoinOrNot
Handle< ComputePlan > sharedPlan
void setInputAggHashOut(bool inputAggHashOutOrNot)
Handle< Vector< String > > buildTheseTupleSets
void setRepartitionJoin(bool repartitionJoinOrNot)
size_t getTotalMemoryOnThisNode()
void setNumNodesToCollect(int numNodesToCollect)
std::string getJobStageType() override
void setComputePlan(const Handle< ComputePlan > &plan, const std::string &sourceTupleSetSpecifier, const std::string &targetTupleSetSpecifier, const std::string &targetComputationSpecifier)
Handle< SetIdentifier > getSinkContext()
Handle< SetIdentifier > getHashContext()
String sourceTupleSetSpecifier
void setSourceContext(Handle< SetIdentifier > sourceContext)
std::string getOutputTypeName()
Handle< Vector< HashPartitionID > > & getNumPartitions(int nodeId)
String targetTupleSetSpecifier
Handle< Map< String, String > > hashSetsToProbe
void setNumTotalPartitions(int numTotalPartitions)
int getNumTotalPartitions()
Handle< SetIdentifier > sourceContext
void setIPAddresses(Handle< Vector< String >> addresses)
TupleSetJobStage(JobStageID stageId, int numNodes)
AllocatorPolicy getAllocatorPolicy()
bool isRepartitionVector()
void setCombining(bool combineOrNot)
void setNodeId(NodeID nodeId)
void nullifyComputePlanPointer()
Handle< Vector< Handle< Vector< HashPartitionID > > > > numPartitions
String getIPAddress(int nodeId)
void setCombinerContext(Handle< SetIdentifier > combinerContext)
void setRepartitionVector(bool repartitionVectorOrNot)
void setHashContext(Handle< SetIdentifier > hashContext)
Handle< ComputePlan > getComputePlan()
Handle< SetIdentifier > getSourceContext()
std::string getTargetTupleSetSpecifier()
void setNumPartitions(Handle< Vector< Handle< Vector< HashPartitionID >>>> numPartitions)
void setHashSetsToProbe(Handle< Map< String, String >> hashSetsToProbe)
std::string getTargetComputationSpecifier()
void addHashSetToProbe(String targetTupleSetName, String hashSetName)
Handle< Map< String, String > > & getHashSets()
int getNumNodesToCollect()
void getTupleSetsToBuildPipeline(std::vector< std::string > &buildMe)
void setCollectAsMap(bool collectAsMapOrNot)
void setBroadcasting(bool broadcastOrNot)
void setTotalMemoryOnThisNode(size_t totalMem)
void addNumPartitions(Handle< Vector< HashPartitionID >> numPartitions)
void setNumNodes(int numNodes)
JobStageID getStageId() override
void setOutputTypeName(const std::string &outputTypeName)
Handle< SetIdentifier > combinerContext
Handle< SetIdentifier > hashContext
void setTupleSetsToBuildPipeline(const std::vector< std::string > &buildMe)