[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365856#comment-17365856 ]

Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:33 AM:
---------------------------------------------------------------

It's OK to implement LookupFunction like JdbcRowDataLookupFunction/JdbcDynamicTableSource, because LookupFunction accepts eval(Object... keys).

AsyncLookupFunction does not accept eval(Object... keys), so we have to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we don't know the user schema (in a lookup DB like Redis/HBase), which means we don't know how to implement the eval function.*

However, there is a workaround: just change
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE cast(age as bigint) = 30
{code}
so the optimizer won't push down the filter.

[~qingru zhang]

was (Author: gsavl):
It's OK to implement LookupFunction like JdbcDynamicTableSource, because LookupFunction accepts eval(Object... keys).

AsyncLookupFunction does not accept eval(Object... keys), so we have to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String): Unit
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String): Unit
{code}
*But we don't know the user schema (in a lookup DB like Redis/HBase), which means we don't know how to implement the eval function.*

However, there is a workaround: just change
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE cast(age as bigint) = 30
{code}
so the optimizer won't push down the filter.

[~qingru zhang]

> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
>                 Key: FLINK-22955
>                 URL: https://issues.apache.org/jira/browse/FLINK-22955
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.13.1, 1.12.4
>         Environment: Flink 1.13.1
> how to reproduce: patch file attached
>            Reporter: Cooper Luan
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.5, 1.13.2
>
>         Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> A SQL statement like this may result in a lookup function signature mismatch exception when explaining the SQL:
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> The lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }{code}
> The exec plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> The "lookup=[age=10, id=a]" results in the signature mismatch.
>
> But if I add one more insert, it works well:
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> The exec plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> The LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong).
>
> So, in the "multi insert" case, the planner works great;
> in the "single insert" case, the planner throws an exception.
[jira] [Comment Edited] (FLINK-22955) lookup join filter push down result to mismatch function signature

[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365382#comment-17365382 ]

Cooper Luan edited comment on FLINK-22955 at 6/18/21, 9:16 AM:
---------------------------------------------------------------

Actually, it's not possible for us to implement more eval methods (it's not once-and-for-all work).

Is there a rule in PHYSICAL_OPT_RULES I can disable (as a workaround), so the filter won't be pushed down to the lookup table?

[~qingru zhang]

was (Author: gsavl):
Actually, it's not possible for us to implement more eval methods.

Is there a rule in PHYSICAL_OPT_RULES I can disable (as a workaround), so the filter won't be pushed down to the lookup table?

[~qingru zhang]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
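The signature mismatch discussed in this thread can be illustrated without Flink. The sketch below is only an analogy built on plain java.lang.reflect: the class names, the hasEval helper, and the Integer key type for "age" are all hypothetical, and this is not how the Flink planner actually resolves eval methods. It shows why a function with one fixed key stops matching once a pushed-down constant adds a second lookup key (as in lookup=[age=10, id=a]), while a varargs eval(Object... keys) matches any key count.

```java
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class EvalArityDemo {

    /** Hypothetical stand-in for an async lookup function with one fixed key (not a Flink class). */
    public static class SingleKeyAsyncFunction {
        public void eval(CompletableFuture<Collection<Object>> resultFuture, Integer a) {
            resultFuture.complete(Collections.emptyList());
        }
    }

    /** Hypothetical stand-in for a sync lookup function that takes any number of keys. */
    public static class VarArgsFunction {
        public void eval(Object... keys) {
            // A real function would query the external system with these keys.
        }
    }

    /** Returns true if fn has a public eval method that can be called with the given argument types. */
    public static boolean hasEval(Class<?> fn, Class<?>... argTypes) {
        for (Method m : fn.getMethods()) {
            if (m.getName().equals("eval") && accepts(m, argTypes)) {
                return true;
            }
        }
        return false;
    }

    private static boolean accepts(Method m, Class<?>[] argTypes) {
        Class<?>[] params = m.getParameterTypes();
        // For varargs methods the last parameter is an array that absorbs the remaining arguments.
        int fixed = m.isVarArgs() ? params.length - 1 : params.length;
        boolean arityOk = m.isVarArgs() ? argTypes.length >= fixed : argTypes.length == fixed;
        if (!arityOk) {
            return false;
        }
        for (int i = 0; i < argTypes.length; i++) {
            Class<?> expected = i < fixed ? params[i] : params[params.length - 1].getComponentType();
            if (!expected.isAssignableFrom(argTypes[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // lookup=[id=a]: one key, which the fixed signature can serve.
        System.out.println("async, 1 key:    "
                + hasEval(SingleKeyAsyncFunction.class, CompletableFuture.class, Integer.class));
        // lookup=[age=10, id=a]: two keys, and no matching eval exists.
        System.out.println("async, 2 keys:   "
                + hasEval(SingleKeyAsyncFunction.class, CompletableFuture.class, Integer.class, Integer.class));
        // The varargs signature matches regardless of how many keys are requested.
        System.out.println("varargs, 2 keys: "
                + hasEval(VarArgsFunction.class, Integer.class, Integer.class));
    }
}
```

Under these assumptions, a connector that cannot know the user's schema would have to declare one fixed-arity eval per possible key combination, which is the objection raised in the comments above.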
[jira] [Updated] (FLINK-22955) lookup join filter push down result to mismatch function signature

[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cooper Luan updated FLINK-22955:
--------------------------------
    Fix Version/s: 1.13.2
[jira] [Updated] (FLINK-22955) lookup join filter push down result to mismatch function signature
[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cooper Luan updated FLINK-22955:
    Fix Version/s: 1.12.5, 1.11.4

> lookup join filter push down result to mismatch function signature
> --
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
> Reporter: Cooper Luan
> Priority: Critical
> Fix For: 1.11.4, 1.12.5
>
> Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
[jira] [Created] (FLINK-22955) lookup join filter push down result to mismatch function signature
Cooper Luan created FLINK-22955:
---
Summary: lookup join filter push down result to mismatch function signature
Key: FLINK-22955
URL: https://issues.apache.org/jira/browse/FLINK-22955
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.4, 1.13.1, 1.11.3
Environment: Flink 1.13.1
how to reproduce: patch file attached
Reporter: Cooper Luan
Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch

A SQL query like the following can fail with a lookup function signature mismatch exception when the SQL is explained:
{code:sql}
CREATE TEMPORARY VIEW v_vvv AS
SELECT * FROM MyTable AS T
JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id;

SELECT a,b,id,name
FROM v_vvv
WHERE age = 10;{code}
The lookup function is:
{code:scala}
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}{code}
The execution plan is:
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Calc(select=[a, b])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
The "lookup=[age=10, id=a]" entry causes the signature mismatch: the pushed-down filter adds age as a second lookup key, but eval accepts only one key.

However, if I add one more insert, it works:
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
The execution plan is:
{code:java}
== Optimized Execution Plan ==
LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 10)])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 30)])
   +- Reused(reference_id=[1])
{code}
Here the LookupJoin node uses "lookup=[id=a]" (correct), not "lookup=[age=10, id=a]" (wrong).

So in the "multi insert" case the planner works, while in the "single insert" case it throws an exception.
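
The mismatch reported above can be illustrated without Flink. The following is a minimal sketch, assuming the planner looks for an eval method whose key parameters match the lookup keys in number and type. SingleKeyAsyncLookup and EvalSignatureCheck are hypothetical names, Integer is an assumed type for age, and the reflection-based check is only a stand-in, not Flink's actual validation code.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for AsyncTableFunction1: eval accepts exactly one lookup key. */
class SingleKeyAsyncLookup {
    public void eval(CompletableFuture<Collection<Object>> resultFuture, Integer a) {
        // A real lookup would query the external table; here we return no rows.
        resultFuture.complete(Collections.emptyList());
    }
}

/** Illustrative signature check (an assumption, NOT Flink's implementation). */
class EvalSignatureCheck {
    /** Returns true if target declares a public eval(CompletableFuture, keyTypes...). */
    static boolean hasMatchingEval(Class<?> target, Class<?>... keyTypes) {
        Class<?>[] params = new Class<?>[keyTypes.length + 1];
        params[0] = CompletableFuture.class; // the result future always comes first
        System.arraycopy(keyTypes, 0, params, 1, keyTypes.length);
        try {
            target.getMethod("eval", params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // lookup=[id=a]: one key, matches eval(future, Integer)
        System.out.println(hasMatchingEval(SingleKeyAsyncLookup.class, Integer.class));
        // lookup=[age=10, id=a]: two keys, no such eval is declared
        System.out.println(hasMatchingEval(SingleKeyAsyncLookup.class, Integer.class, Integer.class));
    }
}
```

Under these assumptions the one-key lookup resolves and the two-key lookup does not, which mirrors the difference between the "multi insert" and "single insert" plans.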
[jira] [Commented] (FLINK-14055) Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"
[ https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17298060#comment-17298060 ]

Cooper Luan commented on FLINK-14055:
-
[~hpeter] Is there a FLIP design doc now? We're looking forward to this feature.

> Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"
> 
>
> Key: FLINK-14055
> URL: https://issues.apache.org/jira/browse/FLINK-14055
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Bowen Li
> Assignee: Zhenqiu Huang
> Priority: Major
> Labels: sprint
> Fix For: 1.13.0
>
>
> As FLINK-7151 adds basic function DDL to Flink, this ticket is to support
> dynamically loading functions from external sources in function DDL with
> advanced syntax like
>
> {code:java}
> CREATE FUNCTION func_name as class_name USING JAR/FILE/ARCHIVE 'xxx' [,
> JAR/FILE/ARCHIVE 'yyy'] ;
> {code}