Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #97: URL: https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912127835 Thanks @snuyanzin :-) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #97: URL: https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912124003 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin merged PR #97: URL: https://github.com/apache/flink-connector-jdbc/pull/97 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #97: URL: https://github.com/apache/flink-connector-jdbc/pull/97#issuecomment-1912096605 @snuyanzin as discussed here is the pr for the back port. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl opened a new pull request, #97: URL: https://github.com/apache/flink-connector-jdbc/pull/97 This closes apache/flink-connector-jdbc#79 Creating as draft while testing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1912023410 Merged as 5a90eb0a73ca0ac8475331a74ae8f7c1c01646bb Thanks everyone for making it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1911754647 > > @libenchao a question to you before merging `squash` also removed the info about all the authors, are you ok if I add you as a co-author since it is based on your work? Or you can do it on your own and merge > > Sure, thanks. @libenchao @snuyanzin I have amended the commit message and added you both as co-authors. I made an educated guess as to the name (git id) and email to use. If you prefer something else - let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1911291438 > @libenchao a question to you before merging `squash` also removed the info about all the authors, are you ok if I add you as a co-author since it is based on your work? Or you can do it on your own and merge Sure, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910975978 @libenchao a question to you before merging `squash` also removed the info about all the authors, are you ok if I add you as a co-author since it is based on your work? Or you can do it on your own and merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910329090 > yes, backport would make sense, i guess it could be cherry-picked @snuyanzin once this pr is merged - I assume I raise a second pr against this issue, would you be ok to merge that please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1910277677 yes, backport would make sense, i guess it could be cherry-picked -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909958760 > for future: it is better not to squash commits unless it was requested explicitely by reviewer. The reason is that since there is no tool in GitHub confirming that the set of changes is same before and after sqaush then ideally need to go again through the changes and double check it... @snuyanzin OK, I see it makes review more difficult . I have been asked in the past to squash - as per [the process](https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests), where is says `MUST squash fixup commits (for example commits that are addressing review comments).` I think it is pragmatic to follow what you say and only squash on the committers request. I notice in this process, it talks about back porting. Should I back port for this fix to the last connector release? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909908037 for future: it is better not to squash commits unless it was requested explicitely by reviewer. The reason is that since there is no tool in GitHub confirming that the set of changes is same before and after sqaush then ideally need to go again through the changes and double check it... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1909799398 @snuyanzin @libenchao I have squashed the commit. Thank you both for all your help and support on this. Are you OK to merge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1906068499 @libenchao @snuyanzin I have consolidated the feedback into this branch. When you both approve I will squash the commits ready for merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463286945 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction( String[] fieldNames, DataType[] fieldTypes, String[] keyNames, -RowType rowType) { +RowType rowType, Review Comment: Interesting - I see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463273839 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction( String[] fieldNames, DataType[] fieldTypes, String[] keyNames, -RowType rowType) { +RowType rowType, Review Comment: yes, however it could be calculated from `fieldNames` and `fieldTypes` exactly in same way like it is done at lines 109-110 anyway this is just a thing to note, it seems not related to this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463211057 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction( String[] fieldNames, DataType[] fieldTypes, String[] keyNames, -RowType rowType) { +RowType rowType, Review Comment: it is used on line 108this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); It looks like the field names are used to check that the key is a field, then the field types to look up the key type. Are you thinking of some more optimal way of doing this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463211057 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction( String[] fieldNames, DataType[] fieldTypes, String[] keyNames, -RowType rowType) { +RowType rowType, Review Comment: it is used on line 108this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463161889 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,114 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { Review Comment: good spot - I will add one -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463160400 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: sorry my bad - you are right - I have fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463158838 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,114 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { Review Comment: This class should have `toString()` otherwise it is impossible to navigate between different tests involving objects of this class as a parameter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463158838 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,114 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { Review Comment: This class should have `toString()` otherwise it is impossible to navigate between different tests involving this as a parameter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463156166 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: could you please explain where we can find the second value? I see only one `"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"` contained in that list -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463154006 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: This is a list of 2 so cannot be used with a singleton list.I have removed the unnecessary `new String[] {}` in line with your other feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463147700 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: I see only one value `"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"` what is the second? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463137596 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: This is a list of 2 so cannot be made into a singletonList. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905821215 > I noticed that there addressed only comment about `final` vars in `flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java` > > why we are talking about different branch btw? Which one is going to be merged? @snuyanzin I am not sure - I will bring @libenchao s branch into this branch then address your feedback; so we know where we are. I will leave the changes unsquashed until there is a good review from you both. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905810643 I noticed that there addressed only comment about `final` vars in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java why we are talking about different branch btw? Which one is going to be merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905803529 @snuyanzin Can I check - did you see the cosmetic changes in [https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/](https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/) that @libenchao provided - I think some of your feedback is addressed there. @snuyanzin If you are ok with @libenchao's changes - we can proceed to merge, if not I can merge his commit into my branch and look at addressing your additional feedback. I assume there is a linter you are both using to spot the missing finals and the like, that is not in the build. What linter are you using so I can avoid these issues in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463099347 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), +// double single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(double_col = ?)"}), +new Serializable[] { +1.1, +}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// and +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}), +new Serializable[] {2.2, 1.1}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// or +new TestSpec( +withFailure, +
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463099077 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), +// double single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(double_col = ?)"}), +new Serializable[] { +1.1, +}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// and +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}), +new Serializable[] {2.2, 1.1}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// or +new TestSpec( +withFailure, +
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463098844 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), +// double single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(double_col = ?)"}), +new Serializable[] { +1.1, +}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// and +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}), +new Serializable[] {2.2, 1.1}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: ```suggestion
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463098549 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), +// double single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(double_col = ?)"}), +new Serializable[] { +1.1, +}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// and +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?) AND (double_col = ?)"}), Review Comment: ```suggestion Collections.singletonList("(real_col = ?) AND (double_col = ?)"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463097912 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), +// double single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(double_col = ?)"}), Review Comment: ```suggestion Collections.singletonList("(double_col = ?)"), ``` ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector();
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463097569 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), +new Serializable[] {2.2}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList( +new String[] { + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)" +})), Review Comment: ```suggestion Arrays.asList( "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096545 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// real single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(real_col = ?)"}), Review Comment: ```suggestion Collections.singletonList("(real_col = ?)"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096287 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), +new Serializable[] {BigDecimal.valueOf(100.1011)}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: ```suggestion Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463096013 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), +// decimal single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(decimal_col = ?)"}), Review Comment: ```suggestion Collections.singletonList("(decimal_col = ?)"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463095416 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), +new Serializable[] {"11-c1-v1"}, +new Object[] {1, StringData.fromString("1")}, +Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})), Review Comment: ```suggestion Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463094604 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { +JdbcRowDataLookupFunction lookupFunction = +buildRowDataLookupFunctionWithPredicates( +testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + +ListOutputCollector collector = new ListOutputCollector(); +lookupFunction.setCollector(collector); +lookupFunction.open(null); +lookupFunction.eval(testSpec.keys); + +if (testSpec.withFailure) { +// Close connection here, and this will be recovered by retry +if (lookupFunction.getDbConnection() != null) { +lookupFunction.getDbConnection().close(); +} +} + +List result = +new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); +Collections.sort(testSpec.expected); +assertThat(result).isEqualTo(testSpec.expected); +} + +private static class TestSpec { + +private boolean withFailure; +private final List resolvedPredicates; +private final Serializable[] pushdownParams; +private final Object[] keys; +private List expected; + +private TestSpec( +boolean withFailure, +List resolvedPredicates, +Serializable[] pushdownParams, +Object[] keys, +List expected) { +this.withFailure = withFailure; +this.resolvedPredicates = resolvedPredicates; +this.pushdownParams = pushdownParams; +this.keys = keys; +this.expected = expected; +} +} + +static Collection lookupWithPredicatesProvider() { +return ImmutableList.builder() +.addAll(getTestSpecs(true)) +.addAll(getTestSpecs(false)) +.build(); +} + +@NotNull +private static ImmutableList getTestSpecs(boolean withFailure) { +return ImmutableList.of( +// var char single filter +new TestSpec( +withFailure, +Arrays.asList(new String[] {"(comment1 = ?)"}), Review Comment: ```suggestion Collections.singletonList("(comment1 = ?)"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463084570 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -167,7 +185,13 @@ public Collection lookup(RowData keyRow) { private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { Connection dbConn = connectionProvider.getOrEstablishConnection(); -statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames); +String additionalPredicates = ""; +if (resolvedPredicates.size() > 0) { Review Comment: ```suggestion if (!resolvedPredicates.isEmpty()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463082736 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final long serialVersionUID = 2L; -private final String query; +private String query; private final JdbcConnectionProvider connectionProvider; -private final String[] keyNames; +private String[] keyNames; private final int maxRetryTimes; private final JdbcRowConverter jdbcRowConverter; private final JdbcRowConverter lookupKeyRowConverter; +private List resolvedPredicates = new ArrayList<>(); +private Serializable[] pushdownParams = new Serializable[0]; Review Comment: We do not need this initialization here since it is not used anywhere and then is going to be overwritten in a constructor. After that both vars could be made final -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463077682 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -71,7 +75,9 @@ public JdbcRowDataLookupFunction( String[] fieldNames, DataType[] fieldTypes, String[] keyNames, -RowType rowType) { +RowType rowType, Review Comment: This looks strange: I wonder why do we need this if it is not used and at the same time we still `fieldNames` and `fieldTypes` containing same info -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067422 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithANDAndORFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) " ++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWith2ANDsAndORFilter() { Review Comment: ```suggestion void testLookupJoinWith2ANDsAndORFilter() { ``` ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithANDAndORFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) " ++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWith2ANDsAndORFilter() { +util.verifyExecPlan( +"SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime " ++ "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) " ++ "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithORFilter() { Review Comment: ```suggestion void testLookupJoinWithORFilter() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067119 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithFilter() { Review Comment: ```suggestion void testLookupJoinWithFilter() { ``` ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithANDAndORFilter() { Review Comment: ```suggestion void testLookupJoinWithANDAndORFilter() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463067787 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithANDAndORFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) " ++ "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWith2ANDsAndORFilter() { +util.verifyExecPlan( +"SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime " ++ "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) " ++ "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithORFilter() { +util.verifyExecPlan( +"SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip"); +} + +@Test +public void testLookupJoinWithWeirdColumnNames() { Review Comment: ```suggestion void testLookupJoinWithWeirdColumnNames() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463066443 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC + * java code, where it adds the join conditions to the select statement string. + */ +@Test +public void testLookupJoin() { Review Comment: ```suggestion void testLookupJoin() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463064026 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception { assertThat(result).isEqualTo(expected); } +@ParameterizedTest +@MethodSource("lookupWithPredicatesProvider") +public void testEval(TestSpec testSpec) throws Exception { Review Comment: ```suggestion void testEval(TestSpec testSpec) throws Exception { ``` no need for `public` since it is junit5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463057026 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final long serialVersionUID = 2L; -private final String query; +private String query; private final JdbcConnectionProvider connectionProvider; -private final String[] keyNames; +private String[] keyNames; Review Comment: nit ```suggestion private final String[] keyNames; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463056711 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -56,13 +57,16 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final long serialVersionUID = 2L; -private final String query; +private String query; Review Comment: nit: ```suggestion private final String query; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905665607 > @davidradl > > > * I don't think we can have 0 key names for a lookup join. Have you a test case that would drive this ? > > There is no such restriction in semantic (maybe somewhere has a checker logic to disallow this, maybe not), anyway, it doesn't affect here, semantically we can handle it when there is no keys. > > > * the ? test. We can leave this in , but it allows column names with ?s in - which seems like a good thing. We could do this in a separate issue if you want. > > Yes, this is an orthogonal issue to current one. @libenchao ok lets merge :-) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905659276 @davidradl > * I don't think we can have 0 key names for a lookup join. Have you a test case that would drive this ? There is no such restriction in semantic (maybe somewhere has a checker logic to disallow this, maybe not), anyway, it doesn't affect here, semantically we can handle it when there is no keys. > * the ? test. We can leave this in , but it allows column names with ?s in - which seems like a good thing. We could do this in a separate issue if you want. Yes, this is an orthogonal issue to current one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905644311 > > @libenchao @snuyanzin I have updated the code in line with your feedback - please could you review again. Many thanks > > @davidradl One small suggestion, please do not squash and force-push before you must do it (conflicts with main branch, or the reviewer has done the review), else the reviewer is hard to track incremental changes. @libenchao makes sense. Thanks for the suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905641356 > @davidradl @snuyanzin I reviewed, and added a few cosmetic changes in [commit](https://github.com/libenchao/flink-connector-jdbc/commit/24caa99cd97e86afb1836886d51007775ed95ddc), others looks good to me, please take a look. If you agree on this version, I'll proceed with merging. > > https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/ @libenchao than you very much for your help on this. I am happy with the cosmetic fixups . I had 2 comments I added to the code. - I don't think we can have 0 key names for a lookup join. Have you a test case that would drive this ? - the ? test. We can leave this in , but it allows column names with ?s in - which seems like a good thing. We could do this in a separate issue if you want. If we can get to a consensus on these, I am happy for it to merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905255051 @davidradl @snuyanzin I reviewed, and added a few cosmetic changes in [commit](https://github.com/libenchao/flink-connector-jdbc/commit/24caa99cd97e86afb1836886d51007775ed95ddc), others looks good to me, please take a look. If you agree on this version, I'll proceed with merging. https://github.com/libenchao/flink-connector-jdbc/commits/33365-lookup-join-predicate-pushdown/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1905174734 > @libenchao @snuyanzin I have updated the code in line with your feedback - please could you review again. Many thanks @davidradl One small suggestion, please do not squash and force-push before you must do it (conflicts with main branch, or the reviewer has done the review), else the reviewer is hard to track incremental changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1904779201 @libenchao @snuyanzin I have updated the code in line with your feedback - please could you review again. Many thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1462396444 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -63,6 +64,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final JdbcRowConverter jdbcRowConverter; private final JdbcRowConverter lookupKeyRowConverter; +private List resolvedPredicates = new ArrayList<>(); +private Serializable[] pushdownParams = new Serializable[0]; Review Comment: I have change the code since this comment, now so these variables are assigned so cannot be made final. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1462392709 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception { } } +private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement) +throws SQLException { +for (int i = 0; i < pushdownParams.length; ++i) { Review Comment: I have added more tests to the eval method which does the scaffolding already. I hope this is sufficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl opened a new pull request, #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79 Allow lookup joins to honour supplied filters, where the lookup source is JDBC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1461286671 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: > just for my understanding ar we thinking that the rowdata sent in on the lookup would contain the pushdown predicates so the code could call and this would handle the predicates and the keys: > statement = lookupKeyRowConverter.toExternal(keyRow, statement); > we could then remove the need for? > statement = setPredicateParams(statement); What I was proposing is showing the predicates in the digest of lookup join node. That way, we can see it in the test xml files, and also it can be shown in Flink Web UI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1460466435 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception { } } +private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement) +throws SQLException { +for (int i = 0; i < pushdownParams.length; ++i) { Review Comment: @snuyanzin I am not finding a way to scaffold a unit test around this method and prove the cases work. It looks like I need to prepare the statement and then populate rowdata and then run through the converter before I can setup the statement required for this method. I will continue to investigate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1460463528 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: @libenchao just for my understanding ar we thinking that the rowdata sent in on the lookup would contain the pushdown predicates so the code could call and this would handle the predicates and the keys: ` statement = lookupKeyRowConverter.toExternal(keyRow, statement);` we could then remove the need for? `statement = setPredicateParams(statement); ` Would you be willing to assign me (and provide minimal guidance if required) FLINK-34170 and review / merge any associated PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1459375955 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: > change to the core Flink to include the look up join conditions in the optimised plan I have raised issue [https://issues.apache.org/jira/browse/FLINK-34170](https://issues.apache.org/jira/browse/FLINK-34170) . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1459366515 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception { } } +private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement) +throws SQLException { +for (int i = 0; i < pushdownParams.length; ++i) { Review Comment: @snuyanzin Thanks I am working on this currently. I can test the method but am not sure how to check it worked. My current test shows that the sqltext in the statement is not updated with the parameters. I am continuing to look into this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458773766 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: Yes, precisely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458772878 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java: ## @@ -178,26 +178,42 @@ public void close() throws SQLException { // public static FieldNamedPreparedStatement prepareStatement( -Connection connection, String sql, String[] fieldNames) throws SQLException { +Connection connection, +String sql, +String[] fieldNames, +String additionalPredicates, +int numberOfDynamicParams) +throws SQLException { checkNotNull(connection, "connection must not be null."); checkNotNull(sql, "sql must not be null."); checkNotNull(fieldNames, "fieldNames must not be null."); -if (sql.contains("?")) { Review Comment: I guess this is also irrelevant with current issue, just as FLINK-34146. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458664773 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: @libenchao I am not sure what I am proposing . I assume this would be a change to the core Flink to include the look up join conditions in the optimised plan. Am I understanding this correctly ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458658676 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java: ## @@ -178,26 +178,42 @@ public void close() throws SQLException { // public static FieldNamedPreparedStatement prepareStatement( -Connection connection, String sql, String[] fieldNames) throws SQLException { +Connection connection, +String sql, +String[] fieldNames, +String additionalPredicates, +int numberOfDynamicParams) +throws SQLException { checkNotNull(connection, "connection must not be null."); checkNotNull(sql, "sql must not be null."); checkNotNull(fieldNames, "fieldNames must not be null."); -if (sql.contains("?")) { Review Comment: If there is a column name with a ? character in - then it will fail - removing this check tolerates column names like this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1457562374 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java: ## @@ -178,26 +178,42 @@ public void close() throws SQLException { // public static FieldNamedPreparedStatement prepareStatement( -Connection connection, String sql, String[] fieldNames) throws SQLException { +Connection connection, +String sql, +String[] fieldNames, +String additionalPredicates, +int numberOfDynamicParams) +throws SQLException { checkNotNull(connection, "connection must not be null."); checkNotNull(sql, "sql must not be null."); checkNotNull(fieldNames, "fieldNames must not be null."); -if (sql.contains("?")) { Review Comment: Do we need to remove this check? ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -63,6 +64,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final JdbcRowConverter jdbcRowConverter; private final JdbcRowConverter lookupKeyRowConverter; +private List resolvedPredicates = new ArrayList<>(); +private Serializable[] pushdownParams = new Serializable[0]; Review Comment: These two variable could be `final`. ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java: ## @@ -41,6 +41,10 @@ class FieldNamedPreparedStatementImplTest { private final String[] keyFields = new String[] {"id", "__field_3__"}; private final String tableName = "tbl"; +private final String[] fieldNames2 = +new String[] {"id:", "name", "email", "ts", "field1", "field_2", "__field_3__"}; +private final String[] keyFields2 = new String[] {"id?:", "__field_3__"}; + Review Comment: This is change is not necessary anymore? ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: Can you log another Jira to improve this, scan source has this ability already. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1458074979 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -116,6 +124,15 @@ public void open(FunctionContext context) throws Exception { } } +private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement) +throws SQLException { +for (int i = 0; i < pushdownParams.length; ++i) { Review Comment: It seems that currently so far there no test checking what happens if we enter inside this loop -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1899301322 it is reproducible locally with this command (also assuming that it will be run with jdk17 or jdk21) ``` mvn clean install -Dflink.version=1.19-SNAPSHOT ``` the reason is recent change in Flink main repo since it is not related to changes within this PR, I created a separate PR to fix it https://github.com/apache/flink-connector-jdbc/pull/93 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898772077 @libenchao I see there are failing tests, these work for me locally. Do you know how I get access to more details around the test failures so I can debug? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898550767 @snuyanzin @libenchao I have put up the changes with the requested unit tests on the filters. Please could you have a look? I can edit the commit message to add you are co-authors if you want - but I thought it would be better to get the code up first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1898097633 > @davidradl Thanks for the work. I think the `testSelectStatementWithWeirdCharacters` you added is an orthogonal problem with current one, it fails without current changes, right? If yes, it can be another separate issue which does not block current one. > > Current issue is mainly about how to let JDBC lookup function handle the pushed predicates, previously it just ignored them. So one test case I would like to see is one ITCase test, which shows that the result is correct with predicates pushed down. @libenchao Yes the join key with weird colon characters failed before. We are now adding filters that use the same mechanism and will not work with colons. The index approach works with all characters. Based on this , I will put this code without the weird character unit test into this pr with the extra unit test that you suggest. I will raise a separate issue for the colon characters issue. @snuyanzin any concerns? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1897803207 @davidradl Thanks for the work. I think the `testSelectStatementWithWeirdCharacters` you added is an orthogonal problem with current one, it fails without current changes, right? If yes, it can be another separate issue which does not block current one. Current issue is mainly about how to let JDBC lookup function handle the pushed predicates, previously it just ignored them. So one test case I would like to see is one ITCase test, which shows that the result is correct with predicates pushed down. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1896292338 It looks like we have used the approach from [https://stackoverflow.com/questions/2309970/named-parameters-in-jdbc](https://stackoverflow.com/questions/2309970/named-parameters-in-jdbc ). It says `Please note that the above simple example does not handle using named parameter twice. Nor does it handle using the : sign inside quotes.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1896205523 Hi @libenchao @snuyanzin , I have got the POC mostly working. I added fixups and some junits to [https://github.com/davidradl/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function](https://github.com/davidradl/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function ). This works for me locally, but it fails with column names or join keys containing colons. I do think this is a better approach than the indexes solution in the pr - thankyou @libenchao :-) I have included a failing junit showing the problem. https://github.com/davidradl/flink-connector-jdbc/blob/59f47bc8f659b9a1c4440dcf0fd9ae70ecaba9c3/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java#L152 The unit test has a join key name with a `:` at the end, the FieldNamedPreparedStatementImpl parseNamedStatement method finds the colon and nothing after it - so assume there is an invalid empty name. Is this acceptable as `:` are unusual in column names and is baked into the current design of FieldNamedPreparedStatementImpl ? If not, any thoughts on how to fix this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1895544871 @libenchao cloned your repo. I like the idea behind this fix, avoiding String manipulation. I added in all the table plan junits from the pr 79 and they work. The junit testing the FieldNamedPreparedStatement with a `:` in the column name fails. I then ran local tests that work with pr 79. The lookup joins hang with the POC for me . I am using mySQL JDBC driver. I am continuing to investigate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1895411935 > @davidradl @snuyanzin I did a POC of the idea proposed above, the branch is here: https://github.com/libenchao/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function . Feel free to review it, if you agree with this approach, feel free to do following work based on it (we can add others as co-author anyway). > > Sorry that I did not do testing, I couldn't find a ITCase test in current PR. thanks @libenchao I will have a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1894963665 @davidradl @snuyanzin I did a POC of the idea proposed above, the branch is here: https://github.com/libenchao/flink-connector-jdbc/tree/pushdown-predidates-to-lookup-function . Feel free to review it, if you agree with this approach, feel free to do following work based on it (we can add others as co-author anyway). Sorry that I did not do testing, I couldn't find a ITCase test in current PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1893725108 The existing pr fails with a filter with a colon in it. SELECT * FROM a left join mariadb_catalog.menagerie.e6 FOR SYSTEM_TIME AS OF a.proctime on a.ip = e6.ip and e6.`typ:e` = 1; @libenchao The FieldNamedPreparedStatementImpl logic looks for : to identify names and incorrectly finds key `e` (the character after the colon). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1893480056 @snuyanzin It fails here [https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L187](https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L187) For my test the sql variable is: `SELECT `ip`, `type?`, `age`, `height`, `weight` FROM `e4` WHERE `ip` = :ip AND (`type?` = 1)` The Derby junits with the weird column names junit works - I assume it does not drive this code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1892607108 @davidradl thanks for checking and letting know would be great if you can create unit tests confirming your findings and we could integrate them in connector's test currently not sure about guiding however feel free to continue and share yout findings -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1892371486 @libenchao @snuyanzin Hi , I have done some more local testing. It seems that the fix as-is does not work with 1- lookup up join with a `?` in the join key 2- lookup up join with no filters but with a RDB column name containing a `?` 3- lookup up join with filters referencing a RDB column name containing a `?` fyi scan queries seem to work, I tested : - non-joining queries work with column names with `?`s in them. - non-temporal joins with no filters , but with with column names with `?`s in it. - non-temporal joins with a filter specifying a column name with a `?`s in it. Note before the fix, the following fails. 1- lookup up join with a `?` in the join key 2- lookup up join with no filters but with a RDB column name containing a `?` It looks like there are 2 issues for lookup joins, firstly that the filters are ignored, the second is that `?` character does not work in column names or lookup keys. I hope an approach based on `FieldNamedPreparedStatement ` will resolve both these issues. I am happy to be involved with the implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1889448042 > JdbcRowDataInputFormat @libenchao This sounds interesting. An early fix that I produced text replaced ? for the parameters; this was a simple change and did not need changes to the visitor or the ParameterizedPredicate. The issue that @snuyanzin pointed out was that quoted column names containing a `?` would fail. This was the reason for adding in the indexes - so we knew the index of the placeholder we needed to replace the parameter with and not match a column name character. It seems that `FieldNamedPreparedStatement` is passed parameters `Connection connection, String sql, String[] fieldNames `. It will need to replace the `?` and not get confused by a `?` in a column name. I notice that there is parsing for the `:` character which could also be in a quoted column name. I have googled around for whether placeholder characters are allowed in column names - I have not found anything conclusive. If we do not need to support this, the fix could be massively simplified, removing the need to update the ParameterizedPredicate and the visitor. Do you think placeholder column names will work with the `FieldNamedPreparedStatement` approach? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1889182758 @snuyanzin My rough idea is like: * Pass `resolvedPredicates` and `pushdownParams` to `JdbcRowDataLookupFunction` * `FieldNamedPreparedStatement` may be need to adapt to allow pass additional predicates * set params to additional predicates somewhere, maybe after `lookupKeyRowConverter.toExternal` My goal is to keep current `ParameterizedPredicate` and `JdbcFilterPushdownPreparedStatementVisitor` unchanged, and adapt `JdbcRowDataLookupFunction` just like how we did for `JdbcRowDataInputFormat`. The challenge is how to make it work with `FieldNamedPreparedStatement` now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1888940790 @libenchao thanks for your comments/feedback >I'll try to come out with a solution with FieldNamedPreparedStatement in the coming days, but I cannot guarantee that I have enough time for that. if you at least share some ideas that would also be helpful, I might have some time during weekends to try things out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1888926645 > I am new to this area. I was looking to work within the existing design with minimal changes (as the design was added in a flip and works for scan queries. I think we have something that solves this critical issue - @libenchao would you be ok to proceed with this design and raise a subsequent issue / flip for a more elegant design. Or are you thinking this design is not appropriate and should not be merged? I need to look into your alternative proposals to understand them. I took a brief look at `JdbcRowDataLookupFunction`, and I now know why `JdbcRowDataLookupFunction` is slightly different from `JdbcRowDataInputFormat`, it's because `JdbcRowDataLookupFunction` uses `FieldNamedPreparedStatement` which does not allow `?` in the query. I'll try to come out with a solution with `FieldNamedPreparedStatement` in the coming days, but I cannot guarantee that I have enough time for that. So if you are confident with current PR, you can go ahead with merging it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1887600132 @snuyanzin I have left this with multiple commits for now . Should we squash to one commit and put you as a co author in the commit message? If so how would you like me to identify you in the `Co-authored-by: NAME ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1449155934 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicatePlanTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests ParameterizedPredicate. */ +public class ParameterizedPredicatePlanTest extends TableTestBase { Review Comment: @snuyanzin I assume that this comment is no longer relevant as is , as you have extended ParameterizedPredicatePlanTest to be a ParameterizedTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1448644093 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction( }) .toArray(DataType[]::new); this.maxRetryTimes = maxRetryTimes; -this.query = + +final String baseSelectStatement = options.getDialect() .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); +if (conditions == null || conditions.length == 0) { +this.query = baseSelectStatement; +if (LOG.isDebugEnabled()) { +LOG.debug("Issuing look up select {}", this.query); +} +} else { +this.query = baseSelectStatement + " AND " + String.join(" AND ", conditions); Review Comment: [https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java#L198](https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java#L198) puts in a WHERE clause if there are conditions and this else is only when there are conditions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1446523771 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction( }) .toArray(DataType[]::new); this.maxRetryTimes = maxRetryTimes; -this.query = + +final String baseSelectStatement = options.getDialect() .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); +if (conditions == null || conditions.length == 0) { +this.query = baseSelectStatement; +if (LOG.isDebugEnabled()) { Review Comment: log4jLogger does check for debug, but other Logger implementations may not . I see this method is used extensively in core Flink - I am inclined to leave it in, it means the parameters are not formatted when debug is not on. WDYT @libenchao -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1448612970 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java: ## @@ -72,6 +72,17 @@ public JdbcRowDataLookupFunction( DataType[] fieldTypes, String[] keyNames, RowType rowType) { +this(options, maxRetryTimes, fieldNames, fieldTypes, keyNames, rowType, null); +} + +public JdbcRowDataLookupFunction( Review Comment: @libenchao yes we could have done this . I think it is nice for the caller to not have to include a null value for a parameter it is not concerned with and overload the constructor. If you feel strongly about this style I can change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1885629516 > I haven't gone through all the code and comments yet, there are some inline comments below, however, two questions came into my mind while reviewing the PR: > > * Why do you need to refactor `ParameterizedPredicate` and `JdbcFilterPushdownPreparedStatementVisitor`? Can you just adapt `JdbcRowDataLookupFunction` just like `JdbcRowDataInputFormat`? > * Have you considered `PrepareStatement` way to handle literals, as we have already discussed in the Jira? (I'm not sure about this, but looking at `JdbcRowDataLookupFunction`, it seems no place are handling this, so I assume that the implementation does not address that) I am new to this area. I was looking to work within the existing design with minimal changes (as the design was added ind a flip and works for scan queries. I think we have something that solves this critical issue - @libenchao would you be ok to proceed with this design and raise a subsequent issue / flip for a more elegant design. Or are you thinking this design is not appropriate and should not be merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org