[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-02-29 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822108#comment-17822108
 ] 

macdoor615 commented on FLINK-33365:


[~Sergey Nuyanzin] You are right. I tested the sql using data and got the 
correct results. There is no corresponding filter fetch in the execution plan

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822097#comment-17822097
 ] 

Benchao Li commented on FLINK-33365:


Yes, the related issue is FLINK-34170

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-02-29 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821996#comment-17821996
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

[~macdoor615] iIRC it was discussed during PR and this is the issue for 
{{TableSourceScan}} how it is serialized to string and should be handled within 
another jira issue, however at the same time the result should be correct.
Could you please check it?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-02-08 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815943#comment-17815943
 ] 

Martijn Visser commented on FLINK-33365:


[~rahulchundeth] See the Dev mailing list for an ongoing voting thread; please 
help validating the release as well. 
https://lists.apache.org/thread/rlk5kp2vxgkmbxmq4wnco885q5vv9rtp

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-02-08 Thread Rahul Raj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815934#comment-17815934
 ] 

Rahul Raj commented on FLINK-33365:
---

Hi Team,
when we can expect next version of flink-connector-jdbc with this fix?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811296#comment-17811296
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

Merged to 3.1.x as 
[6f318a5326b2d7aba554638b186bf4cc4e17511c|https://github.com/apache/flink-connector-jdbc/commit/6f318a5326b2d7aba554638b186bf4cc4e17511c]

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0, jdbc-3.1.2
>
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-22 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809712#comment-17809712
 ] 

Benchao Li commented on FLINK-33365:


[~martijnvisser]  The PR is very close to merge now, I'll try to finalize it 
asap, and hopefully we can resolve this blocker which week and unblock the 
release process.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-22 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809424#comment-17809424
 ] 

Martijn Visser commented on FLINK-33365:


What's the status of this ticket? Are we closing in on fixing this, or will we 
revert the change? Flink 1.18 was announced end of October, and we've still 
been unable to make a JDBC connector release that's compatible with it because 
of this blocker. In hindsight, I think we've made a mistake with not flagging 
this as a blocker earlier, and we should have just reverted this change. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-09 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804657#comment-17804657
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

[~rahulchundeth] there is an existing PR and you can help/speed it up by 
helping reviewing this

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-09 Thread Rahul Raj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804655#comment-17804655
 ] 

Rahul Raj commented on FLINK-33365:
---

Hi [~davidradl] , Do we have any ETA for this bug fix ?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-05 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803557#comment-17803557
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

marked as a blocker as mentioned at 
https://lists.apache.org/thread/pc15twvf90l0d6t4vthomh6jzv1h145d

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-19 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798719#comment-17798719
 ] 

david radley commented on FLINK-33365:
--

[~Sergey Nuyanzin] I have pushed up my latest changes including the fix for the 
OR case. The tests are more simplified but they are not data driven - with one 
method looking of test case data. I did not find a way with the existing data 
to drive the null case. I have tested that and the where locally but there is 
no automated test for it. Please review and see what more needs to be changed? 
I appreciate your time on this.   

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-19 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798552#comment-17798552
 ] 

david radley commented on FLINK-33365:
--

[~Sergey Nuyanzin] sounds great! Thank you for your support. I realise that I 
need to add tests that exercise the 2 types of filters, the one on the _join_ 
which can get nulls and the one in the _where_ that does not. I will push up 
new code for you to review, when I have completed the tests.   

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-19 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798548#comment-17798548
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

glad that the issue is solved and thanks for working on it

{quote}
3) If we do not view this as a blocker - we could release without reverting the 
commit and continue investigating. 
{quote}
it was already mentioned in ML that it is a blocker [1] 
We could try to continue with option 2 and let's see whether we face something 
else or not


[1] https://lists.apache.org/thread/sdkm5qshqozow9sljz6c0qjft6kg9cwc

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798178#comment-17798178
 ] 

david radley commented on FLINK-33365:
--

[~Sergey Nuyanzin] from the discussion above. I am thinking the s issue 
was not really an issue, this was my misunderstanding. The options I see are: 

1) Do as you say, but this wouldn't this be regression for the jdbc driver 
without filters, as there is likely to be a performance impact without filter 
pushdown

2) We could complete the fix I have. I have it working locally with the OR 
case. I am looking at the implementing more test cases.I am struggling to make 
the unit test cases, to be defined in data with a for loop to execute them.  If 
you are ok with the style of unit tests I have, I could have this up today or 
tomorrow. 

3) If we do not view this as a blocker - we could release without reverting the 
commit and continue investigating. 

 

My preference would be option 2 - we are looking to have this fix in and would 
probably need to patch out code to pick it up.  But if this takes too long to 
get merged, option 3 would be an option.  

 

Strangely the config option to disable predicate pushdowns works for me. I was 
not expecting it to work as it should only effect the legacy style of predicate 
pushdown.  

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798176#comment-17798176
 ] 

Benchao Li commented on FLINK-33365:


Yes, {{SELECT * FROM a left join mariadb_catalog.menagerie.e FOR SYSTEM_TIME AS 
OF a.proctime on a.ip = e.ip where e.age = 50}} should behaves as you expected. 
The semantic is different for filter in ON CLAUSE and WHERE CLAUSE when you are 
using LEFT JOIN. (For inner join, it will be the same)

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798165#comment-17798165
 ] 

david radley commented on FLINK-33365:
--

[~libenchao]  thanks for the quick feedback - I guess it is where the filter is 
applied - I was expecting the filter to be applied to the table then we join. 
You are saying we join and show non null values where the filter matches.  I 
have done some testing on sqiflddle, it looks like you are right. If I wanted 
the bevaviour, I was expecting I should use: 

SELECT * FROM a left join mariadb_catalog.menagerie.e FOR SYSTEM_TIME AS OF 
a.proctime on a.ip = e.ip _where_  e.age = 50 ;

 

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798131#comment-17798131
 ] 

Benchao Li commented on FLINK-33365:


[~davidradl] I didn't see a problem in the example you gave in this thread: 
https://lists.apache.org/thread/7r49c5ffohz1oqm3oz2g60g5lnovq0l2. 

The result is correct since you are using 'LEFT JOIN', for the records from 
left table which does not have matching rows, it should emit the line, and pad 
nulls for fields in right table. Am I missing something?

The behavior should be the same when pushdown is enabled or not as you stated 
in previously.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798105#comment-17798105
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

Since 
1. predicate push down from one side brings so many issues and probably there 
is something else which is not known yet.
2. IIRC this issue was a blocker before jdbc connector release

I tend to think 
1. to revert push down commit 
((https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b)
2. make a release
3. continue working on pushdown

what do you think?



> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-12-18 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798091#comment-17798091
 ] 

david radley commented on FLINK-33365:
--

[~libenchao] [~Sergey Nuyanzin] Bringing back a discussion onto the issue. I 
have many of the scenarios working including the Or scenario. There are 
scenarios

like the one [https://lists.apache.org/thread/7r49c5ffohz1oqm3oz2g60g5lnovq0l2] 
if the jdbc lookup source has rows that do not match the filter, then nulls 
come out in the lookup join resultset. The same behaviour occurs with predicate 
pushdown disabled and with my current patch (when I push it up).

On the discussion thread [~ruanhang1993]  agreed that the nulls are not correct.

 

The way I am reading this is that, unless we have a reason to see these nulls, 
there is an issue in lookup joins in certain scenarios. It does not appear to 
relate to JDBC. Are we ok if I make this fix work the same as when predicate 
pushdown is not enabled. And raise a separate issue for the lookup join which 
appears not to be JDBC related.  

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-20 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788250#comment-17788250
 ] 

Benchao Li commented on FLINK-33365:


[~davidradl] Is the PR ready to review now? Since we've reached an agreement on 
the general idea, let's move to the PR for further discussions.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-17 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787123#comment-17787123
 ] 

david radley commented on FLINK-33365:
--

[~libenchao] 

fyi this fix asis appears to have regressed when there are 2 lookup key 
conditions and no filters. I am investigating.  

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-15 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786427#comment-17786427
 ] 

david radley commented on FLINK-33365:
--

[~libenchao] 

I have moved the code from the AbstractDialog and pushed up the change. I could 
not see how to get a  _PreparedStatement_ to be able to set the condition 
on{_}.{_} Please could you give me some pointers.

 

I have successfully tested using the supplied test tables:
 * other simple predicates work
 * multiple simple predicates work

 

I added extra tests to JdbcDynamicTableSourceITCase but cannot test those 
changes as I get errors locally when running the tests without my changes. Any 
pointers would be great I am running on a Mac.  

 

I wanted to test 2 look up keys, so I created a new table d (on MariaDB) and e 
(on paimon)  and set up the following tables and did the some joins with 
filters and a join with multiple keys. The results do not look right to me (but 
I may be misunderstanding) - but want to check with you: 

select * from mariadb_catalog.menagerie.d ;

+++-+-+

| op |                             ip |        type |         age |

+++-+-+

| +I |                    10.10.10.10 |           1 |          30 |

| +I |                    10.10.10.10 |           2 |          40 |

| +I |                    10.10.10.10 |           2 |          50 |

| +I |                    10.10.10.10 |           3 |          50 |

+++-+-+

Received a total of 4 rows

 

Flink SQL> select * from e; 

+++-+-+

| op |                             ip |         age |                proctime |

+++-+-+

| +I |                    10.10.10.10 |          40 | 2023-11-15 16:12:57.553 |

| +I |                    10.10.10.10 |          50 | 2023-11-15 16:12:57.554 |

 

Flink SQL> SELECT * FROM e left join mariadb_catalog.menagerie.d FOR 
SYSTEM_TIME AS OF e.proctime on d.type = 2 and d.age = 50 and e.ip = d.ip;

+++-+-++-+-+

| op |                             ip |         age |                proctime | 
                           ip0 |        type |        age0 |

+++-+-++-+-+

| +I |                    10.10.10.10 |          40 | 2023-11-15 16:08:40.973 | 
                   10.10.10.10 |           2 |          50 |

| +I |                    10.10.10.10 |          50 | 2023-11-15 16:08:40.974 | 
                   10.10.10.10 |           2 |          50 |

 

Is this what you would expect? 

 

Also I get

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF 
e.proctime on  e.age = d.age;

+++-+-++-+-+

| op |                             ip |         age |                proctime | 
                           ip0 |        type |        age0 |

+++-+-++-+-+

^CQuery terminated, received a total of 0 row

 

and

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF 
e.proctime on  e.age = d.age and d.ip = e.ip;

+++-+-++-+-+

| op |                             ip |         age |                proctime | 
                           ip0 |        type |        age0 |

+++-+-++-+-+

^CQuery terminated, received a total of 0 row

 

 

 

 

 

 

 

 

 

 

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> 

[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-15 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786355#comment-17786355
 ] 

david radley commented on FLINK-33365:
--

[~libenchao] Thanks for the quick feedback . I agree, I was thinking the second 
point you make will make it more in line with the scan logic, where it appends 
to the select statement with the where clauses. I was not aware of the 
_PreparedStatement_ thanks for pointing me to it :)  

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-14 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786165#comment-17786165
 ] 

Benchao Li commented on FLINK-33365:


[~davidradl] I take a quick glance at the PR, it looks generally good, two 
cents from my point of view:
* We'd better to use `PreparedStatement.setXXX` to pass the literals into it, 
this will make us free from different SQL dialects behaviors about how to 
serialize literals into SQL
* It may be better to append the extra conditions in 
`JdbcRowDataLookupFunction` itself, instead of touching `JdbcDialect` which is 
a public API

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-14 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785967#comment-17785967
 ] 

david radley commented on FLINK-33365:
--

[~qingwei91] I have had a play and have some code that is working. I have put 
it in a draft pr [https://github.com/apache/flink-connector-jdbc/pull/79.]

Could you feedback on it please?

 

My thinking behind the change:

The issue here is that the filters are not honoured in the loop up join when 
there are push down predicates.

The lookup processing has access to the lookup keys as well as the predicate 
pushdowns, the predicate push down information is parameterized. 

The existing getSelectStatement takes as it last parameter conditions, it takes 
the lookupkeys and creates an equality conditions in AbstractDialect like this 

 
_String fieldExpressions =_
_Arrays.stream(conditionFields)_
_.map(f -> format("%s = :%s", quoteIdentifier(f), f))_
_.collect(Collectors.joining(" AND "));_
 

This is not appropriate for a condition like a.type = 0 containing a literal 
and potentially not being an equals condition.

The predicate pushdown is currently only is implemented  for simple 
expressions. 

The idea of the change is to resolve the conditions completely in the lookup 
code and pass an array of Strings to a new overloaded implementation of 
getSelectStatement, so we can add the appropriate conditions as a new 
parameter. Note that the conditions for non-predicate pushdown would end up as 
filters in the existing select conditions.  

 

I do not think the pr is ready yet, but I would love to hear your feedback. I 
intend to: 
 * to do more testing 
 ** multiple conditions including non equalities
 ** multiple lookup keys
 * ideally add test cases along the lines that have been suggested by [~Sergey 
Nuyanzin] 
 * look into whether I can use existing Flink classes to resolve the condition 
insert (I have used a replace); I see there is a 
ResolvedExpression class I might be able to use.
 * Any ideas on other tests I could try to break this?
 * We could do the resolution of the String inserts in the getSelectStatement - 
but this would involved passing both the predicate and the parameter in rather 
than resolving in the caller.  I think resolving it in the caller makes for a 
simpler call.  

 

 

 

 

 

 

 

 

 

 

I do not think this is ready a pr yet as I would like to do more testing and 
hopefully add tests.     

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with 

[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-14 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785849#comment-17785849
 ] 

Benchao Li commented on FLINK-33365:


[~davidradl] "lookup table source" is a special form of "table source", so in 
theory it should/could handle all the pushed down predicate/limit/partitions, 
this is also how we abstracted it in Flink now. Hence to me, fixing it in JDBC 
Connector is a better choice for now.

Regarding the #2, I don't see a real need that one connector that supports 
pushdown for "scan table source", but not for "lookup table source" yet, if 
there is a real need for this in the future, we can revisit and add 
fine-grained pushdown abilities for "scan table source" and "lookup table 
source". WDYT?

Feel free to ping me if your PR is ready, I would be glad to help review and 
merge it:)

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-14 Thread Lim Qing Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785826#comment-17785826
 ] 

Lim Qing Wei commented on FLINK-33365:
--

I am not familiar with Calcite, not able to comment on how it should interact 
with the pushdown predicate. I do agree 2nd approach is probably a smaller 
change and possibly safer.

I originally implemented FilterPushdown in JDBC because that's where the query 
is being generated and it is performing LimitPushdown, now with this bug I am 
not sure if that's the right way.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-14 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785813#comment-17785813
 ] 

david radley commented on FLINK-33365:
--

[~qingwei91] thank you very much for you support :) . One question I have about 
the design intent.

1) Would it be true to say that the Calcite rule's should dictate the general 
shape of the query, I can see that there are rules around predicate pushdown 
and lookups. Then the JDBC dialect would be minimal type mapping of the graph. 

2) The solution we are considering, because the predicate pushdown logic was 
done in the JDBC code, we are fixing up the query in the JDBC code to not lose 
the conditions.

 

Would it be true to say the 2nd approach is more tactical and short term as 
this is a blocker and the 1st approach would be more ideal? Or am I 
misunderstanding?

 

Either way I will look into option 2), as a change of the rules is much more 
impactful outside of JDBC. 

 

 

       

  

 

   

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-13 Thread Lim Qing Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785672#comment-17785672
 ] 

Lim Qing Wei commented on FLINK-33365:
--

Hi [~davidradl] , 

if you can work on this that's better, I agree that we need to change the 
LookupFunction to incorporate the information about pushdown.

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-13 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785563#comment-17785563
 ] 

david radley commented on FLINK-33365:
--

[~Sergey Nuyanzin] [~qingwei91] thanks for you input. The 2 circumventions do 
not work for me on master, on the reported case. 

It sounds like the introduction of pushdown predicate capability for jdbc, 
broke this case.  At the moment the 
JdbcRowDataLookupFunction just adds the keyNames to the conditions 
 
_this.query =_
_options.getDialect()_
_.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);_
 
 
It seems to me that we need to include the filter information into the context 
when we do the pushdown logic, so we can add this as a condition to the last 
parameter. I was hoping to contribute this as a fix, as I have been assigned 
the issue;  [~qingwei91] is this still possible- or do you plan to fix this? 
 
 
 
 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-13 Thread Lim Qing Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785470#comment-17785470
 ] 

Lim Qing Wei commented on FLINK-33365:
--

[~Sergey Nuyanzin] , I will look into this

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785355#comment-17785355
 ] 

Benchao Li commented on FLINK-33365:


[~Sergey Nuyanzin] Thanks for the digging. It's indeed a problem from the 
current JDBC filter pushdown implementation, one way to mitigate this problem 
might be also considering the predicates in {{JdbcRowDataLookupFunction}}.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785321#comment-17785321
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

looks like the issue is incomplete implementation of filter push down 
FLINK-16024

TC to reproduce
add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void MyTest() {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.of(1L, "Alice"),
Row.of(1L, "Alice"),
Row.of(2L, "Bob"),
Row.of(3L, "Charlie")));
util.tableEnv().executeSql(
String.format(
"CREATE TABLE value_source (\n"
+ "`id` BIGINT,\n"
+ "`name` STRING,\n"
+ "`proctime` AS PROCTIME()\n"
+ ") WITH (\n"
+ "'connector' = 'values', \n"
+ "'data-id' = '%s')",
dataId));
util.verifyExecPlan(
"SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM 
value_source"
+ " AS S JOIN jdbc for system_time as of S.proctime AS 
D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and check before and after commit https://github.com/apache/flink/pull/20140 it 
shows different results.


At the same time there is a bunch of WA like add something meaningless or cast 
or some other function use, 
e.g. slightly changed query  started behave as expected since math operations, 
cast and other functions are not going to push down (yet)
however in Flink 1.18 math operations could be simplified by newer Calcite, so 
need to check case by case
like 
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}

or with casting like
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source
 AS S JOIN jdbc for system_time as of S.proctime AS D 
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
[~macdoor615] that might help you with your queries

[~qingwei91] could you have a look here please since you are  aware of current 
implementation


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- 

[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-10 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785031#comment-17785031
 ] 

david radley commented on FLINK-33365:
--

I am not sure about the design intent in this area. Should the pushed down 
predicate be in the lookup keys as the java doc for 
([https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala)]

Indicates. Or should it just be honoured as a filter. If it is honoured a 
filter,  then does the dialect need to do anything special for the pushdown 
predicates, like it does for the scan case.

 

I think the lookup processing should honour the pushed down predicates and they 
should not be in the lookup keys (unless the source does not support predicate 
push down). There is no current way I can see for the dialect to add in these 
predicates, which has been optimized in the Calcite rules processing.

So I think a change to the lookup processing (probably in 
[CommonPhysicalLookupJoin|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala)])
  to honour the table source resolvedPredicates and pushed down params, is 
required.

WDYT?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-10 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784924#comment-17784924
 ] 

david radley commented on FLINK-33365:
--

Testing against MariaDB I see:

select  CAST(0 AS INT) as type  from c;

+--+

| type |

+--+

|    0 |

|    0 |

+--+

 

It looks like this change from a filter to a cast is not logically equivalent. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-10 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784915#comment-17784915
 ] 

david radley commented on FLINK-33365:
--

an update on what I have found:

 

I have switched on DEBUG put out the rules that are being driven for my 
recreation. I see :

org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - 
optimize time_indicator cost 1 ms.

optimize result:

FlinkLogicalSink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, 
type])

+- FlinkLogicalCalc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, 
ip0, type])

   +- FlinkLogicalJoin(condition=[=($0, $4)], joinType=[left])

      :- FlinkLogicalCalc(select=[ip, PROCTIME() AS proctime])

      :  +- FlinkLogicalTableSourceScan(table=[[paimon_catalog, default, a]], 
fields=[ip])

      +- FlinkLogicalSnapshot(period=[$cor0.proctime])

         +- FlinkLogicalCalc(select=[ip, CAST(0 AS INTEGER) AS type, CAST(ip AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])

            +- FlinkLogicalTableSourceScan(table=[[mariadb_catalog, menagerie, 
c, {*}filter=[=(type, 0)]]]{*}, fields=[ip, type])

 

Is changed in the next rule to  

org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - 
optimize physical cost 3 ms.

optimize result:

Sink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])

+- Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])

   +- LookupJoin(table=[mariadb_catalog.menagerie.c], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, *CAST(0 AS INTEGER)* AS type, CAST(ip 
AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])

      +- Calc(select=[ip, PROCTIME() AS proctime])

         +- TableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])

 

The *CAST(0 AS INTEGER)* is in the final Optimized Execution Plan we see in the 
explain.

 

I am not an expert at this, but it seems to me that either 2 things are 
happening:

1) This change to the graph is a valid optimization but it is not being 
actioned properly when executed, such that the CAST(0 AS INTEGER) is ignored ** 

or

2) the comments at the top of 
[CommonPhysicalLookupJoin.scala]([https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala)]
 are correct and this filter should actually be in the lookup keys. The 
comments says

 
_* For a lookup join query:_
_*_
_*  SELECT T.id, T.content, D.age FROM T JOIN userTable FOR SYSTEM_TIME AS 
OF T.proctime AS D_
_* ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id WHERE 
D.name LIKE 'Jack%'_
_* _
_*_
_* The LookupJoin physical node encapsulates the following RelNode tree:_
_*_
_*  Join (l.name = r.name) / \ RelNode Calc (concat(name, "!") as name, 
name LIKE 'Jack%') \|_
_* DimTable (lookup-keys: age=11, id=l.id) (age, id, name) _
_*_
_* The important member fields in LookupJoin:  allLookupKeys: [$0=11, 
$1=l.id] ($0 and $1 is_
_* the indexes of age and id in dim table) remainingCondition: 
l.name=r.name _
_*_
_* The workflow of lookup join:_
_*_
_* 1) lookup records dimension table using the lookup-keys  2) project & 
filter on the lookup-ed_
_* records  3) join left input record and lookup-ed records  4) only 
outputs the rows which_
_* match to the remainingCondition _
 
 
 

 

 

 

 

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- 

[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-03 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782488#comment-17782488
 ] 

david radley commented on FLINK-33365:
--

I did not create using the back level jdbc driver. Though I am on a Mac, using 
MariaDB and Flink master; so that could be sufficiently different for it not to 
recreate for me . I am focussing on the issue I can recreate. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-02 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782128#comment-17782128
 ] 

david radley commented on FLINK-33365:
--

[~macdoor615] thanks. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-02 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782119#comment-17782119
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] you need 

*flink-shaded-guava-30.1.1-jre-16.1.jar* 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781985#comment-17781985
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] I tested with flink 1.17.1 & 1.18.0. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781843#comment-17781843
 ] 

david radley commented on FLINK-33365:
--

[~macdoor615] I switched out to use the 3.10 jar but hot some compatibility 
issues. I assume you are testing that with Flink 3.16? Could you include the 
output of the explain when the select is working please?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781829#comment-17781829
 ] 

david radley commented on FLINK-33365:
--

I have managed to recreate your paimon scenario using the latest drivers and 
MariaDB on the Mac. I will look into this issue - as this is the only one I can 
recreate. 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781804#comment-17781804
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] I adopted released version in my environment, and I followed Flink 
docuement 
([https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/])
 to get JDBC SQL Connector. 

Let's test with release version.

[https://repo.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.0.0-1.16/flink-connector-jdbc-3.0.0-1.16.jar|https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.0.0-1.16/flink-connector-jdbc-3.0.0-1.16.jar]

[https://repo.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar|https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar]

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781757#comment-17781757
 ] 

david radley commented on FLINK-33365:
--

I did a further test- I rebuild 3.1.1 (in case there was something fixing it in 
main) with the MariaDB change and it did not recreate the issue.  

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-01 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781712#comment-17781712
 ] 

david radley commented on FLINK-33365:
--

 

[~macdoor615] I am running on a Mac and have Mariadb (I tweaked the dialect 
factory to allow mariadb in the jdbc url - see  
https://issues.apache.org/jira/browse/FLINK-28433 ) and the latest Flink jdbc 
connector (master 3.2-SNAPSHOT that I built from source) and I cannot recreate 
the problem you are seeing (I see the filter as expected in the explain).

 

Can you try: 
 * the latest Flink JDBC connector 3.2-SNAPSHOT built from source
 * the latest mySQL JDBC driver.  I used the platform independent 
[https://dev.mysql.com/downloads/connector/j/]  . This seems to be recommended 
[https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates]([https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates).]
 I have raised issue https://issues.apache.org/jira/browse/FLINK-33384 to 
change to docs and code that reference the old deprecated JDBC driver. 

I hope these suggestions resoilve this for you; if not I can look to setup a 
Linux image with mySQL; which JDBC driver do you see the issue on?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-31 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781275#comment-17781275
 ] 

david radley commented on FLINK-33365:
--

[~macdoor615] I am going to dig into this later this week.   

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780456#comment-17780456
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] I create another case

1. create a paimon table and insert one line of data

 
{code:java}
create table svc1_paimon_prod.dxx.a( ip string, proctime as proctime() );
insert into  svc1_paimon_prod.dxx.a values ('10.10.10.10'); {code}
2. create a mysql table and insert two lines of data

 

 
{code:java}
create table gem_tmp.b (ip varchar(20), type int);
insert into gem_tmp.b values ('10.10.10.10', 0);
insert into gem_tmp.b values ('10.10.10.10', 1); {code}
3. execute sql in sql-client.sh with flink-connector-jdbc-3.0.0-1.16.jar and. 
flink-connector-jdbc-3.1.1-1.17.jar 

 

 
{code:java}
SELECT * FROM svc1_paimon_prod.dxx.a left join bnpmp_mysql_test.gem_tmp.b FOR 
SYSTEM_TIME AS OF a.proctime on b.type = 0 and a.ip = b.ip; {code}
4. screenshot flink-connector-jdbc-3.0.0-1.16.jar, I get one line output ,this 
is correct

5. screenshot flink-connector-jdbc-3.1.1-1.17.jar, I get two lines output, this 
is incorrect with wrong execution plan

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780427#comment-17780427
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] I'm not sure if your test case represents my situation. What I see 
is the exact same sql, using different versions of Flink JDBC connectors, 
getting different execution plans. Can you reproduce my situation?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780403#comment-17780403
 ] 

david radley commented on FLINK-33365:
--

[~macdoor615] I notice that I can run 
JdbcDynamicTableSourceITCase method , testLookupJoin in my Intellij workspace, 
and then choose mySQL as the implmentation.
As is this runs correctly.
 
I then change the select to something like your failing case:
 
_List collected =_
_executeQuery(_
_"SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source"_
_+ " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id 
AND S.id = 2");_
 
and rerun the test, it fails with error:
org.apache.flink.table.api.TableException: Temporal table join requires an 
equality condition on fields of table 
[default_catalog.default_database.jdbc_lookup].
 
There are some hits in google indicating that this is a restriction for 
temporal table joins. This Exception comes from a validate 
method in commonExecLookupJoin.java. 
 
Do you think this test is representative of your case? If not we can try other 
selects here, I have tried a few variants.   
 
I am going to see if this happens at the back level version of the JDBC 
connector.
 
 
 
 
 
 
 
 
 
 
 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780276#comment-17780276
 ] 

macdoor615 commented on FLINK-33365:


Maybe it is caused by jdbc connector.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780274#comment-17780274
 ] 

macdoor615 commented on FLINK-33365:


[~davidradl] yes. I found that filter “on b.type = 0” has been ignored. When I 
am using mySQL jdbc as the lookup source connector

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780272#comment-17780272
 ] 

david radley commented on FLINK-33365:
--

found the discussion in the mailing list with an example. We have recreated 
this situation with a getindata (lookup) connector. So I am thinking this is 
not just a mySQL issue.    

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780259#comment-17780259
 ] 

david radley commented on FLINK-33365:
--

Or are the filters you are talking about part of the ON clause?

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-27 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780255#comment-17780255
 ] 

david radley commented on FLINK-33365:
--

Hi,

It would be really helpful if you could supply a test case (the tables and SQL 
an expected results) that shows this issue. 

 

I assume if we follow the example in 
[[https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/lookup-join/](https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/lookup-join/)]
 then you are using mySQL as the lookup source connector and you specify a 
WHERE clause that is being ignored. Am I understanding correctly?

 

 

 

 

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)