Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


davidradl commented on PR #97:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912127835

   Thanks @snuyanzin :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


boring-cyborg[bot] commented on PR #97:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912124003

   Awesome work, congrats on your first merged pull request!
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


snuyanzin merged PR #97:
URL: https://github.com/apache/flink-connector-jdbc/pull/97





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


davidradl commented on PR #97:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912096605

   @snuyanzin as discussed, here is the PR for the backport.





[PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


davidradl opened a new pull request, #97:
URL: https://github.com/apache/flink-connector-jdbc/pull/97

   This closes apache/flink-connector-jdbc#79
   
   Creating as a draft while testing.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1912023410

   Merged as 5a90eb0a73ca0ac8475331a74ae8f7c1c01646bb
   
   Thanks everyone for making it 





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


snuyanzin closed pull request #79: [FLINK-33365] include filters with Lookup joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-26 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1911754647

   > > @libenchao a question to you before merging: `squash` also removed the info about all the authors - are you OK if I add you as a co-author, since it is based on your work? Or you can do it on your own and merge.
   > 
   > Sure, thanks.
   
   @libenchao @snuyanzin I have amended the commit message and added you both as co-authors. I made an educated guess as to the name (git id) and email to use. If you prefer something else, let me know.
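   For anyone following the thread: Git and GitHub credit co-authors via `Co-authored-by:` trailers on the final lines of the commit message, separated from the body by a blank line. A minimal sketch of what the amended message could look like - the emails below are placeholders, not the ones actually used:

   ```text
   [FLINK-33365] include filters with Lookup joins

   Co-authored-by: Benchao Li <placeholder@example.com>
   Co-authored-by: Sergey Nuyanzin <placeholder@example.com>
   ```

   GitHub then shows each listed person as an author of the commit, which is why getting the name/email pair right matters.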





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1911291438

   > @libenchao a question to you before merging: `squash` also removed the info about all the authors - are you OK if I add you as a co-author, since it is based on your work? Or you can do it on your own and merge.
   
   Sure, thanks.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910975978

   @libenchao a question to you before merging:
   `squash` also removed the info about all the authors - are you OK if I add you as a co-author, since it is based on your work?
   Or you can do it on your own and merge.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910329090

   > Yes, a backport would make sense; I guess it could be cherry-picked.
   
   @snuyanzin once this PR is merged, I assume I should raise a second PR against this issue - would you be OK to merge that please?





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910277677

   Yes, a backport would make sense; I guess it could be cherry-picked.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909958760

   > for future: it is better not to squash commits unless it was requested explicitly by the reviewer. The reason is that, since there is no tool in GitHub confirming that the set of changes is the same before and after a squash, ideally one needs to go through the changes again and double check them...
   
   @snuyanzin OK, I see it makes review more difficult. I have been asked in the past to squash, as per [the process](https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests), where it says `MUST squash fixup commits (for example commits that are addressing review comments).` I think it is pragmatic to follow what you say and only squash at the committer's request. I notice this process also talks about back porting. Should I back port this fix to the last connector release?





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909908037

   for future: it is better not to squash commits unless it was requested explicitly by the reviewer.
   The reason is that, since there is no tool in GitHub confirming that the set of changes is the same before and after a squash, ideally one needs to go through the changes again and double check them...





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909799398

   @snuyanzin @libenchao I have squashed the commits. Thank you both for all your help and support on this. Are you OK to merge?





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-25 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1906068499

   @libenchao @snuyanzin I have consolidated the feedback into this branch. When you both approve, I will squash the commits ready for merge.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463286945


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction(
 String[] fieldNames,
 DataType[] fieldTypes,
 String[] keyNames,
-RowType rowType) {
+RowType rowType,

Review Comment:
   Interesting - I see. 






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463273839


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction(
 String[] fieldNames,
 DataType[] fieldTypes,
 String[] keyNames,
-RowType rowType) {
+RowType rowType,

Review Comment:
   yes, however it could be calculated from `fieldNames` and `fieldTypes` in exactly the same way as is done at lines 109-110.
   Anyway, this is just a thing to note; it seems unrelated to this PR.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463211057


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction(
 String[] fieldNames,
 DataType[] fieldTypes,
 String[] keyNames,
-RowType rowType) {
+RowType rowType,

Review Comment:
   it is used on line 108: `this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);`
   It looks like the field names are used to check that the key is a field, then the field types to look up the key type.
   
   Are you thinking of some more optimal way of doing this?






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463161889


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,114 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {

Review Comment:
   Good spot - I will add one.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463160400


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    public void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {
+
+        private boolean withFailure;
+        private final List<String> resolvedPredicates;
+        private final Serializable[] pushdownParams;
+        private final Object[] keys;
+        private List<String> expected;
+
+        private TestSpec(
+                boolean withFailure,
+                List<String> resolvedPredicates,
+                Serializable[] pushdownParams,
+                Object[] keys,
+                List<String> expected) {
+            this.withFailure = withFailure;
+            this.resolvedPredicates = resolvedPredicates;
+            this.pushdownParams = pushdownParams;
+            this.keys = keys;
+            this.expected = expected;
+        }
+    }
+
+    static Collection<TestSpec> lookupWithPredicatesProvider() {
+        return ImmutableList.<TestSpec>builder()
+                .addAll(getTestSpecs(true))
+                .addAll(getTestSpecs(false))
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+        return ImmutableList.of(
+                // var char single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(comment1 = ?)"}),
+                        new Serializable[] {"11-c1-v1"},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   Sorry, my bad - you are right - I have fixed it.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463158838


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,114 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {

Review Comment:
   This class should have `toString()`;
   otherwise it is impossible to navigate between the different tests that take objects of this class as a parameter.
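   A minimal, self-contained sketch of what such a `toString()` might look like - this is a simplified stand-in mirroring the quoted `TestSpec` fields, not the code from the PR; the exact format is up to the author:

   ```java
   import java.io.Serializable;
   import java.util.Arrays;
   import java.util.List;

   // Simplified stand-in for the PR's TestSpec, showing only the toString() idea.
   class TestSpec {
       final boolean withFailure;
       final List<String> resolvedPredicates;
       final Serializable[] pushdownParams;
       final Object[] keys;
       final List<String> expected;

       TestSpec(boolean withFailure, List<String> resolvedPredicates,
                Serializable[] pushdownParams, Object[] keys, List<String> expected) {
           this.withFailure = withFailure;
           this.resolvedPredicates = resolvedPredicates;
           this.pushdownParams = pushdownParams;
           this.keys = keys;
           this.expected = expected;
       }

       @Override
       public String toString() {
           // JUnit 5 uses the parameter's toString() in the display name of each
           // parameterized-test invocation, making individual cases navigable.
           return "TestSpec{withFailure=" + withFailure
                   + ", resolvedPredicates=" + resolvedPredicates
                   + ", pushdownParams=" + Arrays.toString(pushdownParams)
                   + ", keys=" + Arrays.toString(keys)
                   + ", expected=" + expected + '}';
       }
   }
   ```
   
   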






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463156166


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    public void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {
+
+        private boolean withFailure;
+        private final List<String> resolvedPredicates;
+        private final Serializable[] pushdownParams;
+        private final Object[] keys;
+        private List<String> expected;
+
+        private TestSpec(
+                boolean withFailure,
+                List<String> resolvedPredicates,
+                Serializable[] pushdownParams,
+                Object[] keys,
+                List<String> expected) {
+            this.withFailure = withFailure;
+            this.resolvedPredicates = resolvedPredicates;
+            this.pushdownParams = pushdownParams;
+            this.keys = keys;
+            this.expected = expected;
+        }
+    }
+
+    static Collection<TestSpec> lookupWithPredicatesProvider() {
+        return ImmutableList.<TestSpec>builder()
+                .addAll(getTestSpecs(true))
+                .addAll(getTestSpecs(false))
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+        return ImmutableList.of(
+                // var char single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(comment1 = ?)"}),
+                        new Serializable[] {"11-c1-v1"},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+                // decimal single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(decimal_col = ?)"}),
+                        new Serializable[] {BigDecimal.valueOf(100.1011)},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   Could you please explain where we can find the second value?
   I see only one, `"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"`, contained in that list.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463154006


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    public void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {
+
+        private boolean withFailure;
+        private final List<String> resolvedPredicates;
+        private final Serializable[] pushdownParams;
+        private final Object[] keys;
+        private List<String> expected;
+
+        private TestSpec(
+                boolean withFailure,
+                List<String> resolvedPredicates,
+                Serializable[] pushdownParams,
+                Object[] keys,
+                List<String> expected) {
+            this.withFailure = withFailure;
+            this.resolvedPredicates = resolvedPredicates;
+            this.pushdownParams = pushdownParams;
+            this.keys = keys;
+            this.expected = expected;
+        }
+    }
+
+    static Collection<TestSpec> lookupWithPredicatesProvider() {
+        return ImmutableList.<TestSpec>builder()
+                .addAll(getTestSpecs(true))
+                .addAll(getTestSpecs(false))
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+        return ImmutableList.of(
+                // var char single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(comment1 = ?)"}),
+                        new Serializable[] {"11-c1-v1"},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+                // decimal single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(decimal_col = ?)"}),
+                        new Serializable[] {BigDecimal.valueOf(100.1011)},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   This is a list of 2, so it cannot be used with a singleton list. I have removed the unnecessary `new String[] {}` in line with your other feedback.
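   For context, the `Collections.singletonList` suggestion only applies to one-element expectations; a two-element expectation still needs `Arrays.asList`, and varargs make the explicit `new String[] {}` wrapper redundant in both cases. A plain-Java sketch with placeholder row strings, not the actual test data:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   public class ExpectedRows {
       // One expected row: Collections.singletonList is the idiomatic choice;
       // varargs mean no explicit new String[] {} wrapper is needed.
       static List<String> oneRow() {
           return Collections.singletonList("+I(row1)");
       }

       // Two expected rows: a singleton list cannot hold them, so Arrays.asList
       // (again via varargs, without an array literal) is used instead.
       static List<String> twoRows() {
           return Arrays.asList("+I(row1)", "+I(row2)");
       }

       public static void main(String[] args) {
           System.out.println(oneRow().size() + " " + twoRows().size()); // prints "1 2"
       }
   }
   ```
   
   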






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463147700


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   I see only one value, `"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"` - what is 
the second?






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463137596


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   This is a list of 2, so it cannot be made into a singletonList.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905821215

   > I noticed that only the comment about `final` vars in 
`flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java`
 has been addressed.
   > 
   > Why are we talking about a different branch, btw? Which one is going to be 
merged?
   
   @snuyanzin I am not sure - I will bring @libenchao's branch into this branch 
and then address your feedback, so we know where we are. I will leave the changes 
unsquashed until there is a good review from you both.  
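
   For anyone reproducing this locally, the merge described above could look 
roughly like the following sketch. The remote name is arbitrary, and the fork URL 
and branch name are inferred from the links elsewhere in this thread:

```shell
# Add the contributor's fork as a remote and fetch the branch with the cosmetic changes
git remote add libenchao https://github.com/libenchao/flink-connector-jdbc.git
git fetch libenchao 33365-lookup-join-predicate-pushdown

# Merge it into the PR branch without squashing, so the review history stays visible
git merge --no-ff libenchao/33365-lookup-join-predicate-pushdown
```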





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905810643

   I noticed that only the comment about `final` vars in 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
 has been addressed.
   
   Why are we talking about a different branch, btw? Which one is going to be 
merged?





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905803529

   @snuyanzin Can I check - did you see the cosmetic changes in  
[https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/](https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/)
 that @libenchao provided? I think some of your feedback is addressed there. 
   
   @snuyanzin If you are ok with @libenchao's changes, we can proceed to merge; 
if not, I can merge his commit into my branch and look at addressing your 
additional feedback. 
   
   I assume there is a linter you are both using to spot the missing finals and 
the like, that is not in the build. What linter are you using, so I can avoid 
these issues in the future?  
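
   For context, Flink repositories typically enforce formatting with the Spotless 
Maven plugin (google-java-format), while style rules such as missing `final` 
modifiers are usually flagged by Checkstyle rather than Spotless. A sketch of the 
usual local workflow, assuming the standard Flink build setup:

```shell
# Report formatting violations without modifying sources
mvn spotless:check

# Rewrite sources in place to the project's configured format
mvn spotless:apply
```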
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463099347


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),
+// double single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(double_col = ?)"}),
+new Serializable[] {
+1.1,
+},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// and
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}),
+new Serializable[] {2.2, 1.1},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// or
+new TestSpec(
+withFailure,
+  

Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463099077


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),
+// double single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(double_col = ?)"}),
+new Serializable[] {
+1.1,
+},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// and
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}),
+new Serializable[] {2.2, 1.1},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// or
+new TestSpec(
+withFailure,
+  

Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463098844


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),
+// double single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(double_col = ?)"}),
+new Serializable[] {
+1.1,
+},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// and
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}),
+new Serializable[] {2.2, 1.1},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   ```suggestion
   

Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463098549


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),
+// double single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(double_col = ?)"}),
+new Serializable[] {
+1.1,
+},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// and
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}),

Review Comment:
   ```suggestion
   Collections.singletonList("(real_col = ?) AND (double_col = ?)"),
   ```




Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463097912


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),
+// double single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(double_col = ?)"}),

Review Comment:
   ```suggestion
   Collections.singletonList("(double_col = ?)"),
   ```



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();

Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463097569


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),
+new Serializable[] {2.2},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(
+new String[] {
+"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"
+})),

Review Comment:
   ```suggestion
   Arrays.asList(
   "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
   "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")),
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096545


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// real single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(real_col = ?)"}),

Review Comment:
   ```suggestion
   Collections.singletonList("(real_col = ?)"),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096287


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] 
{"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),
+new Serializable[] {BigDecimal.valueOf(100.1011)},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] 
{"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   ```suggestion
   
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096013


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] 
{"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),
+// decimal single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(decimal_col = ?)"}),

Review Comment:
   ```suggestion
   Collections.singletonList("(decimal_col = ?)"),
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463095416


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),
+new Serializable[] {"11-c1-v1"},
+new Object[] {1, StringData.fromString("1")},
+Arrays.asList(new String[] 
{"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   ```suggestion
  
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463094604


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {
+JdbcRowDataLookupFunction lookupFunction =
+buildRowDataLookupFunctionWithPredicates(
+testSpec.withFailure, testSpec.resolvedPredicates, 
testSpec.pushdownParams);
+
+ListOutputCollector collector = new ListOutputCollector();
+lookupFunction.setCollector(collector);
+lookupFunction.open(null);
+lookupFunction.eval(testSpec.keys);
+
+if (testSpec.withFailure) {
+// Close connection here, and this will be recovered by retry
+if (lookupFunction.getDbConnection() != null) {
+lookupFunction.getDbConnection().close();
+}
+}
+
+List<String> result =
+new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+Collections.sort(testSpec.expected);
+assertThat(result).isEqualTo(testSpec.expected);
+}
+
+private static class TestSpec {
+
+private boolean withFailure;
+private final List<String> resolvedPredicates;
+private final Serializable[] pushdownParams;
+private final Object[] keys;
+private List<String> expected;
+
+private TestSpec(
+boolean withFailure,
+List<String> resolvedPredicates,
+Serializable[] pushdownParams,
+Object[] keys,
+List<String> expected) {
+this.withFailure = withFailure;
+this.resolvedPredicates = resolvedPredicates;
+this.pushdownParams = pushdownParams;
+this.keys = keys;
+this.expected = expected;
+}
+}
+
+static Collection<TestSpec> lookupWithPredicatesProvider() {
+return ImmutableList.<TestSpec>builder()
+.addAll(getTestSpecs(true))
+.addAll(getTestSpecs(false))
+.build();
+}
+
+@NotNull
+private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+return ImmutableList.of(
+// var char single filter
+new TestSpec(
+withFailure,
+Arrays.asList(new String[] {"(comment1 = ?)"}),

Review Comment:
   ```suggestion
   Collections.singletonList("(comment1 = ?)"),
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463084570


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -167,7 +185,13 @@ public Collection lookup(RowData keyRow) {
 
 private void establishConnectionAndStatement() throws SQLException, 
ClassNotFoundException {
 Connection dbConn = connectionProvider.getOrEstablishConnection();
-statement = FieldNamedPreparedStatement.prepareStatement(dbConn, 
query, keyNames);
+String additionalPredicates = "";
+if (resolvedPredicates.size() > 0) {

Review Comment:
   ```suggestion
   if (!resolvedPredicates.isEmpty()) {
   ```
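
   For context, a hedged sketch of how the pushed-down predicates could be appended to the base lookup query once the list is non-empty; `buildQuery` and its arguments are hypothetical names, not the connector's actual API:

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Sketch: append already-parenthesized filter predicates, e.g. "(real_col = ?)",
   // to the key-based lookup query with AND. Names here are illustrative only.
   public class PredicateAppendSketch {
       static String buildQuery(String baseQuery, List<String> resolvedPredicates) {
           if (resolvedPredicates.isEmpty()) {
               return baseQuery; // nothing pushed down, keep the key-only query
           }
           return baseQuery + " AND " + String.join(" AND ", resolvedPredicates);
       }

       public static void main(String[] args) {
           System.out.println(
                   buildQuery(
                           "SELECT * FROM d WHERE ip = ?",
                           Arrays.asList("(type = ?)", "(age > ?)")));
       }
   }
   ```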






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463082736


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
 private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
 private static final long serialVersionUID = 2L;
 
-private final String query;
+private String query;
 private final JdbcConnectionProvider connectionProvider;
-private final String[] keyNames;
+private String[] keyNames;
 private final int maxRetryTimes;
 private final JdbcRowConverter jdbcRowConverter;
 private final JdbcRowConverter lookupKeyRowConverter;
 
+private List<String> resolvedPredicates = new ArrayList<>();
+private Serializable[] pushdownParams = new Serializable[0];

Review Comment:
   We do not need this initialization here since it is not used anywhere and 
then is going to be overwritten in a constructor.
   After that both vars could be made final
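
   A minimal sketch of the suggestion: drop the field initializers and assign both fields exactly once in the constructor, which lets them be `final`. The class shape below is illustrative, not the actual connector class:

   ```java
   import java.io.Serializable;
   import java.util.Collections;
   import java.util.List;

   // Illustrative-only class: final fields, initialized once in the constructor.
   public class FinalFieldsSketch {
       private final List<String> resolvedPredicates;
       private final Serializable[] pushdownParams;

       FinalFieldsSketch(List<String> resolvedPredicates, Serializable[] pushdownParams) {
           this.resolvedPredicates = resolvedPredicates;
           this.pushdownParams = pushdownParams;
       }

       public static void main(String[] args) {
           FinalFieldsSketch f =
                   new FinalFieldsSketch(Collections.emptyList(), new Serializable[0]);
           System.out.println(f.resolvedPredicates.isEmpty() && f.pushdownParams.length == 0);
       }
   }
   ```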






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463077682


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction(
 String[] fieldNames,
 DataType[] fieldTypes,
 String[] keyNames,
-RowType rowType) {
+RowType rowType,

Review Comment:
   This looks strange: I wonder why we need this if it is not used, while at
the same time we still have `fieldNames` and `fieldTypes` containing the same info






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067422


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON d.type = 0 AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithANDAndORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON ((d.age = 50 AND d.type = 0) "
++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWith2ANDsAndORFilter() {

Review Comment:
   ```suggestion
   void testLookupJoinWith2ANDsAndORFilter() {
   ```



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON d.type = 0 AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithANDAndORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON ((d.age = 50 AND d.type = 0) "
++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWith2ANDsAndORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime "
++ "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) "
++ "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND 
a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithORFilter() {

Review Comment:
   ```suggestion
   void testLookupJoinWithORFilter() {
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067119


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithFilter() {

Review Comment:
   ```suggestion
   void testLookupJoinWithFilter() {
   ```



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON d.type = 0 AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithANDAndORFilter() {

Review Comment:
   ```suggestion
   void testLookupJoinWithANDAndORFilter() {
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067787


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON d.type = 0 AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithANDAndORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON ((d.age = 50 AND d.type = 0) "
++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWith2ANDsAndORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime "
++ "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) "
++ "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND 
a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithORFilter() {
+util.verifyExecPlan(
+"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime 
ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip");
+}
+
+@Test
+public void testLookupJoinWithWeirdColumnNames() {

Review Comment:
   ```suggestion
   void testLookupJoinWithWeirdColumnNames() {
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463066443


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC
+ * java code, where it adds the join conditions to the select statement 
string.
+ */
+@Test
+public void testLookupJoin() {

Review Comment:
   ```suggestion
   void testLookupJoin() {
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463064026


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
 assertThat(result).isEqualTo(expected);
 }
 
+@ParameterizedTest
+@MethodSource("lookupWithPredicatesProvider")
+public void testEval(TestSpec testSpec) throws Exception {

Review Comment:
   ```suggestion
   void testEval(TestSpec testSpec) throws Exception {
   ```
   no need for `public` since it is JUnit 5






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463057026


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
 private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
 private static final long serialVersionUID = 2L;
 
-private final String query;
+private String query;
 private final JdbcConnectionProvider connectionProvider;
-private final String[] keyNames;
+private String[] keyNames;

Review Comment:
   nit
   ```suggestion
   private final String[] keyNames;
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463056711


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
 private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
 private static final long serialVersionUID = 2L;
 
-private final String query;
+private String query;

Review Comment:
   nit: 
   ```suggestion
   private final String query;
   ```






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905665607

   > @davidradl
   > 
   > > * I don't think we can have 0 key names for a lookup join. Do you have a
test case that would drive this?
   > 
   > There is no such restriction in the semantics (maybe somewhere there is checker
logic to disallow this, maybe not); anyway, it doesn't affect things here,
semantically we can handle it when there are no keys.
   > 
   > > * the ? test. We can leave this in, but it allows column names with ?s
in - which seems like a good thing. We could do this in a separate issue if you
want.
   > 
   > Yes, this is an orthogonal issue to the current one.
   
   @libenchao ok, let's merge :-)





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905659276

   @davidradl 
   
   > * I don't think we can have 0 key names for a lookup join. Do you have a
test case that would drive this?
   
   There is no such restriction in the semantics (maybe somewhere there is checker
logic to disallow this, maybe not); anyway, it doesn't affect things here,
semantically we can handle it when there are no keys.
   
   > * the ? test. We can leave this in, but it allows column names with ?s in
- which seems like a good thing. We could do this in a separate issue if you
want.
   
   Yes, this is an orthogonal issue to the current one.
   
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905644311

   > > @libenchao @snuyanzin I have updated the code in line with your feedback 
- please could you review again. Many thanks
   > 
   > @davidradl One small suggestion, please do not squash and force-push
before you must do it (conflicts with main branch, or the reviewer has done the
review); otherwise it is hard for the reviewer to track incremental changes.
   
   @libenchao makes sense. Thanks for the suggestion.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-23 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905641356

   > @davidradl @snuyanzin I reviewed, and added a few cosmetic changes in 
[commit](https://github.com/libenchao/flink-connector-jdbc/commit/24caa99cd97e86afb1836886d51007775ed95ddc),
 others looks good to me, please take a look. If you agree on this version, 
I'll proceed with merging.
   > 
   > 
https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/
   
   @libenchao thank you very much for your help on this.
   I am happy with the cosmetic fixups. I had 2 comments I added to the code.
   - I don't think we can have 0 key names for a lookup join. Do you have a test
case that would drive this?
   - the ? test. We can leave this in, but it allows column names with ?s in -
which seems like a good thing. We could do this in a separate issue if you
want.
   
   If we can get to a consensus on these, I am happy for it to be merged.
   

   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905255051

   @davidradl @snuyanzin I reviewed, and added a few cosmetic changes in 
[commit](https://github.com/libenchao/flink-connector-jdbc/commit/24caa99cd97e86afb1836886d51007775ed95ddc),
 others looks good to me, please take a look. If you agree on this version, 
I'll proceed with merging.
   
   
https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905174734

   > @libenchao @snuyanzin I have updated the code in line with your feedback - 
please could you review again. Many thanks
   
   @davidradl One small suggestion: please do not squash and force-push until you must (conflicts with the main branch, or the reviewer has finished the review); otherwise it is hard for the reviewer to track incremental changes.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1904779201

   @libenchao @snuyanzin I have updated the code in line with your feedback - 
please could you review again. Many thanks





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1462396444


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -63,6 +64,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
 private final JdbcRowConverter jdbcRowConverter;
 private final JdbcRowConverter lookupKeyRowConverter;
 
+private List<String> resolvedPredicates = new ArrayList<>();
+private Serializable[] pushdownParams = new Serializable[0];

Review Comment:
   I have changed the code since this comment; these variables are now assigned, so they cannot be made final.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1462392709


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception {
 }
 }
 
+private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
+throws SQLException {
+for (int i = 0; i < pushdownParams.length; ++i) {

Review Comment:
   I have added more tests to the eval method, which already does the scaffolding. I hope this is sufficient.






[PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


davidradl opened a new pull request, #79:
URL: https://github.com/apache/flink-connector-jdbc/pull/79

   Allow lookup joins to honour supplied filters, where the lookup source is 
JDBC. 





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-22 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-21 Thread via GitHub


libenchao commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1461286671


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   > just for my understanding: are we thinking that the rowdata sent in on the lookup would contain the pushdown predicates, so the code could call
   >  statement = lookupKeyRowConverter.toExternal(keyRow, statement);
   > and this would handle both the predicates and the keys? We could then remove the need for
   > statement = setPredicateParams(statement);
   
   What I was proposing is showing the predicates in the digest of the lookup join node. That way, we can see them in the test xml files, and they can also be shown in the Flink Web UI.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-20 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1460466435


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception {
 }
 }
 
+private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
+throws SQLException {
+for (int i = 0; i < pushdownParams.length; ++i) {

Review Comment:
   @snuyanzin I am not finding a way to scaffold a unit test around this method and prove the cases work. It looks like I need to prepare the statement, populate the rowdata, and run it through the converter before I can set up the statement required for this method. I will continue to investigate.
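One way to scaffold such a test without preparing a real statement is to record the set calls on a stub. The sketch below is illustrative only: `RecordingStatement` and `bindPredicateParams` are hypothetical names, not connector code, and the 1-based parameter offset is an assumption for the sketch (the real statement may place pushdown values after the lookup-key parameters).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class PushdownBindingSketch {
    // Stand-in for FieldNamedPreparedStatement: records each set call so a
    // unit test can assert on what was bound, with no live JDBC connection.
    static class RecordingStatement {
        final List<String> bound = new ArrayList<>();

        void setObject(int index, Object value) {
            bound.add(index + "=" + value);
        }
    }

    // Mirrors the shape of the setPredicateParams loop: each pushdown value
    // is bound positionally, one statement parameter per value.
    static RecordingStatement bindPredicateParams(
            RecordingStatement statement, Serializable[] pushdownParams) {
        for (int i = 0; i < pushdownParams.length; ++i) {
            statement.setObject(i + 1, pushdownParams[i]);
        }
        return statement;
    }

    public static void main(String[] args) {
        RecordingStatement stmt =
                bindPredicateParams(new RecordingStatement(), new Serializable[] {91, "abc"});
        System.out.println(stmt.bound); // [1=91, 2=abc]
    }
}
```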






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-20 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1460463528


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   @libenchao just for my understanding: are we thinking that the rowdata sent in on the lookup would contain the pushdown predicates, so the code could call
   ` statement = lookupKeyRowConverter.toExternal(keyRow, statement);`
   and this would handle both the predicates and the keys? We could then remove the need for
   `statement = setPredicateParams(statement);`
   
   Would you be willing to assign me (and provide minimal guidance if required) 
FLINK-34170 and review / merge any associated PR? 
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1459375955


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   > change to the core Flink to include the look up join conditions in the 
optimised plan
   
   I have raised issue [FLINK-34170](https://issues.apache.org/jira/browse/FLINK-34170).






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1459366515


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception {
 }
 }
 
+private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
+throws SQLException {
+for (int i = 0; i < pushdownParams.length; ++i) {

Review Comment:
   @snuyanzin Thanks, I am working on this currently. I can test the method but am not sure how to check that it worked. My current test shows that the SQL text in the statement is not updated with the parameters. I am continuing to look into this.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


libenchao commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458773766


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   Yes, precisely.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


libenchao commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458772878


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java:
##
@@ -178,26 +178,42 @@ public void close() throws SQLException {
 // 

 
 public static FieldNamedPreparedStatement prepareStatement(
-Connection connection, String sql, String[] fieldNames) throws SQLException {
+Connection connection,
+String sql,
+String[] fieldNames,
+String additionalPredicates,
+int numberOfDynamicParams)
+throws SQLException {
 checkNotNull(connection, "connection must not be null.");
 checkNotNull(sql, "sql must not be null.");
 checkNotNull(fieldNames, "fieldNames must not be null.");
 
-if (sql.contains("?")) {

Review Comment:
   I guess this is also irrelevant to the current issue, just like FLINK-34146.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458664773


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   @libenchao I am not sure what I am proposing . I assume this would be a 
change to the core Flink to include the look up join conditions in the 
optimised plan. Am I understanding this correctly ?






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-19 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458658676


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java:
##
@@ -178,26 +178,42 @@ public void close() throws SQLException {
 // 

 
 public static FieldNamedPreparedStatement prepareStatement(
-Connection connection, String sql, String[] fieldNames) throws SQLException {
+Connection connection,
+String sql,
+String[] fieldNames,
+String additionalPredicates,
+int numberOfDynamicParams)
+throws SQLException {
 checkNotNull(connection, "connection must not be null.");
 checkNotNull(sql, "sql must not be null.");
 checkNotNull(fieldNames, "fieldNames must not be null.");
 
-if (sql.contains("?")) {

Review Comment:
   If there is a column name with a `?` character in it, then it will fail; removing this check tolerates such column names.
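As a quick illustration (the column name here is made up, not from the PR's test suite), a blanket `sql.contains("?")` guard would reject a valid named-parameter statement whose quoted identifier happens to contain a question mark:

```java
public class QuestionMarkCheckDemo {
    public static void main(String[] args) {
        // A valid named-parameter statement; the quoted column name contains '?'.
        String sql = "SELECT `weird?col` FROM tbl WHERE id = :id";
        // The removed guard rejected any SQL containing '?', so this statement
        // would have been refused despite having no positional parameters.
        System.out.println(sql.contains("?")); // true
    }
}
```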






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


libenchao commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1457562374


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java:
##
@@ -178,26 +178,42 @@ public void close() throws SQLException {
 // 

 
 public static FieldNamedPreparedStatement prepareStatement(
-Connection connection, String sql, String[] fieldNames) throws SQLException {
+Connection connection,
+String sql,
+String[] fieldNames,
+String additionalPredicates,
+int numberOfDynamicParams)
+throws SQLException {
 checkNotNull(connection, "connection must not be null.");
 checkNotNull(sql, "sql must not be null.");
 checkNotNull(fieldNames, "fieldNames must not be null.");
 
-if (sql.contains("?")) {

Review Comment:
   Do we need to remove this check?



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -63,6 +64,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
 private final JdbcRowConverter jdbcRowConverter;
 private final JdbcRowConverter lookupKeyRowConverter;
 
+private List<String> resolvedPredicates = new ArrayList<>();
+private Serializable[] pushdownParams = new Serializable[0];

Review Comment:
   These two variables could be `final`.



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java:
##
@@ -41,6 +41,10 @@ class FieldNamedPreparedStatementImplTest {
 private final String[] keyFields = new String[] {"id", "__field_3__"};
 private final String tableName = "tbl";
 
+private final String[] fieldNames2 =
+new String[] {"id:", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
+private final String[] keyFields2 = new String[] {"id?:", "__field_3__"};
+

Review Comment:
   This change is not necessary anymore?



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   Can you log another Jira to improve this? The scan source has this ability already.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458074979


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception {
 }
 }
 
+private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
+throws SQLException {
+for (int i = 0; i < pushdownParams.length; ++i) {

Review Comment:
   It seems that so far there is no test checking what happens when we enter this loop.






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1899301322

   it is reproducible locally with this command (also assuming that it will be run with jdk17 or jdk21)
   ```
   mvn clean install -Dflink.version=1.19-SNAPSHOT
   ```
   the reason is a recent change in the Flink main repo
   since it is not related to the changes within this PR, I created a separate PR to fix it: https://github.com/apache/flink-connector-jdbc/pull/93





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898772077

   @libenchao I see there are failing tests; these work for me locally. Do you know how I can get access to more details about the test failures so I can debug?





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898550767

   @snuyanzin @libenchao I have put up the changes with the requested unit 
tests on the filters. Please could you have a look?
   
   I can edit the commit message to add you as co-authors if you want, but I thought it would be better to get the code up first.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-18 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898097633

   > @davidradl Thanks for the work. I think the `testSelectStatementWithWeirdCharacters` you added is an orthogonal problem to the current one; it fails without the current changes, right? If yes, it can be another separate issue which does not block the current one.
   > 
   > The current issue is mainly about how to let the JDBC lookup function handle the pushed predicates; previously it just ignored them. So one test case I would like to see is an ITCase test, which shows that the result is correct with predicates pushed down.
   
   @libenchao Yes, the join key with weird colon characters failed before. We are now adding filters that use the same mechanism and will not work with colons. The index approach works with all characters.
   
   Based on this, I will put this code into this PR without the weird-character unit test, and with the extra unit test that you suggest. I will raise a separate issue for the colon character problem. @snuyanzin any concerns?
   
   
   
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-17 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1897803207

   @davidradl Thanks for the work. I think the `testSelectStatementWithWeirdCharacters` you added is an orthogonal problem to the current one; it fails without the current changes, right? If yes, it can be another separate issue which does not block the current one.
   
   The current issue is mainly about how to let the JDBC lookup function handle the pushed predicates; previously it just ignored them. So one test case I would like to see is an ITCase test, which shows that the result is correct with predicates pushed down.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-17 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1896292338

   It looks like we have used the approach from https://stackoverflow.com/questions/2309970/named-parameters-in-jdbc. It says `Please note that the above simple example does not handle using named parameter twice. Nor does it handle using the : sign inside quotes.`





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-17 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1896205523

   Hi @libenchao @snuyanzin,
   I have got the POC mostly working. I added fixups and some JUnit tests to https://github.com/davidradl/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function.
   
   This works for me locally, but it fails with column names or join keys containing colons. I do think this is a better approach than the index-based solution in the PR; thank you @libenchao :-)
   
I have included a failing JUnit test showing the problem.
   
https://github.com/davidradl/flink-connector-jdbc/blob/59f47bc8f659b9a1c4440dcf0fd9ae70ecaba9c3/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java#L152
   
   The unit test has a join key name with a `:` at the end; the FieldNamedPreparedStatementImpl parseNamedStatement method finds the colon with nothing after it, so it assumes there is an invalid empty name.
   
   Is this acceptable, given that `:` is unusual in column names and this behaviour is baked into the current design of FieldNamedPreparedStatementImpl? If not, any thoughts on how to fix this?
   
   
   
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-17 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1895544871

   @libenchao I cloned your repo. I like the idea behind this fix, avoiding String manipulation. I added all the table plan JUnit tests from PR 79 and they work. The JUnit test exercising FieldNamedPreparedStatement with a `:` in the column name fails.
   
   I then ran local tests that work with PR 79. The lookup joins hang with the POC for me. I am using the MySQL JDBC driver. I am continuing to investigate.
   
   
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-17 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1895411935

   > @davidradl @snuyanzin I did a POC of the idea proposed above; the branch is here: https://github.com/libenchao/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function . Feel free to review it; if you agree with this approach, feel free to do the following work based on it (we can add others as co-authors anyway).
   > 
   > Sorry that I did not do testing; I couldn't find an ITCase test in the current PR.
   
   thanks @libenchao I will have a look. 





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-16 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1894963665

   @davidradl @snuyanzin I did a POC of the idea proposed above; the branch is here: https://github.com/libenchao/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function . Feel free to review it; if you agree with this approach, feel free to do the following work based on it (we can add others as co-authors anyway).
   
   Sorry that I did not do testing; I couldn't find an ITCase test in the current PR.
   
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-16 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1893725108

   The existing PR fails with a filter containing a colon:
   SELECT * FROM a left join mariadb_catalog.menagerie.e6 FOR SYSTEM_TIME AS OF 
a.proctime on  a.ip = e6.ip and e6.`typ:e` = 1;
   
   @libenchao The FieldNamedPreparedStatementImpl logic looks for `:` to 
identify names and incorrectly finds key `e` (the character after the colon). 
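   A minimal, hypothetical sketch of the failure mode (this is not 
`FieldNamedPreparedStatementImpl` itself; the class and method names are 
illustrative): a `:name` scanner that skips colons inside backtick-quoted 
identifiers no longer misreads the quoted column `typ:e` as containing a 
parameter named `e`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of quote-aware ":name" placeholder scanning. */
public class QuoteAwarePlaceholderScanner {

    /** Returns placeholder names, ignoring colons inside backtick-quoted identifiers. */
    public static List<String> scan(String sql) {
        List<String> names = new ArrayList<>();
        boolean inQuote = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '`') {
                inQuote = !inQuote; // toggle at quoted-identifier boundaries
            } else if (c == ':' && !inQuote) {
                int j = i + 1;
                while (j < sql.length() && Character.isJavaIdentifierPart(sql.charAt(j))) {
                    j++;
                }
                if (j > i + 1) { // found ":name"
                    names.add(sql.substring(i + 1, j));
                    i = j - 1;
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // A naive scanner would also report "e" from the quoted column `typ:e`.
        System.out.println(scan("SELECT * FROM e6 WHERE ip = :ip AND `typ:e` = 1"));
    }
}
```

   The same quote-tracking idea would be needed for `?` placeholders, which is 
the other character this thread reports as clashing with quoted column names.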





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-16 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1893480056

   @snuyanzin It fails here 
[https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L187](https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L187)
   
   For my test, the `sql` variable is: 
   `SELECT `ip`, `type?`, `age`, `height`, `weight` FROM `e4` WHERE `ip` = :ip 
AND (`type?` = 1)`
   
   The Derby JUnit tests with the weird column names work - I assume they do 
not drive this code.
   
   
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-15 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1892607108

   @davidradl thanks for checking and letting know
   would be great if you can create unit tests confirming your findings and we 
could integrate them in connector's test
   
   currently I am not sure about guidance; however, feel free to continue and 
share your findings





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-15 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1892371486

   @libenchao @snuyanzin Hi, I have done some more local testing. It seems that 
the fix as-is does not work with:
   1- a lookup join with a `?` in the join key
   2- a lookup join with no filters but with an RDB column name containing a `?`
   3- a lookup join with filters referencing an RDB column name containing a `?`
   
   fyi scan queries seem to work; I tested:
   - non-joining queries with column names containing `?`s.
   - non-temporal joins with no filters, but with column names containing `?`s.
   - non-temporal joins with a filter specifying a column name with a `?` in it.
   
   Note that before the fix, the following fail:
   1- a lookup join with a `?` in the join key
   2- a lookup join with no filters but with an RDB column name containing a `?`
   
   It looks like there are two issues for lookup joins: firstly, the filters 
are ignored; secondly, the `?` character does not work in column names or 
lookup keys.
   
   I hope an approach based on `FieldNamedPreparedStatement` will resolve both 
these issues. I am happy to be involved with the implementation.
   


   
   
   

   
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-12 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1889448042

   > JdbcRowDataInputFormat
   
   @libenchao This sounds interesting. An early fix that I produced 
text-replaced `?` for the parameters; this was a simple change and did not need 
changes to the visitor or the ParameterizedPredicate. The issue that @snuyanzin 
pointed out was that quoted column names containing a `?` would fail. This was 
the reason for adding in the indexes - so we knew the index of the placeholder 
we needed to replace the parameter with, and would not match a column name 
character.
   
   It seems that `FieldNamedPreparedStatement` is passed the parameters 
`Connection connection, String sql, String[] fieldNames`. It will need to 
replace the `?` and not get confused by a `?` in a column name. I notice that 
there is parsing for the `:` character, which could also appear in a quoted 
column name.
   
   I have googled around for whether placeholder characters are allowed in 
column names - I have not found anything conclusive. If we do not need to 
support this, the fix could be massively simplified, removing the need to 
update the ParameterizedPredicate and the visitor. Do you think placeholder 
column names will work with the  `FieldNamedPreparedStatement` approach?

   
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-12 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1889182758

   @snuyanzin My rough idea is like:
   * Pass `resolvedPredicates` and `pushdownParams` to 
`JdbcRowDataLookupFunction`
   * `FieldNamedPreparedStatement` may need to be adapted to allow passing 
additional predicates
   * Set the params for the additional predicates somewhere, maybe after 
`lookupKeyRowConverter.toExternal`
   
   My goal is to keep current `ParameterizedPredicate` and 
`JdbcFilterPushdownPreparedStatementVisitor` unchanged, and adapt 
`JdbcRowDataLookupFunction` just like how we did for `JdbcRowDataInputFormat`. 
The challenge is how to make it work with `FieldNamedPreparedStatement` now.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-12 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1888940790

   @libenchao thanks for your comments/feedback
   >I'll try to come out with a solution with FieldNamedPreparedStatement in 
the coming days, but I cannot guarantee that I have enough time for that. 
   if you can at least share some ideas, that would also be helpful; I might 
have some time during weekends to try things out





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-12 Thread via GitHub


libenchao commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1888926645

   > I am new to this area. I was looking to work within the existing design 
with minimal changes (as the design was added in a FLIP and works for scan 
queries). I think we have something that solves this critical issue - 
@libenchao would you be ok to proceed with this design and raise a subsequent 
issue / FLIP for a more elegant design? Or are you thinking this design is not 
appropriate and should not be merged? I need to look into your alternative 
proposals to understand them.
   
   I took a brief look at `JdbcRowDataLookupFunction`, and I now know why 
`JdbcRowDataLookupFunction` is slightly different from 
`JdbcRowDataInputFormat`, it's because `JdbcRowDataLookupFunction` uses 
`FieldNamedPreparedStatement` which does not allow `?` in the query. I'll try 
to come out with a solution with `FieldNamedPreparedStatement` in the coming 
days, but I cannot guarantee that I have enough time for that. So if you are 
confident with current PR, you can go ahead with merging it.





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1887600132

   @snuyanzin I have left this with multiple commits for now. Should we squash 
to one commit and put you as a co-author in the commit message? If so, how 
would you like me to identify you in the `Co-authored-by: NAME 
`
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1449155934


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicatePlanTest.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests ParameterizedPredicate. */
+public class ParameterizedPredicatePlanTest extends TableTestBase {

Review Comment:
   @snuyanzin I assume that this comment is no longer relevant as-is, as you 
have extended ParameterizedPredicatePlanTest to be a ParameterizedTest 






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1448644093


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction(
 })
 .toArray(DataType[]::new);
 this.maxRetryTimes = maxRetryTimes;
-this.query =
+
+final String baseSelectStatement =
 options.getDialect()
 .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
+if (conditions == null || conditions.length == 0) {
+this.query = baseSelectStatement;
+if (LOG.isDebugEnabled()) {
+LOG.debug("Issuing look up select {}", this.query);
+}
+} else {
+this.query = baseSelectStatement + " AND " + String.join(" AND ", 
conditions);

Review Comment:
   
[https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java#L198](https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java#L198)
 puts in a WHERE clause if there are conditions, and this else branch only runs 
when there are conditions. 
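   A minimal, hypothetical sketch of the concern in this review thread (the 
class and method names are illustrative, not the connector's code): if the 
dialect only emits `WHERE` when there are lookup keys, code appending 
pushed-down conditions has to choose between starting a `WHERE` clause and 
chaining with `AND`.

```java
/**
 * Hypothetical sketch of appending pushed-down conditions whether or not
 * the base SELECT already contains a WHERE clause.
 */
public class ConditionAppender {

    public static String appendConditions(String baseSelect, String[] conditions) {
        if (conditions == null || conditions.length == 0) {
            return baseSelect; // nothing pushed down: keep the dialect's statement as-is
        }
        // If the dialect emitted no WHERE (e.g. no lookup keys), start one;
        // otherwise chain the extra predicates with AND.
        String joiner = baseSelect.contains(" WHERE ") ? " AND " : " WHERE ";
        return baseSelect + joiner + String.join(" AND ", conditions);
    }

    public static void main(String[] args) {
        String[] pushed = {"age > 18"};
        System.out.println(appendConditions("SELECT ip FROM e6 WHERE ip = :ip", pushed));
        System.out.println(appendConditions("SELECT ip FROM e6", pushed));
    }
}
```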






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1446523771


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction(
 })
 .toArray(DataType[]::new);
 this.maxRetryTimes = maxRetryTimes;
-this.query =
+
+final String baseSelectStatement =
 options.getDialect()
 .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
+if (conditions == null || conditions.length == 0) {
+this.query = baseSelectStatement;
+if (LOG.isDebugEnabled()) {

Review Comment:
   log4jLogger does check for debug, but other Logger implementations may not. 
I see this method is used extensively in core Flink, so I am inclined to leave 
it in; it means the parameters are not formatted when debug is not on. WDYT 
@libenchao 
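   A toy illustration of the trade-off discussed here (no real logger; all 
names are hypothetical): the `isDebugEnabled()`-style guard pays off because it 
skips constructing the log arguments entirely when the level is off, whereas an 
unguarded call still builds the message before discarding it.

```java
/**
 * Toy illustration of when an isDebugEnabled()-style guard matters:
 * it avoids building expensive log arguments when the level is off.
 */
public class DebugGuardSketch {
    static boolean debugEnabled = false; // stand-in for LOG.isDebugEnabled()
    static int formatCalls = 0;          // counts expensive argument construction

    static String expensiveFormat() {
        formatCalls++;
        return "lookup select ...";
    }

    static void logGuarded() {
        if (debugEnabled) {
            System.out.println("DEBUG " + expensiveFormat()); // never built when debug is off
        }
    }

    static void logUnguarded() {
        String msg = "DEBUG " + expensiveFormat(); // built even though it is discarded
        if (debugEnabled) {
            System.out.println(msg);
        }
    }

    public static void main(String[] args) {
        logGuarded();
        logUnguarded();
        System.out.println("expensive formats: " + formatCalls);
    }
}
```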






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-11 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1448612970


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##
@@ -72,6 +72,17 @@ public JdbcRowDataLookupFunction(
 DataType[] fieldTypes,
 String[] keyNames,
 RowType rowType) {
+this(options, maxRetryTimes, fieldNames, fieldTypes, keyNames, 
rowType, null);
+}
+
+public JdbcRowDataLookupFunction(

Review Comment:
   @libenchao yes, we could have done this. I think it is nice for the caller 
not to have to pass a null value for a parameter it is not concerned with, 
hence overloading the constructor. If you feel strongly about this style, I can 
change it. 
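   The overloaded-constructor style under discussion can be sketched in 
miniature (a hypothetical class, not the connector's actual signature): the 
short constructor delegates to the full one with a default, so existing callers 
never pass `null` themselves.

```java
/** Hypothetical miniature of the overloaded-constructor style discussed above. */
public class LookupFnSketch {
    final String query;
    final String[] conditions;

    /** Short form: callers without pushed-down conditions never pass null themselves. */
    public LookupFnSketch(String query) {
        this(query, new String[0]);
    }

    public LookupFnSketch(String query, String[] conditions) {
        this.query = query;
        // Normalize a null argument so downstream code never has to null-check.
        this.conditions = conditions == null ? new String[0] : conditions;
    }

    public static void main(String[] args) {
        System.out.println(new LookupFnSketch("SELECT 1").conditions.length);
        System.out.println(new LookupFnSketch("SELECT 1", new String[]{"a = 1"}).conditions.length);
    }
}
```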






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-10 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1885629516

   > I haven't gone through all the code and comments yet, there are some 
inline comments below, however, two questions came into my mind while reviewing 
the PR:
   > 
   > * Why do you need to refactor `ParameterizedPredicate` and 
`JdbcFilterPushdownPreparedStatementVisitor`? Can you just adapt 
`JdbcRowDataLookupFunction` just like `JdbcRowDataInputFormat`?
   > * Have you considered `PrepareStatement` way to handle literals, as we 
have already discussed in the Jira? (I'm not sure about this, but looking at 
`JdbcRowDataLookupFunction`, it seems no place are handling this, so I assume 
that the implementation does not address that)
   
   I am new to this area. I was looking to work within the existing design with 
minimal changes (as the design was added in a FLIP and works for scan queries). 
I think we have something that solves this critical issue - @libenchao would 
you be ok to proceed with this design and raise a subsequent issue / FLIP for a 
more elegant design? Or are you thinking this design is not appropriate and 
should not be merged? 




