[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-19 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365856#comment-17365856
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:33 AM:
---

It is fine to implement a LookupFunction the way 
JdbcRowDataLookupFunction/JdbcDynamicTableSource does, because a 
LookupFunction can declare eval(Object... keys) and so accepts any number of 
lookup keys.

An AsyncLookupFunction does not accept eval(Object... keys), so we would have 
to implement one overload for every possible key combination:
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we do not know the user's schema (in a lookup store such as Redis/HBase), 
which means we do not know which eval overloads to implement.*
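
To make the arity problem concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink and is not the planner's actual resolution logic; the class and method names (EvalArityDemo, OneKeyAsync, VarArgsSync, hasExactEval, hasVarArgsEval) are hypothetical and exist only for illustration. It shows that a fixed one-key eval cannot be found when two keys are requested, while a varargs eval fits any key count.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class EvalArityDemo {

    /** Hypothetical async-style function that declares exactly one lookup key. */
    public static class OneKeyAsync {
        public void eval(CompletableFuture<Collection<String>> resultFuture, Integer a) {
            // no-op: only the signature matters for this demo
        }
    }

    /** Hypothetical sync-style function that accepts any number of lookup keys. */
    public static class VarArgsSync {
        public void eval(Object... keys) {
            // no-op: only the signature matters for this demo
        }
    }

    /** True if clazz has a public eval method whose parameter types equal the given ones. */
    public static boolean hasExactEval(Class<?> clazz, Class<?>... parameterTypes) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval")
                    && Arrays.equals(m.getParameterTypes(), parameterTypes)) {
                return true;
            }
        }
        return false;
    }

    /** True if clazz has a public varargs eval method, which fits any key count. */
    public static boolean hasVarArgsEval(Class<?> clazz) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval") && m.isVarArgs()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Corresponds to lookup=[id=a]: one key, the declared eval is found.
        System.out.println(hasExactEval(
                OneKeyAsync.class, CompletableFuture.class, Integer.class));
        // Corresponds to lookup=[age=10, id=a]: two keys, no matching eval exists.
        System.out.println(hasExactEval(
                OneKeyAsync.class, CompletableFuture.class, Integer.class, Integer.class));
        // A varargs eval does not depend on the number of keys.
        System.out.println(hasVarArgsEval(VarArgsSync.class));
    }
}
```

With a fixed-arity eval, every extra constant key the optimizer adds needs its own overload, which is why the set of overloads cannot be written down without knowing the user's schema.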

 

However, there is a workaround. Just change
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE cast(age as bigint) = 30
{code}
so that the optimizer does not push the filter down into the lookup keys.
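
One way to picture why the cast helps is the toy model below. It is only an assumption about the shape of the rule, not Flink's planner code: it treats a predicate as a constant lookup key only when it has the bare form "column = literal", so a column wrapped in CAST no longer qualifies. The class name ConstantKeyDemo, the method constantLookupKeys, and the regex are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConstantKeyDemo {

    // Matches only a bare column compared with an integer literal, e.g. "age = 30".
    private static final Pattern COLUMN_EQUALS_LITERAL =
            Pattern.compile("^\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*=\\s*(\\d+)\\s*$");

    /** Returns "column=literal" entries for predicates this toy rule would turn into lookup keys. */
    public static List<String> constantLookupKeys(List<String> predicates) {
        List<String> keys = new ArrayList<>();
        for (String predicate : predicates) {
            Matcher m = COLUMN_EQUALS_LITERAL.matcher(predicate);
            if (m.matches()) {
                keys.add(m.group(1) + "=" + m.group(2));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Bare column: becomes an extra constant lookup key.
        System.out.println(constantLookupKeys(Arrays.asList("age = 30")));
        // Column wrapped in a cast: not recognized, so it stays an ordinary filter.
        System.out.println(constantLookupKeys(Arrays.asList("cast(age as bigint) = 30")));
    }
}
```

In this model the first call yields one key and the second yields none, which mirrors the plans reported in this issue: lookup=[age=10, id=a] with the bare predicate, and only the join key once the predicate is no longer a plain column comparison.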

 

[~qingru zhang] 



> lookup join filter push down result to mismatch function signature
> --
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
> Reporter: Cooper Luan
> Priority: Critical
> Fix For: 1.11.4, 1.12.5, 1.13.2
>
> Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> A SQL statement like this may result in a lookup function signature mismatch 
> exception when explaining the SQL:
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;
> {code}
> The lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }
> {code}
> The execution plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> The "lookup=[age=10, id=a]" results in the function signature mismatch: the 
> lookup now has two keys, while eval declares only one.
>  
> But if I add one more insert, it works well:
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> The execution plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> The LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" 
> (wrong).
>  
> So, in the "multi insert" case, the planner works fine;
> in the "single insert" case, the planner throws an exception.

[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-19 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365856#comment-17365856
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:31 AM:
---

It is fine to implement a LookupFunction the way JdbcDynamicTableSource does, 
because a LookupFunction can declare eval(Object... keys).

An AsyncLookupFunction does not accept eval(Object... keys), so we would have 
to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we do not know the user's schema (in a lookup store such as Redis/HBase), 
which means we do not know which eval overloads to implement.*

 

However, there is a workaround. Just change
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE cast(age as bigint) = 30
{code}
so that the optimizer does not push the filter down into the lookup keys.

 

[~qingru zhang] 



[jira] [Commented] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-19 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365856#comment-17365856
 ] 

Cooper Luan commented on FLINK-22955:
-

It is fine to implement a LookupFunction the way JdbcDynamicTableSource does, 
because a LookupFunction can declare eval(Object... keys).

For an AsyncLookupFunction we would have to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we do not know the schema (in a lookup store such as Redis/HBase), which 
means we do not know which eval overloads to implement.*

 

However, there is a workaround. Just change
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE cast(age as bigint) = 30
{code}
so that the optimizer does not push the filter down into the lookup keys.

 

[~qingru zhang] 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-19 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365856#comment-17365856
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:30 AM:
---

It is fine to implement a LookupFunction the way JdbcDynamicTableSource does, 
because a LookupFunction can declare eval(Object... keys).

For an AsyncLookupFunction we would have to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we do not know the user's schema (in a lookup store such as Redis/HBase), 
which means we do not know which eval overloads to implement.*

 

However, there is a workaround. Just change
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE cast(age as bigint) = 30
{code}
so that the optimizer does not push the filter down into the lookup keys.

 

[~qingru zhang] 



[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-18 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365382#comment-17365382
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/18/21, 9:16 AM:
---

Actually, it is not possible for us to implement more eval methods (it is not 
a once-and-for-all job).

Is there a rule in PHYSICAL_OPT_RULES that I can disable (as a workaround), so 
that the filter is not pushed down to the lookup table?

[~qingru zhang]




[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-18 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365382#comment-17365382
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/18/21, 9:10 AM:
---

Actually, it is not possible for us to implement more eval methods.

Is there a rule in PHYSICAL_OPT_RULES that I can disable (as a workaround), so 
that the filter is not pushed down to the lookup table?

[~qingru zhang]




[jira] [Commented] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-18 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365382#comment-17365382
 ] 

Cooper Luan commented on FLINK-22955:
-

Actually, it is not possible for us to implement more eval methods.

Is there a rule in PHYSICAL_OPT_RULES that I can disable, so that the filter 
is not pushed down to the lookup table?



[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-18 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365382#comment-17365382
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/18/21, 9:05 AM:
---

Actually, it is not possible for us to implement more eval methods.

Is there a rule in PHYSICAL_OPT_RULES that I can disable, so that the filter 
is not pushed down to the lookup table?

[~qingru zhang]




[jira] [Updated] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-09 Thread Cooper Luan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cooper Luan updated FLINK-22955:

Fix Version/s: 1.13.2

> lookup join filter push down result to mismatch function signature
> --
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
>Reporter: Cooper Luan
>Priority: Critical
> Fix For: 1.11.4, 1.12.5, 1.13.2
>
> Attachments: 
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch


[jira] [Updated] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-09 Thread Cooper Luan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cooper Luan updated FLINK-22955:

Fix Version/s: 1.12.5
   1.11.4

> lookup join filter push down result to mismatch function signature
> --
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
>Reporter: Cooper Luan
>Priority: Critical
> Fix For: 1.11.4, 1.12.5
>
> Attachments: 
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch


[jira] [Created] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-09 Thread Cooper Luan (Jira)
Cooper Luan created FLINK-22955:
---

 Summary: lookup join filter push down result to mismatch function 
signature
 Key: FLINK-22955
 URL: https://issues.apache.org/jira/browse/FLINK-22955
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.4, 1.13.1, 1.11.3
 Environment: Flink 1.13.1

how to reproduce: patch file attached
Reporter: Cooper Luan
 Attachments: 
0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch

A SQL query like the following can fail with a lookup function signature 
mismatch exception when calling explain on the SQL:
{code:sql}
CREATE TEMPORARY VIEW v_vvv AS
SELECT * FROM MyTable AS T
JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id;

SELECT a,b,id,name
FROM v_vvv
WHERE age = 10;{code}
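The lookup function is:
{code:scala}
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.AsyncTableFunction
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  // Accepts exactly one lookup key (the join key `id`).
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}{code}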
The execution plan is:
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Calc(select=[a, b])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
The "lookup=[age=10, id=a]" in this plan passes two lookup keys, but eval 
accepts only one, which causes the signature mismatch.
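
To make the mismatch concrete, here is a minimal Scala sketch of why one extra lookup key is enough to break the match. It is a simplified model under stated assumptions, not the Flink planner's actual resolution code: AsyncLookupStandIn and hasEvalFor are hypothetical names, the row type is replaced by String, and the stand-in class deliberately does not extend AsyncTableFunction so that the sketch runs without Flink on the classpath.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}

// Hypothetical stand-in for the issue's AsyncTableFunction1: one eval method
// that takes the result future plus a single Integer lookup key.
class AsyncLookupStandIn {
  def eval(resultFuture: CompletableFuture[JCollection[String]], a: Integer): Unit =
    resultFuture.complete(java.util.Collections.singletonList(s"row-for-id-$a"))
}

object LookupSignatureSketch {
  // Assumed, simplified check: does `fn` expose an eval method whose
  // parameters are a CompletableFuture followed by exactly `keyTypes`?
  def hasEvalFor(fn: AnyRef, keyTypes: Seq[Class[_]]): Boolean = {
    val expected = classOf[CompletableFuture[_]] :: keyTypes.toList
    fn.getClass.getMethods.exists { m =>
      m.getName == "eval" && m.getParameterTypes.toList == expected
    }
  }

  def main(args: Array[String]): Unit = {
    val fn = new AsyncLookupStandIn
    // lookup=[id=a]: one Integer key, matches eval(future, a: Integer).
    println(hasEvalFor(fn, Seq(classOf[Integer])))                   // true
    // lookup=[age=10, id=a]: two Integer keys, no matching eval.
    println(hasEvalFor(fn, Seq(classOf[Integer], classOf[Integer]))) // false
  }
}
```

In this model the pushed-down constant age=10 becomes a second key, so a function written only for the join key no longer has a matching eval.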

 

But if I add one more insert, it works:
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
The execution plan is:
{code:java}
== Optimized Execution Plan ==
LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 10)])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 30)])
   +- Reused(reference_id=[1])
{code}
Here the LookupJoin node uses "lookup=[id=a]" (correct) instead of 
"lookup=[age=10, id=a]" (wrong).

So in the "multi insert" case the planner works as expected, while in the 
"single insert" case it throws an exception.





[jira] [Commented] (FLINK-14055) Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"

2021-03-09 Thread Cooper Luan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17298060#comment-17298060
 ] 

Cooper Luan commented on FLINK-14055:
-

[~hpeter] Is there a FLIP design doc now? We're looking forward to this feature.

> Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"
> 
>
> Key: FLINK-14055
> URL: https://issues.apache.org/jira/browse/FLINK-14055
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: sprint
> Fix For: 1.13.0
>
>
> As FLINK-7151 adds basic function DDL to Flink, this ticket is to support 
> dynamically loading functions from external sources in function DDL with 
> advanced syntax like: 
>  
> {code:java}
> CREATE FUNCTION func_name as class_name USING JAR/FILE/ARCHIVE 'xxx' [, JAR/FILE/ARCHIVE 'yyy'] ;
> {code}


