33 template <
class LHS,
class RHS>
34 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && std::is_base_of<PtrBase, RHS>::value,
bool>
39 template <
class LHS,
class RHS>
40 std::enable_if_t<std::is_base_of<PtrBase, LHS>::value && !(std::is_base_of<PtrBase, RHS>::value),
46 template <
class LHS,
class RHS>
47 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && std::is_base_of<PtrBase, RHS>::value,
53 template <
class LHS,
class RHS>
54 std::enable_if_t<!(std::is_base_of<PtrBase, LHS>::value) && !(std::is_base_of<PtrBase, RHS>::value),
60 template <
class LeftType,
class RightType>
71 PDB_COUT <<
"ANDLambda: LHS index is " << lhs.getInputIndex(0) << std::endl;
72 PDB_COUT <<
"ANDLambda: RHS index is " << rhs.getInputIndex(0) << std::endl;
73 PDB_COUT <<
"ANDLambda: LHS type is " << getTypeName<LeftType>() << std::endl;
74 PDB_COUT <<
"ANDLambda: RHS type is " << getTypeName<RightType>() << std::endl;
80 return std::string(
"&&");
102 TupleSpec& attsToIncludeInOutput)
override {
109 std::make_shared<TupleSetSetupMachine>(inputSchema, attsToIncludeInOutput);
112 std::vector<int> inputAtts = myMachine->match(attsToOperateOn);
113 int firstAtt = inputAtts[0];
114 int secondAtt = inputAtts[1];
117 auto outAtt = (int) attsToIncludeInOutput.
getAtts().size();
119 return std::make_shared<SimpleComputeExecutor>(
124 myMachine->setup(input, output);
127 std::vector<LeftType>& leftColumn = input->getColumn<LeftType>(firstAtt);
128 std::vector<RightType>& rightColumn = input->getColumn<RightType>(secondAtt);
131 if (!output->hasColumn(outAtt)) {
132 auto outColumn =
new std::vector<bool>;
133 output->addColumn(outAtt, outColumn,
true);
137 std::vector<bool>& outColumn = output->getColumn<
bool>(outAtt);
140 auto numTuples = leftColumn.size();
141 outColumn.resize(numTuples);
142 for (
int i = 0; i < numTuples; i++) {
143 outColumn[i] =
checkAnd(leftColumn[i], rightColumn[i]);
153 std::vector<std::string>& inputColumnNames,
154 std::vector<std::string>& inputColumnsToApply,
155 std::vector<std::string>& childrenLambdaNames,
157 std::string computationName,
158 int computationLabel,
159 std::string& outputTupleSetName,
160 std::vector<std::string>& outputColumns,
161 std::string& outputColumnName,
162 std::string& myLambdaName,
164 bool amIPartOfJoinPredicate =
false,
165 bool amILeftChildOfEqualLambda =
false,
166 bool amIRightChildOfEqualLambda =
false,
167 std::string parentLambdaName =
"",
168 bool isSelfJoin =
false)
override {
170 if ((multiInputsComp !=
nullptr) && amIPartOfJoinPredicate) {
171 std::string tcapString;
172 std::string myComputationName = computationName +
"_" + std::to_string(computationLabel);
174 unsigned int leftIndex = lhs.getInputIndex(0);
175 std::vector<std::string> lhsColumnNames =
176 multiInputsComp->getInputColumnsForIthInput(leftIndex);
177 std::vector<std::string> lhsInputNames;
178 for (
const auto &curColumnName : lhsColumnNames) {
179 for (
int j = 0; j < multiInputsComp->getNumInputs(); j++) {
180 if (multiInputsComp->getNameForIthInput(j) == curColumnName) {
181 lhsInputNames.push_back(curColumnName);
188 unsigned int rightIndex = rhs.getInputIndex(0);
189 std::vector<std::string> rhsColumnNames =
190 multiInputsComp->getInputColumnsForIthInput(rightIndex);
191 std::vector<std::string> rhsInputNames;
192 for (
const auto &curColumnName : rhsColumnNames) {
193 for (
int j = 0; j < multiInputsComp->getNumInputs(); j++) {
194 if (multiInputsComp->getNameForIthInput(j) == curColumnName) {
195 rhsInputNames.push_back(curColumnName);
202 std::vector<std::string> inputNamesIntersection;
204 for (
const auto &lhsInputName : lhsInputNames) {
205 for (
const auto &rhsInputName : rhsInputNames) {
206 if (lhsInputName == rhsInputName) {
207 inputNamesIntersection.push_back(lhsInputName);
212 if (!inputNamesIntersection.empty()) {
217 std::string leftTupleSetName =
218 multiInputsComp->getTupleSetNameForIthInput(leftIndex);
219 std::string leftColumnToApply = lhsInputNames[0];
220 std::vector<std::string> leftColumnsToApply;
221 leftColumnsToApply.push_back(leftColumnToApply);
222 std::string leftOutputTupleSetName =
"hashOneFor_" + leftColumnToApply +
"_" +
223 std::to_string(computationLabel) +
"_" + std::to_string(lambdaLabel);
224 std::string leftOutputColumnName =
"OneFor_left_" +
225 std::to_string(computationLabel) +
"_" + std::to_string(lambdaLabel);
226 std::vector<std::string> leftOutputColumns;
227 for (
const auto &lhsColumnName : lhsColumnNames) {
228 leftOutputColumns.push_back(lhsColumnName);
230 leftOutputColumns.push_back(leftOutputColumnName);
234 leftOutputTupleSetName,
236 leftOutputColumnName,
240 std::map<std::string, std::string>());
243 std::string rightTupleSetName = multiInputsComp->getTupleSetNameForIthInput(rightIndex);
244 std::string rightColumnToApply = rhsInputNames[0];
245 std::vector<std::string> rightColumnsToApply;
246 rightColumnsToApply.push_back(rightColumnToApply);
247 std::string rightOutputTupleSetName =
"hashOneFor_" + rightColumnToApply +
"_" + std::to_string(computationLabel) +
"_" + std::to_string(lambdaLabel);
248 std::string rightOutputColumnName =
"OneFor_right_" + std::to_string(computationLabel) +
"_" + std::to_string(lambdaLabel);
249 std::vector<std::string> rightOutputColumns;
250 for (
const auto &rhsColumnName : rhsColumnNames) {
251 rightOutputColumns.push_back(rhsColumnName);
253 rightOutputColumns.push_back(rightOutputColumnName);
257 rightOutputTupleSetName,
259 rightOutputColumnName,
263 std::map<std::string, std::string>());
266 tcapString +=
"\n/* CartesianJoin ( " + lhsInputNames[0];
268 outputTupleSetName =
"CartesianJoined__" + lhsInputNames[0];
269 for (
unsigned int i = 1; i < lhsInputNames.size(); i++) {
270 outputTupleSetName +=
"_" + lhsInputNames[i];
271 tcapString +=
" " + lhsInputNames[i];
273 outputTupleSetName +=
"___" + rhsInputNames[0];
274 tcapString +=
" ) and ( " + rhsInputNames[0];
275 for (
unsigned int i = 1; i < rhsInputNames.size(); i++) {
276 outputTupleSetName +=
"_" + rhsInputNames[i];
277 tcapString +=
" " + rhsInputNames[i];
279 outputTupleSetName +=
"_";
280 tcapString +=
" ) */\n";
283 outputColumns.clear();
284 tcapString += outputTupleSetName +
"(" + lhsColumnNames[0];
285 outputColumns.push_back(lhsColumnNames[0]);
286 for (
unsigned int i = 1; i < lhsColumnNames.size(); i++) {
287 tcapString +=
", " + lhsColumnNames[i];
288 outputColumns.push_back(lhsColumnNames[i]);
290 tcapString +=
", " + rhsColumnNames[0];
291 outputColumns.push_back(rhsColumnNames[0]);
292 for (
unsigned int i = 1; i < rhsColumnNames.size(); i++) {
293 tcapString +=
", " + rhsColumnNames[i];
294 outputColumns.push_back(rhsColumnNames[i]);
296 tcapString +=
") <= JOIN (" + leftOutputTupleSetName +
"(" + leftOutputColumnName +
297 "), " + leftOutputTupleSetName +
"(" + lhsColumnNames[0];
298 for (
unsigned int i = 1; i < lhsColumnNames.size(); i++) {
299 tcapString +=
", " + lhsColumnNames[i];
301 tcapString +=
"), " + rightOutputTupleSetName +
"(" + rightOutputColumnName +
302 "), " + rightOutputTupleSetName +
"(" + rhsColumnNames[0];
303 for (
unsigned int i = 1; i < rhsColumnNames.size(); i++) {
304 tcapString +=
", " + rhsColumnNames[i];
306 tcapString +=
"), '" + myComputationName +
"')\n";
309 for (
unsigned int i = 0; i < multiInputsComp->getNumInputs(); i++) {
310 std::string curInput = multiInputsComp->getNameForIthInput(i);
311 auto iter = std::find(outputColumns.begin(), outputColumns.end(), curInput);
312 if (iter != outputColumns.end()) {
313 multiInputsComp->setTupleSetNameForIthInput(i, outputTupleSetName);
314 multiInputsComp->setInputColumnsForIthInput(i, outputColumns);
315 multiInputsComp->setInputColumnsToApplyForIthInput(i, outputColumns);
330 std::map<std::string, std::string>
getInfo()
override {
333 return std::map<std::string, std::string>{
std::map< std::string, std::string > getInfo() override
ComputeExecutorPtr getExecutor(TupleSpec &inputSchema, TupleSpec &attsToOperateOn, TupleSpec &attsToIncludeInOutput) override
int getNumChildren() override
std::vector< std::string > & getAtts()
std::string getTCAPString(const std::string &inputTupleSetName, const std::vector< std::string > &inputColumnNames, const std::vector< std::string > &inputColumnsToApply, const std::string &outputTupleSetName, const std::vector< std::string > &outputColumns, const std::string &outputColumnName, const std::string &tcapOperation, const std::string &computationNameAndLabel, const std::string &lambdaNameAndLabel, const std::map< std::string, std::string > &info)
std::shared_ptr< TupleSetSetupMachine > TupleSetSetupMachinePtr
unsigned int getNumInputs() override
std::shared_ptr< GenericLambdaObject > GenericLambdaObjectPtr
AndLambda(LambdaTree< LeftType > lhsIn, LambdaTree< RightType > rhsIn)
std::shared_ptr< TupleSet > TupleSetPtr
GenericLambdaObjectPtr getChild(int which) override
std::shared_ptr< ComputeExecutor > ComputeExecutorPtr
LambdaTree< LeftType > lhs
std::enable_if_t< std::is_base_of< PtrBase, LHS >::value &&std::is_base_of< PtrBase, RHS >::value, bool > checkAnd(LHS lhs, RHS rhs)
std::string toTCAPString(std::vector< std::string > &inputTupleSetNames, std::vector< std::string > &inputColumnNames, std::vector< std::string > &inputColumnsToApply, std::vector< std::string > &childrenLambdaNames, int lambdaLabel, std::string computationName, int computationLabel, std::string &outputTupleSetName, std::vector< std::string > &outputColumns, std::string &outputColumnName, std::string &myLambdaName, MultiInputsBase *multiInputsComp=nullptr, bool amIPartOfJoinPredicate=false, bool amILeftChildOfEqualLambda=false, bool amIRightChildOfEqualLambda=false, std::string parentLambdaName="", bool isSelfJoin=false) override
LambdaTree< RightType > rhs
void setInputIndex(int i, unsigned int index)
std::string getTypeOfLambda() override