[jira] [Commented] (FLINK-32608) Improve source reusing with projection push down
[ https://issues.apache.org/jira/browse/FLINK-32608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753066#comment-17753066 ]

Aitozi commented on FLINK-32608:
I think this issue is a duplicate of https://issues.apache.org/jira/browse/FLINK-29088

> Improve source reusing with projection push down
>
> Key: FLINK-32608
> URL: https://issues.apache.org/jira/browse/FLINK-32608
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: dalongliu
> Assignee: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-32699) select typeof(proctime()); throw exception in sql-client
[ https://issues.apache.org/jira/browse/FLINK-32699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750530#comment-17750530 ]

Aitozi commented on FLINK-32699:
Hi [~jackylau], sorry for not noticing this ticket. I have opened another ticket about the same problem, https://issues.apache.org/jira/projects/FLINK/issues/FLINK-32711 — could you also take a look at that one?

> select typeof(proctime()); throw exception in sql-client
>
> Key: FLINK-32699
> URL: https://issues.apache.org/jira/browse/FLINK-32699
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Jacky Lau
> Priority: Major
> Fix For: 1.18.0
>
> {code:java}
> Flink SQL> select typeof(proctime());
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument
> type 'TIMESTAMP_LTZ(3)'
> {code}
[jira] [Commented] (FLINK-32659) DB connection may leak if exception is thrown in JdbcOutputFormat#close
[ https://issues.apache.org/jira/browse/FLINK-32659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749428#comment-17749428 ]

Aitozi commented on FLINK-32659:
Could someone help review the fix: https://github.com/apache/flink-connector-jdbc/pull/71

> DB connection may leak if exception is thrown in JdbcOutputFormat#close
>
> Key: FLINK-32659
> URL: https://issues.apache.org/jira/browse/FLINK-32659
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Reporter: Aitozi
> Priority: Major
[jira] [Commented] (FLINK-32711) Type mismatch when proctime function used as parameter
[ https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749412#comment-17749412 ]

Aitozi commented on FLINK-32711:
I pushed a fix for this; could anyone help review it?

> Type mismatch when proctime function used as parameter
>
> Key: FLINK-32711
> URL: https://issues.apache.org/jira/browse/FLINK-32711
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Aitozi
> Priority: Major
> Labels: pull-request-available
>
> Reproduce case:
> {code:sql}
> SELECT TYPEOF(PROCTIME())
> {code}
> This query fails with:
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument
> type 'TIMESTAMP_LTZ(3)'.
[jira] [Updated] (FLINK-32711) Type mismatch when proctime function used as parameter
[ https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aitozi updated FLINK-32711:
Issue Type: Bug (was: Improvement)
[jira] [Commented] (FLINK-32711) Type mismatch when proctime function used as parameter
[ https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748626#comment-17748626 ]

Aitozi commented on FLINK-32711:
It's caused by the hard-coded nullable result type in the PROCTIME_MATERIALIZE codegen.
[jira] [Created] (FLINK-32711) Type mismatch when proctime function used as parameter
Aitozi created FLINK-32711:

Summary: Type mismatch when proctime function used as parameter
Key: FLINK-32711
URL: https://issues.apache.org/jira/browse/FLINK-32711
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Aitozi

Reproduce case:
{code:sql}
SELECT TYPEOF(PROCTIME())
{code}
This query fails with:
org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument type 'TIMESTAMP_LTZ(3)'.
[jira] [Created] (FLINK-32659) DB connection may leak if exception is thrown in JdbcOutputFormat#close
Aitozi created FLINK-32659:

Summary: DB connection may leak if exception is thrown in JdbcOutputFormat#close
Key: FLINK-32659
URL: https://issues.apache.org/jira/browse/FLINK-32659
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Reporter: Aitozi
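The leak pattern in this ticket can be sketched without a real database. The sketch below is hypothetical and does not reproduce the actual JdbcOutputFormat code — the `flush()`/`closed` names are illustrative. It shows why a close() that flushes pending records first must release the connection in a finally block:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the actual JdbcOutputFormat code: the names are
// illustrative. It shows why close() must release the DB connection in a
// finally block even when flushing buffered records throws.
public class CloseLeakDemo {
    static final List<String> closed = new ArrayList<>();

    static void flush() {
        throw new RuntimeException("flush failed"); // simulated SQL error
    }

    // Leaky shape: when flush() throws, the connection is never closed.
    static void closeLeaky() {
        flush();
        closed.add("connection");
    }

    // Safe shape: the connection is closed no matter what flush() does.
    static void closeSafe() {
        try {
            flush();
        } finally {
            closed.add("connection");
        }
    }

    public static void main(String[] args) {
        try { closeLeaky(); } catch (RuntimeException ignored) { }
        System.out.println("after leaky close: " + closed); // connection still open
        try { closeSafe(); } catch (RuntimeException ignored) { }
        System.out.println("after safe close:  " + closed);
    }
}
```

With real JDBC resources the same shape applies: close the Statement in a try/finally whose finally closes the Connection, so one close failure cannot skip the other.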
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731832#comment-17731832 ]

Aitozi commented on FLINK-32320:
[~libenchao] Thanks for your attention. I made a quick fix on the Calcite side where a new correlationId is created: if the scope and the identifier are the same, reuse the previous correlationId. It works as expected. What do you think of this solution?

> Same correlate can not be reused due to the different correlationId
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Aitozi
> Priority: Major
>
> As described in SubplanReuserTest:
> {code:java}
> @Test
> def testSubplanReuseOnCorrelate(): Unit = {
>   util.addFunction("str_split", new StringSplit())
>   val sqlQuery =
>     """
>       |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
>       |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>     """.stripMargin
>   // TODO the sub-plan of Correlate should be reused,
>   // however the digests of Correlates are different
>   util.verifyExecPlan(sqlQuery)
> }
> {code}
> This will produce the plan:
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
> :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>    +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
>       +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.
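The idea in that comment can be sketched as a tiny id generator. This is a hypothetical illustration only — Calcite's actual correlation-id creation lives in its SqlToRelConverter and does not expose an API shaped like this — but it shows why keying on (scope, identifier) makes the two Correlate digests identical so the subplan reuser can deduplicate them:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed fix, not Calcite's real API: hand out
// the same correlation id for the same (scope, identifier) pair, so two
// identical correlates produce identical digests ($cor0 twice instead of
// $cor0 and $cor1) and the subplan can be reused.
public class CorrelationIdReuse {
    private final Map<String, String> idsByScopeAndIdentifier = new HashMap<>();
    private int nextId = 0;

    public String createCorrel(String scope, String identifier) {
        return idsByScopeAndIdentifier.computeIfAbsent(
                scope + "#" + identifier, key -> "$cor" + nextId++);
    }

    public static void main(String[] args) {
        CorrelationIdReuse gen = new CorrelationIdReuse();
        System.out.println(gen.createCorrel("r", "str_split")); // $cor0
        System.out.println(gen.createCorrel("r", "str_split")); // $cor0 again
        System.out.println(gen.createCorrel("s", "str_split")); // $cor1, different scope
    }
}
```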
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731823#comment-17731823 ]

Aitozi commented on FLINK-32320:
In production, multi-sink jobs are very common. If the table derived from a UDTF is queried multiple times, the function is executed multiple times (as shown in the plan), which leads to bad performance. After some research, I believe the cause is: during the sqlToRel process, the table function SqlNode goes through `toRel` multiple times, which produces different correlationIds.
[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId
Aitozi created FLINK-32320:

Summary: Same correlate can not be reused due to the different correlationId
Key: FLINK-32320
URL: https://issues.apache.org/jira/browse/FLINK-32320
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Aitozi

As described in SubplanReuserTest:
{code:java}
@Test
def testSubplanReuseOnCorrelate(): Unit = {
  util.addFunction("str_split", new StringSplit())
  val sqlQuery =
    """
      |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
      |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
    """.stripMargin
  // TODO the sub-plan of Correlate should be reused,
  // however the digests of Correlates are different
  util.verifyExecPlan(sqlQuery)
}
{code}
This will produce the plan:
{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
The Correlate node can not be reused due to the different correlation id.
[jira] [Commented] (FLINK-31704) Pulsar docs should be pulled from dedicated branch
[ https://issues.apache.org/jira/browse/FLINK-31704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730162#comment-17730162 ]

Aitozi commented on FLINK-31704:
Thanks, I will try it.

> Pulsar docs should be pulled from dedicated branch
>
> Key: FLINK-31704
> URL: https://issues.apache.org/jira/browse/FLINK-31704
> Project: Flink
> Issue Type: Technical Debt
> Components: Connectors / Pulsar, Documentation
> Affects Versions: 1.17.0
> Reporter: Danny Cranmer
> Assignee: Weijie Guo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0, 1.17.1
>
> Pulsar docs are pulled from the {{main}} [branch|https://github.com/apache/flink/blob/release-1.17/docs/setup_docs.sh#L49]. This is dangerous for final versions since we may include features in the docs that are not supported. Update Pulsar to pull from a dedicated branch or tag.
[jira] [Commented] (FLINK-31704) Pulsar docs should be pulled from dedicated branch
[ https://issues.apache.org/jira/browse/FLINK-31704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730133#comment-17730133 ]

Aitozi commented on FLINK-31704:
Hi [~Weijie Guo] [~dannycranmer], I have a CI failure caused by a missing branch: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49737=logs=c5d67f7d-375d-5407-4743-f9d0c4436a81=38411795-40c9-51fa-10b0-bd083cf9f5a5=27
The target 3.0.0-docs does not exist in flink-connector-pulsar. Should the branch name be changed?
[jira] [Commented] (FLINK-32249) A Java string should be used instead of a Calcite NlsString to construct the table and column comment attributes of CatalogTable
[ https://issues.apache.org/jira/browse/FLINK-32249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729175#comment-17729175 ]

Aitozi commented on FLINK-32249:
It does not reproduce in my local SQL client:
{code:java}
Flink SQL> CREATE TABLE s1 (
>   order_id    STRING comment '测试中文',
>   price       DECIMAL(32,2) comment _utf8'测试_utf8中文',
>   currency    STRING,
>   order_time  TIMESTAMP(3)
> ) comment '测试中文table comment' WITH ('connector'='dategen');
[INFO] Execute statement succeed.

Flink SQL> show create table s1;
CREATE TABLE `default_catalog`.`default_database`.`s1` (
  `order_id` VARCHAR(2147483647) COMMENT '测试中文',
  `price` DECIMAL(32, 2) COMMENT '测试_utf8中文',
  `currency` VARCHAR(2147483647),
  `order_time` TIMESTAMP(3)
) COMMENT '测试中文table comment' WITH (
  'connector' = 'dategen'
)
1 row in set

Flink SQL> desc s1;
| name       | type           | null | key | extras | watermark | comment       |
| order_id   | STRING         | TRUE |     |        |           | 测试中文      |
| price      | DECIMAL(32, 2) | TRUE |     |        |           | 测试_utf8中文 |
| currency   | STRING         | TRUE |     |        |           |               |
| order_time | TIMESTAMP(3)   | TRUE |     |        |           |               |
4 rows in set
{code}

> A Java string should be used instead of a Calcite NlsString to construct the table and column comment attributes of CatalogTable
>
> Key: FLINK-32249
> URL: https://issues.apache.org/jira/browse/FLINK-32249
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.17.1
> Reporter: lincoln lee
> Priority: Major
> Fix For: 1.18.0
>
> When Flink interacts with CatalogTable, it directly passes the Calcite NlsString comment as a string to the comment attribute of the schema and column. In theory, a Java string should be passed here, otherwise the CatalogTable implementers may encounter special character encoding issues, e.g., an issue in apache paimon: [https://github.com/apache/incubator-paimon/issues/1262]
> Also tested in sql-client:
> {code}
> Flink SQL> CREATE TABLE s1 (
> >   order_id STRING comment '测试中文',
> >   price DECIMAL(32,2) comment _utf8'测试_utf8中文',
> >   currency STRING,
> >   order_time TIMESTAMP(3)
> > ) comment '测试中文table comment' WITH ('connector'='dategen');
> [INFO] Execute statement succeed.
> Flink SQL> show tables;
> | table name |
> | s1 |
> 1 row in set
> Flink SQL> desc s1;
> | name       | type           | null | key | extras | watermark | comment                 |
> | order_id   | STRING         | TRUE |     |        |           | u&'\6d4b\8bd5\4e2d\6587 |
> | price      | DECIMAL(32, 2) | TRUE |     |        |           | _UTF8'测试_utf8中文     |
> | currency   | STRING         | TRUE |     |        |           |                         |
> | order_time | TIMESTAMP(3)   | TRUE |     |        |           |                         |
>
[jira] [Commented] (FLINK-32246) javax.management.InstanceAlreadyExistsException
[ https://issues.apache.org/jira/browse/FLINK-32246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728658#comment-17728658 ]

Aitozi commented on FLINK-32246:
Where do you run this SQL? In the sql-client? Could you provide a demo case to reproduce it?

> javax.management.InstanceAlreadyExistsException
>
> Key: FLINK-32246
> URL: https://issues.apache.org/jira/browse/FLINK-32246
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.2
> Reporter: jeff-zou
> Priority: Major
>
> Flink SQL throws an exception (javax.management.InstanceAlreadyExistsException) when trying to perform multiple sink operations on the same Kafka source.
>
> SQL example:
> {code:java}
> create table kafka_source() with ('connector'='kafka');
> insert into sink_table1 select * from kafka_source;
> insert into sink_table2 select * from kafka_source;
> {code}
> The exception is as below:
> {code:java}
> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=*
> java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
> java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
> org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
> org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
> org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> {code}
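The exception class itself is easy to reproduce outside Kafka: JMX refuses to register two MBeans under one ObjectName. The Kafka admin client hits the same path with the `kafka.admin.client:type=app-info,id=...` name when two clients share one client id. A standalone sketch, using the built-in `javax.management.timer.Timer` as a convenient compliant MBean:

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Standalone reproduction of InstanceAlreadyExistsException, outside Kafka:
// a second registerMBean under an already-registered ObjectName is rejected.
public class MBeanClashDemo {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("demo:type=app-info,id=duplicate");

        server.registerMBean(new Timer(), name); // first registration succeeds
        try {
            server.registerMBean(new Timer(), name); // same name -> clash
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("clash on " + name);
        }
    }
}
```

This is why the warning typically appears when several Kafka clients in one JVM are created with the same (or default) client id; the registration failure is logged but the clients keep working.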
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725311#comment-17725311 ]

Aitozi commented on FLINK-31828:
Could anyone familiar with the SQL module help verify this fix?

> List field in a POJO data stream results in table program compilation failure
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.1
> Environment: Java 11, Flink 1.16.1
> Reporter: Vladimir Matveev
> Priority: Major
> Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
>     private String key;
>     private List> values;
>     // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some operations are performed on it, it results in an exception which explicitly says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the list field being treated as an array - but I cannot have an array type there in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the {{values}} field to a `RAW('java.util.List', '...')` type, which also does not work correctly and fails the job in case of even simplest operations like printing.
[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row>
[ https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17716538#comment-17716538 ]

Aitozi commented on FLINK-31835:
[~jark] The PR has passed CI. I think the current solution will not cause compatibility problems, since it only fixes the conversion class according to the nullability when creating a CollectionDataType. Could you help review it?

> DataTypeHint don't support Row>
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.4
> Reporter: jeff-zou
> Priority: Major
> Labels: pull-request-available
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> {code}
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
>     int[] i = new int[3];
>     return Row.of(i);
> }
> {code}
> This error is not reported when testing other simple types, so it is not an environmental problem.
[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row>
[ https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17716209#comment-17716209 ]

Aitozi commented on FLINK-31835:
Yes, the reason is shown above. I have pushed a PR to try to solve this, but it needs some discussion to avoid breaking compatibility.
[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row>
[ https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715474#comment-17715474 ]

Aitozi commented on FLINK-31835:
My first thought is that the conversion class for an atomic type, e.g. {{IntType}}, should respect nullability. So when an {{IntType}} is copied from nullable to not null, its default conversion class changes from {{Integer}} to {{int}}. In the DataType layer, {{AtomicType}} and {{CollectionDataType}} should also respect the nullable and notNull calls: the conversion class of the DataType should change after these calls. I have verified this locally and it solves the problem. What do you think of this solution, [~jark]?
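The proposed behavior can be sketched in isolation. The helpers below are illustrative only, not Flink API: once the element's conversion class follows nullability (Integer vs. int), the array class derived via java.lang.reflect.Array flips from Integer[] to int[]:

```java
import java.lang.reflect.Array;

// Sketch of the fix idea only; elementConversionClass/arrayConversionClass
// are hypothetical helpers, not Flink API. If the element's conversion class
// follows nullability, the derived array conversion class follows too.
public class ConversionClassDemo {
    static Class<?> elementConversionClass(boolean nullable) {
        // proposed behavior: INT NOT NULL -> primitive int, INT -> boxed Integer
        return nullable ? Integer.class : int.class;
    }

    static Class<?> arrayConversionClass(boolean elementNullable) {
        // mirrors deriving the array conversion class from the element class
        return Array.newInstance(elementConversionClass(elementNullable), 0).getClass();
    }

    public static void main(String[] args) {
        System.out.println(arrayConversionClass(true));  // class [Ljava.lang.Integer;
        System.out.println(arrayConversionClass(false)); // class [I  (i.e. int[])
    }
}
```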
[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row>
[ https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715466#comment-17715466 ] Aitozi commented on FLINK-31835: When creating an Array CollectionDataType, it uses the array's element type to construct the array conversion class. But the conversion class is {{Integer}} for both {{INT NOT NULL}} and {{INT}}, so the conversion class for the array becomes {{Integer[]}}. {code:java} if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) { return Array.newInstance(elementDataType.getConversionClass(), 0).getClass(); } {code} > DataTypeHint don't support Row> > > > Key: FLINK-31835 > URL: https://issues.apache.org/jira/browse/FLINK-31835 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.4 >Reporter: jeff-zou >Priority: Major > > Using DataTypeHint("Row>") in a UDF gives the following error: > > {code:java} > Caused by: java.lang.ClassCastException: class [I cannot be cast to class > [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of > loader 'bootstrap') > org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40) > org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61) > org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75) > org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37) > org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61) > StreamExecCalc$251.processElement_split9(Unknown Source) > StreamExecCalc$251.processElement(Unknown Source) > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > {code} > > The function is as follows: > {code:java} > @DataTypeHint("Row>") > public Row eval() { > int[] i = new int[3]; > return Row.of(i); > } {code} > > This error is not reported when testing other simple types, so it is not an > environmental problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
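The behavior described above can be reproduced with the plain JDK; here is a minimal sketch (class and method names are mine, not Flink's) showing that building the conversion class via {{Array.newInstance}} on the boxed element class yields {{Integer[]}} for both {{INT}} and {{INT NOT NULL}}, while the UDF actually returns an {{int[]}}, which can never be cast to {{Object[]}}:

```java
import java.lang.reflect.Array;

public class ConversionClassDemo {
    // Mirrors the quoted Flink snippet: derive the array conversion class
    // from the element's conversion class, which is Integer for both
    // INT and INT NOT NULL.
    static Class<?> arrayConversionClass(Class<?> elementClass) {
        return Array.newInstance(elementClass, 0).getClass();
    }

    // The UDF returns a primitive int[]; a converter built for Integer[]
    // casts the value to Object[], which fails at runtime.
    static boolean primitiveArrayCastFails() {
        Object value = new int[3];
        try {
            Object[] ignored = (Object[]) value;
            return false;
        } catch (ClassCastException e) {
            // "class [I cannot be cast to class [Ljava.lang.Object;"
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(arrayConversionClass(Integer.class).getSimpleName()); // Integer[]
        System.out.println(primitiveArrayCastFails()); // true
    }
}
```

This illustrates why the nullability of the element type would have to be taken into account when picking the conversion class.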
[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row>
[ https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715464#comment-17715464 ] Aitozi commented on FLINK-31835: In this case, ARRAY still maps to {{ArrayObjectArrayConverter}}. I don't think that's expected. > DataTypeHint don't support Row> > > > Key: FLINK-31835 > URL: https://issues.apache.org/jira/browse/FLINK-31835 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.4 >Reporter: jeff-zou >Priority: Major > > Using DataTypeHint("Row>") in a UDF gives the following error: > > {code:java} > Caused by: java.lang.ClassCastException: class [I cannot be cast to class > [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of > loader 'bootstrap') > org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40) > org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61) > org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75) > org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37) > org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61) > StreamExecCalc$251.processElement_split9(Unknown Source) > StreamExecCalc$251.processElement(Unknown Source) > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > {code} > > The function is as follows: > {code:java} > @DataTypeHint("Row>") > public Row eval() { > int[] i = new int[3]; > return Row.of(i); > } {code} > > This error is not reported when testing other simple types, so it is not an > environmental problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715451#comment-17715451 ] Aitozi commented on FLINK-31828: Hi [~netvl], you could also work around this in your case with the following schema: {code:java} final var schema = Schema.newBuilder() .column("key", DataTypes.STRING()) .column( "values", DataTypes.ARRAY( DataTypes.MAP( DataTypes.STRING(), DataTypes.STRING())) .bridgedTo(List.class)) .build(); {code} > List field in a POJO data stream results in table program compilation failure > - > > Key: FLINK-31828 > URL: https://issues.apache.org/jira/browse/FLINK-31828 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.1 > Environment: Java 11 > Flink 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Labels: pull-request-available > Attachments: MainPojo.java, generated-code.txt, stacktrace.txt > > > Suppose I have a POJO class like this: > {code:java} > public class Example { > private String key; > private List> values; > // getters, setters, equals+hashCode omitted > } > {code} > When a DataStream with this class is converted to a table, and some > operations are performed on it, it results in an exception which explicitly > says that I should file a ticket: > {noformat} > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > {noformat} > Please find the example Java code and the full stack trace attached. > From the exception and generated code it seems that Flink is upset with the > list field being treated as an array - but I cannot have an array type there > in the real code. > Also note that if I _don't_ specify the schema explicitly, it then maps the > {{values}} field to a `RAW('java.util.List', '...')` type, which also does > not work correctly and fails the job in case of even simplest operations like > printing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
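For context on why the explicit {{bridgedTo(List.class)}} hint matters: without it, the generated code treats the field as an array, and a {{java.util.List}} value can never be cast to {{Object[]}} at runtime. A small pure-JDK sketch (names are mine, not from the generated code) of that failure mode:

```java
import java.util.ArrayList;
import java.util.List;

public class ListVsArrayDemo {
    // A POJO field declared as a List is not an array at runtime, so
    // generated table code that assumes Object[] fails with a
    // ClassCastException when it touches the value.
    static boolean listToArrayCastFails() {
        Object values = new ArrayList<String>();
        try {
            Object[] ignored = (Object[]) values;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(listToArrayCastFails()); // true
    }
}
```

Bridging the declared ARRAY type to {{List.class}} tells the planner which Java conversion class to generate code against, avoiding the bad cast.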
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715403#comment-17715403 ] Aitozi commented on FLINK-31828: [~twalthr] could you help review this fix :)? I see you reviewed the last commit for this part. > List field in a POJO data stream results in table program compilation failure > - > > Key: FLINK-31828 > URL: https://issues.apache.org/jira/browse/FLINK-31828 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.1 > Environment: Java 11 > Flink 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Labels: pull-request-available > Attachments: MainPojo.java, generated-code.txt, stacktrace.txt > > > Suppose I have a POJO class like this: > {code:java} > public class Example { > private String key; > private List> values; > // getters, setters, equals+hashCode omitted > } > {code} > When a DataStream with this class is converted to a table, and some > operations are performed on it, it results in an exception which explicitly > says that I should file a ticket: > {noformat} > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > {noformat} > Please find the example Java code and the full stack trace attached. > From the exception and generated code it seems that Flink is upset with the > list field being treated as an array - but I cannot have an array type there > in the real code. > Also note that if I _don't_ specify the schema explicitly, it then maps the > {{values}} field to a `RAW('java.util.List', '...')` type, which also does > not work correctly and fails the job in case of even simplest operations like > printing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715356#comment-17715356 ] Aitozi commented on FLINK-31828: I have pushed a fix for it. Before the fix, the demo fails with an EOFException when printing the raw type (as shown in the pull request). After it, in this example, the user can define the schema as below: {code:java} final var schema = Schema.newBuilder() .column("key", DataTypes.STRING()) .column("values", DataTypes.RAW(java.util.List.class)) .build(); {code} > List field in a POJO data stream results in table program compilation failure > - > > Key: FLINK-31828 > URL: https://issues.apache.org/jira/browse/FLINK-31828 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.1 > Environment: Java 11 > Flink 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Labels: pull-request-available > Attachments: MainPojo.java, generated-code.txt, stacktrace.txt > > > Suppose I have a POJO class like this: > {code:java} > public class Example { > private String key; > private List> values; > // getters, setters, equals+hashCode omitted > } > {code} > When a DataStream with this class is converted to a table, and some > operations are performed on it, it results in an exception which explicitly > says that I should file a ticket: > {noformat} > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > {noformat} > Please find the example Java code and the full stack trace attached. > From the exception and generated code it seems that Flink is upset with the > list field being treated as an array - but I cannot have an array type there > in the real code. 
> Also note that if I _don't_ specify the schema explicitly, it then maps the > {{values}} field to a `RAW('java.util.List', '...')` type, which also does > not work correctly and fails the job in case of even simplest operations like > printing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715111#comment-17715111 ] Aitozi commented on FLINK-31828: Hi [~netvl], thanks for this detailed bug report. I have reproduced your problem, and I also spent some time digging into using the RAW type to declare the List type in your case. I found that there's a bug in the cast rule (it uses the wrong serializer), so it fails with an EOFException as you mentioned (I hope it's the same error you saw). I will prepare a PR to fix this bug. > List field in a POJO data stream results in table program compilation failure > - > > Key: FLINK-31828 > URL: https://issues.apache.org/jira/browse/FLINK-31828 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.1 > Environment: Java 11 > Flink 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Attachments: MainPojo.java, generated-code.txt, stacktrace.txt > > > Suppose I have a POJO class like this: > {code:java} > public class Example { > private String key; > private List> values; > // getters, setters, equals+hashCode omitted > } > {code} > When a DataStream with this class is converted to a table, and some > operations are performed on it, it results in an exception which explicitly > says that I should file a ticket: > {noformat} > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > {noformat} > Please find the example Java code and the full stack trace attached. > From the exception and generated code it seems that Flink is upset with the > list field being treated as an array - but I cannot have an array type there > in the real code. 
> Also note that if I _don't_ specify the schema explicitly, it then maps the > {{values}} field to a `RAW('java.util.List', '...')` type, which also does > not work correctly and fails the job in case of even simplest operations like > printing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
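A serializer mismatch of the kind described typically surfaces as an {{EOFException}}: the deserializer expects more bytes than the serializer actually wrote. A small pure-JDK analogy (not Flink code; names are mine) of that failure mode:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class WrongSerializerDemo {
    // The writer serializes an int (4 bytes), but the reader — picking the
    // wrong format — expects a long (8 bytes) and runs off the end of the
    // buffer, throwing EOFException.
    static boolean readerHitsEof() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeInt(42);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        try {
            in.readLong();
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readerHitsEof()); // true
    }
}
```

This is the same shape of failure as a cast rule picking the wrong serializer for a RAW value: the bytes were written with one layout and read back assuming another.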
[jira] [Comment Edited] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815 ] Aitozi edited comment on FLINK-29692 at 4/21/23 4:12 AM: - Hi, sorry for jumping into this discussion. I want to share two thoughts about this feature. * As [~jark] mentioned, early/late fire is already supported in group window aggregation. Window TVF is a feature-rich version of group window aggregation, so I think this ability should be aligned. * There is a difference between early/late fire and the cumulate window TVF: early/late fire relates to the window trigger, while the cumulative window relates to the window assigner. We could use the cumulative window to simulate the same functionality, but it may bring overhead, as [~charles-tan] said. So, +1 from my side to support the emit strategy (early/late fire) in window TVF. was (Author: aitozi): hi, sorry for jumping into this discussion. I want to share two thoughts about this feature. * As [~jark] mentioned, in the group window aggregation the early/late fire has been supported. Window TVF as a feature-rich version of group window aggregation, so I think this ability should be aligned. * There is difference between the early/late fire and cumulate window tvf. early/late fire is something relates to the window trigger. But cumulative window relates to window assigner. We could use cumulative window to simulate the same functionality, but it may bring overhead as [~charles-tan] said. So, +1 from my side to support emit strategy in window TVF. > Support early/late fires for Windowing TVFs > --- > > Key: FLINK-29692 > URL: https://issues.apache.org/jira/browse/FLINK-29692 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.15.3 >Reporter: Canope Nerda >Priority: Major > > I have cases where I need to 1) output data as soon as possible and 2) handle > late arriving data to achieve eventual correctness. 
In the logic, I need to > do window deduplication which is based on Windowing TVFs and according to > source code, early/late fires are not supported yet in Windowing TVFs. > Actually 1) contradicts with 2). Without early/late fires, we had to > compromise, either live with fresh incorrect data or tolerate excess latency > for correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815 ] Aitozi commented on FLINK-29692: Hi, sorry for jumping into this discussion. I want to share two thoughts about this feature. * As [~jark] mentioned, early/late fire is already supported in group window aggregation. Window TVF is a feature-rich version of group window aggregation, so I think this ability should be aligned. * There is a difference between early/late fire and the cumulate window TVF: early/late fire relates to the window trigger, while the cumulative window relates to the window assigner. We could use the cumulative window to simulate the same functionality, but it may bring overhead, as [~charles-tan] said. So, +1 from my side to support the emit strategy in window TVF. > Support early/late fires for Windowing TVFs > --- > > Key: FLINK-29692 > URL: https://issues.apache.org/jira/browse/FLINK-29692 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.15.3 >Reporter: Canope Nerda >Priority: Major > > I have cases where I need to 1) output data as soon as possible and 2) handle > late arriving data to achieve eventual correctness. In the logic, I need to > do window deduplication which is based on Windowing TVFs and according to > source code, early/late fires are not supported yet in Windowing TVFs. > Actually 1) contradicts with 2). Without early/late fires, we had to > compromise, either live with fresh incorrect data or tolerate excess latency > for correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule
[ https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713864#comment-17713864 ] Aitozi commented on FLINK-31755: CC [~lincoln.86xy] [~snuyanzin] > ROW function can not work with RewriteIntersectAllRule > -- > > Key: FLINK-31755 > URL: https://issues.apache.org/jira/browse/FLINK-31755 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > Reproduce case: > {code:java} > create table row_sink ( > `b` ROW > ) with ( > 'connector' = 'values' > ) > util.verifyRelPlanInsert( > "INSERT INTO row_sink " + > "SELECT ROW(a, b) FROM complex_type_src intersect all " + > "SELECT ROW(c, d) FROM complex_type_src ") > {code} > It fails with > {code:java} > Caused by: java.lang.IllegalArgumentException: Type mismatch: > rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL > equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL > Difference: > EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144) > ... 68 more > {code} > The reason is: > The ROW function generates the {{FULLY_QUALIFIED}} type, but after the > {{RewriteIntersectAllRule}} optimization, it produces the > {{PEEK_FIELDS_NO_EXPAND}} type. So the Volcano planner complains about the type > mismatch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712195#comment-17712195 ] Aitozi commented on FLINK-31774: Hi [~luoyuxia], the PR is ready for review; please take a look when you are free. > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > I do not find the declaration about the usage of DELETE and UPDATE statement > in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy
[ https://issues.apache.org/jira/browse/FLINK-31790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711683#comment-17711683 ] Aitozi commented on FLINK-31790: CC [~luoyuxia] > Filesystem batch sink should also respect to the PartitionCommitPolicy > -- > > Key: FLINK-31790 > URL: https://issues.apache.org/jira/browse/FLINK-31790 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > Currently, the {{PartitionCommitPolicy}} only take effect in the streaming > file sink and hive file sink. The filesystem sink in batch mode should also > respect to the commit policy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy
Aitozi created FLINK-31790: -- Summary: Filesystem batch sink should also respect to the PartitionCommitPolicy Key: FLINK-31790 URL: https://issues.apache.org/jira/browse/FLINK-31790 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Aitozi Currently, the {{PartitionCommitPolicy}} only take effect in the streaming file sink and hive file sink. The filesystem sink in batch mode should also respect to the commit policy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31787) Add the explicit ROW constructor to the system function doc
[ https://issues.apache.org/jira/browse/FLINK-31787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711427#comment-17711427 ] Aitozi commented on FLINK-31787: see discussion in this ticket: https://issues.apache.org/jira/browse/FLINK-18027 > Add the explicit ROW constructor to the system function doc > --- > > Key: FLINK-31787 > URL: https://issues.apache.org/jira/browse/FLINK-31787 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aitozi >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31787) Add the explicit ROW constructor to the system function doc
Aitozi created FLINK-31787: -- Summary: Add the explicit ROW constructor to the system function doc Key: FLINK-31787 URL: https://issues.apache.org/jira/browse/FLINK-31787 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement
[ https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711306#comment-17711306 ] Aitozi commented on FLINK-31344: [~luoyuxia] I have finished this ticket, could you help take a look when you are free ? > Support to update nested columns in update statement > > > Key: FLINK-31344 > URL: https://issues.apache.org/jira/browse/FLINK-31344 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: luoyuxia >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > Currently, it'll throw exception while using update statement to update > nested column; > For the following sql: > > {code:java} > create table (t ROW<`a` INT>) with (xxx); > update t set s.a = 1;{code} > It'll throw the exception: > {code:java} > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." > at line 1, column 15. > Was expecting: > "=" ... > > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306) > at > org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198) > ... 33 more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711265#comment-17711265 ] Aitozi commented on FLINK-31533: After some experimenting, I find it seems easy to support this feature. It doesn't require touching the syntax part, because CTAS reuses (part of) the SqlCreateTable syntax, so the SQL below can already be parsed: {code:java} CREATE TABLE MyCtasTable PARTITIONED BY (`a`) WITH ( 'connector' = 'filesystem', 'format' = 'testcsv' ) AS SELECT * FROM MyTable{code} We can't use this today only because it's manually banned in {{SqlCreateTableAs#validate}}. In my POC, it just needs some minor changes to support the PARTITIONED BY definition and does not touch the public API/syntax. Do you think we still need a FLIP for this, [~luoyuxia]? > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533 ] Aitozi deleted comment on FLINK-31533: was (Author: aitozi): Yes, it's indeed need to revisit the parser of the ctas syntax. I will draw a POC for this and open a discussion in dev mailing > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711203#comment-17711203 ] Aitozi commented on FLINK-31533: Yes, it does indeed need a revisit of the parser for the CTAS syntax. I will draft a POC for this and open a discussion on the dev mailing list. > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200 ] Aitozi edited comment on FLINK-18027 at 4/12/23 5:43 AM: - The content is modified from [https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md] to [https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml] I think the content below was removed accidentally. CC [~sjwiesman] {code:java} -- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code} I think we should add this back, since the explicit ROW constructor's limitation has been solved. was (Author: aitozi): The content is modified from [https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md] to [https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml] I think the content below is removed accidently. {code:java} -- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code} I think we should add this back, since the explicit ROW constructor's limitation has been solved > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw excepions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." 
at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at >
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200 ] Aitozi commented on FLINK-18027: The content is modified from [https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md] to [https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml] I think the content below was removed accidentally. {code:java} -- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code} I think we should add this back, since the explicit ROW constructor's limitation has been solved. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw excepions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
at line > 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) 
> at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711040#comment-17711040 ] Aitozi commented on FLINK-18027: [~libenchao] do you know why the explicit ROW constructor was removed from the doc (the git history of the file does not seem trackable)? Is the explicit ROW call discouraged? If not, I think we should add it back. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw exceptions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at > 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
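For readers following FLINK-18027: the statement under discussion is the explicit ROW constructor applied to nested field accesses. A minimal sketch of the repro follows; the nested field types are hypothetical, since the ROW type parameters were lost in the original report:

{code:java}
-- Hypothetical schemas; the nested field types were elided in the report above.
create table my_source ( my_row ROW<a VARCHAR, b INT> ) with (...);
create table my_sink ( my_row ROW<a VARCHAR, b INT> ) with (...);

-- The explicit ROW call over nested accesses failed to parse on older
-- Calcite versions; per the comment above, it works after the 1.29 upgrade.
insert into my_sink
select ROW(my_row.a, my_row.b) from my_source;
{code}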
[jira] [Commented] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711033#comment-17711033 ] Aitozi commented on FLINK-31774: I think we should add descriptions for these two statements, similar to the INSERT statement. CC [~luoyuxia] I'm willing to work on this. > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Priority: Major > > I could not find the usage of the DELETE and UPDATE statements documented > in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
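As a starting point for the requested documentation, the two statements follow the standard SQL shape. A minimal sketch (the table and predicates are hypothetical, and the target table's connector must support the row-level modification interfaces):

{code:java}
-- Row-level delete: removes matching rows from the external table.
DELETE FROM Orders WHERE order_status = 'CANCELLED';

-- Row-level update: rewrites matching rows in place.
UPDATE Orders SET priority = 'LOW' WHERE order_amount < 10;
{code}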
[jira] [Updated] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31774: --- Parent: FLINK-30648 Issue Type: Sub-task (was: Improvement) > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Priority: Major > > I could not find the usage of the DELETE and UPDATE statements documented > in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31774) Add document for delete and update statement
Aitozi created FLINK-31774: -- Summary: Add document for delete and update statement Key: FLINK-31774 URL: https://issues.apache.org/jira/browse/FLINK-31774 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi I could not find the usage of the DELETE and UPDATE statements documented in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711002#comment-17711002 ] Aitozi commented on FLINK-31533: [~luoyuxia] I think this is a useful feature and I'd like to implement it. May I take this ticket? > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
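The exact partition syntax for CTAS was not settled in this ticket. One plausible shape, modeled on Flink's existing PARTITIONED BY clause for CREATE TABLE (combining it with AS SELECT, and all table and column names, are assumptions here):

{code:java}
CREATE TABLE partitioned_sink
PARTITIONED BY (dt)
WITH ('connector' = 'filesystem', 'path' = '/tmp/sink', 'format' = 'parquet')
AS SELECT user_id, amount, dt FROM source_table;
{code}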
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710866#comment-17710866 ] Aitozi commented on FLINK-18027: The bundled Calcite version is now 1.29, and complex expressions in the ROW constructor are supported (verified). This issue can be closed now. CC [~libenchao] [~jark] > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw exceptions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at > 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement
[ https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710396#comment-17710396 ] Aitozi commented on FLINK-31301: [~lincoln.86xy], I opened a PR for this; could you help review it? Thanks. > Unsupported nested columns in column list of insert statement > - > > Key: FLINK-31301 > URL: https://issues.apache.org/jira/browse/FLINK-31301 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.16.1 >Reporter: lincoln lee >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > Currently an error will be raised when using nested columns in the column list of an > insert statement, e.g., > {code:java} > INSERT INTO nested_type_sink (a,b.b1,c.c2,f) > SELECT a,b.b1,c.c2,f FROM nested_type_src > {code} > > {code:java} > java.lang.AssertionError > at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > at > 
org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement
[ https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710023#comment-17710023 ] Aitozi commented on FLINK-31344: Thanks for your reply; I totally agree. I have basically verified the first part of the work; can you assign this ticket to me? > Support to update nested columns in update statement > > > Key: FLINK-31344 > URL: https://issues.apache.org/jira/browse/FLINK-31344 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > > Currently, it throws an exception when using an update statement to update a > nested column. > For the following sql: > > {code:java} > create table (t ROW<`a` INT>) with (xxx); > update t set s.a = 1;{code} > It'll throw the exception: > {code:java} > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." > at line 1, column 15. > Was expecting: > "=" ... > > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306) > at > org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198) > ... 33 more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement
[ https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709961#comment-17709961 ] Aitozi commented on FLINK-31344: Hi [~luoyuxia], while working on FLINK-31301 I noticed this ticket. After spending some time investigating it, I think we can support updating nested columns with the following steps: - Modify the SqlParser to accept a compound identifier in the UPDATE SET clause - Modify the SupportsRowLevelUpdate interface to work with {{int[][]}} instead of {{Column}} (this gives finer-grained control over the required columns). WDYT? > Support to update nested columns in update statement > > > Key: FLINK-31344 > URL: https://issues.apache.org/jira/browse/FLINK-31344 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > > Currently, it throws an exception when using an update statement to update a > nested column. > For the following sql: > > {code:java} > create table (t ROW<`a` INT>) with (xxx); > update t set s.a = 1;{code} > It'll throw the exception: > {code:java} > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." > at line 1, column 15. > Was expecting: > "=" ... > > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306) > at > org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198) > ... 
33 more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
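The steps proposed in the comment above would make the following statement legal. A sketch of the target syntax (the ticket's example appears to swap the table and column names, so they are normalized here):

{code:java}
-- A table t with a nested ROW column s.
create table t ( s ROW<`a` INT> ) with (xxx);

-- Step 1 (the parser change) would allow the compound identifier s.a on the
-- left-hand side of SET; today this fails with the ParseException above.
update t set s.a = 1;
{code}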
[jira] [Updated] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule
[ https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31755: --- Description: Reproduce case: {code:java} create table row_sink ( `b` ROW ) with ( 'connector' = 'values' ) util.verifyRelPlanInsert( "INSERT INTO row_sink " + "SELECT ROW(a, b) FROM complex_type_src intersect all " + "SELECT ROW(c, d) FROM complex_type_src ") {code} It will fail with {code:java} Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL Difference: EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144) ... 68 more {code} The reason is that the ROW function generates the {{FULLY_QUALIFIED}} type, but after the {{RewriteIntersectAllRule}} optimization it produces the {{PEEK_FIELDS_NO_EXPAND}} type, so the volcano planner complains about a type mismatch. 
was: Reproduce case: {code:java} create table row_sink ( `b` ROW ) with ( 'connector' = 'values' ) util.verifyRelPlanInsert( "INSERT INTO row_sink " + "SELECT ROW(a, b) FROM complex_type_src intersect all " + "SELECT ROW(c, d) FROM complex_type_src ") {code} It will fails with {code:java} Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL Difference: EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144) ... 68 more {code} The reason is: ROW function will generates the {{FULLY_QUALIFIED}} type. But after the {{RewriteIntersectAllRule}} optimization, it will produce the {{PEEK_FIELDS_NO_EXPAND}}. So the volcan planner complains with type mismatch. 
> ROW function can not work with RewriteIntersectAllRule > -- > > Key: FLINK-31755 > URL: https://issues.apache.org/jira/browse/FLINK-31755 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > Reproduce case: > {code:java} > create table row_sink ( > `b` ROW > ) with ( > 'connector' = 'values' > ) > util.verifyRelPlanInsert( > "INSERT INTO row_sink " + > "SELECT ROW(a, b) FROM complex_type_src intersect all " + > "SELECT ROW(c, d) FROM complex_type_src ") > {code} > It will fail with > {code:java} > Caused by: java.lang.IllegalArgumentException: Type mismatch: > rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL > equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL > Difference: > EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144) > ... 68 more > {code} > The reason is: > the ROW function generates the {{FULLY_QUALIFIED}} type. But after the > {{RewriteIntersectAllRule}} optimization, it produces the > {{PEEK_FIELDS_NO_EXPAND}} type. So the volcano planner complains about a type > mismatch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule
Aitozi created FLINK-31755: -- Summary: ROW function can not work with RewriteIntersectAllRule Key: FLINK-31755 URL: https://issues.apache.org/jira/browse/FLINK-31755 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Aitozi Reproduce case: {code:java} create table row_sink ( `b` ROW ) with ( 'connector' = 'values' ) util.verifyRelPlanInsert( "INSERT INTO row_sink " + "SELECT ROW(a, b) FROM complex_type_src intersect all " + "SELECT ROW(c, d) FROM complex_type_src ") {code} It will fail with {code:java} Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL Difference: EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144) ... 68 more {code} The reason is that the ROW function generates the {{FULLY_QUALIFIED}} type, but after the {{RewriteIntersectAllRule}} optimization it produces the {{PEEK_FIELDS_NO_EXPAND}} type, so the volcano planner complains about a type mismatch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement
[ https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708948#comment-17708948 ] Aitozi commented on FLINK-31301: Hello [~lincoln.86xy], I want to contribute to this ticket and I have basically finished this. Can you help assign this ticket to me ? > Unsupported nested columns in column list of insert statement > - > > Key: FLINK-31301 > URL: https://issues.apache.org/jira/browse/FLINK-31301 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.16.1 >Reporter: lincoln lee >Priority: Major > > Currently an error will be raised when use nested columns in column list of > insert statement, e.g., > {code:java} > INSERT INTO nested_type_sink (a,b.b1,c.c2,f) > SELECT a,b.b1,c.c2,f FROM nested_type_src > {code} > > {code:java} > java.lang.AssertionError > at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > at > 
org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement
[ https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708014#comment-17708014 ] Aitozi commented on FLINK-31301: Hello [~lincoln.86xy], I'm interested in this ticket and spent some time looking into it. I think we can support inserting partial nested columns by analysing the target row type to derive the RowType from the RelDataType. Then we can construct the target row as {{row(b, cast(null as int), row(cast(null as varchar), c))}} to make up the complete row. WDYT? > Unsupported nested columns in column list of insert statement > - > > Key: FLINK-31301 > URL: https://issues.apache.org/jira/browse/FLINK-31301 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.16.1 >Reporter: lincoln lee >Priority: Major > > Currently an error will be raised when using nested columns in the column list of an > insert statement, e.g., > {code:java} > INSERT INTO nested_type_sink (a,b.b1,c.c2,f) > SELECT a,b.b1,c.c2,f FROM nested_type_src > {code} > > {code:java} > java.lang.AssertionError > at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
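The null-padding idea in the comment above can be made concrete. The sink schema below is hypothetical, chosen so the rewritten projection matches the comment's {{row(b, cast(null as int), row(cast(null as varchar), c))}} shape:

{code:java}
-- Hypothetical sink; the nested column r has an unlisted middle field p and
-- an unlisted nested field y.m.
create table nested_type_sink (
  r ROW<x INT, p INT, y ROW<m VARCHAR, n INT>>
) with (...);

-- User writes: INSERT INTO nested_type_sink (r.x, r.y.n) SELECT b, c FROM src;
-- The proposal derives the full RowType from the RelDataType and pads the
-- unlisted fields with typed NULLs:
insert into nested_type_sink
select ROW(b, cast(null as INT), ROW(cast(null as VARCHAR), c)) from src;
{code}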
[jira] [Commented] (FLINK-31550) Replace deprecated TableSchema with Schema in OperationConverterUtils
[ https://issues.apache.org/jira/browse/FLINK-31550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17705828#comment-17705828 ] Aitozi commented on FLINK-31550: Is this stale? {{OperationConverterUtils}} does not use {{TableSchema}} anymore. > Replace deprecated TableSchema with Schema in OperationConverterUtils > - > > Key: FLINK-31550 > URL: https://issues.apache.org/jira/browse/FLINK-31550 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jark Wu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31507) Move execution logic of ShowOperation out from TableEnvironmentImpl
[ https://issues.apache.org/jira/browse/FLINK-31507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702245#comment-17702245 ] Aitozi commented on FLINK-31507: Hello [~jark], may I take on this refactor? > Move execution logic of ShowOperation out from TableEnvironmentImpl > --- > > Key: FLINK-31507 > URL: https://issues.apache.org/jira/browse/FLINK-31507 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jark Wu >Priority: Major > > This should implement {{ExecutableOperation}} for all the {{ShowOperation}}s > to move the execution logic out from > {{TableEnvironmentImpl#executeInternal()}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31497) Drop the deprecated CatalogViewImpl
[ https://issues.apache.org/jira/browse/FLINK-31497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31497: --- Parent: FLINK-21394 Issue Type: Sub-task (was: Technical Debt) > Drop the deprecated CatalogViewImpl > > > Key: FLINK-31497 > URL: https://issues.apache.org/jira/browse/FLINK-31497 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > After https://issues.apache.org/jira/browse/FLINK-29585, > CatalogViewImpl is no longer used in the Flink project, so we can drop it now cc > [~snuyanzin] > But we should check whether it is still used in other connectors' code -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31497) Drop the deprecated CatalogViewImpl
Aitozi created FLINK-31497: -- Summary: Drop the deprecated CatalogViewImpl Key: FLINK-31497 URL: https://issues.apache.org/jira/browse/FLINK-31497 Project: Flink Issue Type: Technical Debt Components: Table SQL / Planner Reporter: Aitozi After https://issues.apache.org/jira/browse/FLINK-29585, CatalogViewImpl is no longer used in the Flink project, so we can drop it now cc [~snuyanzin] But we should check whether it is still used in other connectors' code -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31426) Upgrade the deprecated UniqueConstraint to the new one
[ https://issues.apache.org/jira/browse/FLINK-31426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31426: --- Parent: FLINK-29072 Issue Type: Sub-task (was: Technical Debt) > Upgrade the deprecated UniqueConstraint to the new one > --- > > Key: FLINK-31426 > URL: https://issues.apache.org/jira/browse/FLINK-31426 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / Planner >Reporter: Aitozi >Priority: Major > > https://github.com/apache/flink/pull/21522#discussion_r1133642525 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31426) Upgrade the deprecated UniqueConstraint to the new one
Aitozi created FLINK-31426: -- Summary: Upgrade the deprecated UniqueConstraint to the new one Key: FLINK-31426 URL: https://issues.apache.org/jira/browse/FLINK-31426 Project: Flink Issue Type: Technical Debt Components: Connectors / Hive, Table SQL / Planner Reporter: Aitozi https://github.com/apache/flink/pull/21522#discussion_r1133642525 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode
[ https://issues.apache.org/jira/browse/FLINK-31260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698902#comment-17698902 ] Aitozi commented on FLINK-31260: [~zhengyiweng] Thanks for your attention. I think the exchange in the rule pattern can be removed; I'd like to revisit this issue after [https://github.com/apache/flink/pull/22001] is merged, since that PR will generate the pattern above. > PushLocalHashAggIntoScanRule should also work with union RelNode > > > Key: FLINK-31260 > URL: https://issues.apache.org/jira/browse/FLINK-31260 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > As discussed in > [comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784] > Currently, {{PushLocalHashAggIntoScanRule}} matches the pattern Exchange -> > LocalHashAggregate -> Scan. As a result, the following pattern cannot be > optimized > {code:java} > +- Union(all=[true], union=[type, sum$0]) > :- Union(all=[true], union=[type, sum$0]) > : :- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > : : +- TableSourceScan(table=[[default_catalog, default_database, > table1, project=[type, price], metadata=[]]], fields=[type, price]) > : +- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > : +- TableSourceScan(table=[[default_catalog, default_database, > table2, project=[type, price], metadata=[]]], fields=[type, price]) > +- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > +- TableSourceScan(table=[[default_catalog, default_database, > table3, project=[type, price], metadata=[]]], fields=[type, price]) > {code} > We should extend the rule to support this pattern. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31390) Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary traversals.
Aitozi created FLINK-31390: -- Summary: Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary traversals. Key: FLINK-31390 URL: https://issues.apache.org/jira/browse/FLINK-31390 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi We can avoid the unnecessary traversals of the RelNode tree, since we are only interested in the first satisfied plan. FlinkChangelogModeInferenceProgram {code:java} val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context) val finalRoot = requiredUpdateKindTraits.flatMap { requiredUpdateKindTrait => updateKindTraitVisitor.visit(rootWithModifyKindSet, requiredUpdateKindTrait) } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
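The short-circuiting idea behind FLINK-31390 can be illustrated with plain Java streams (a sketch only; {{visit}} below is an invented stand-in for {{SatisfyUpdateKindTraitVisitor.visit}}): because stream elements are consumed lazily and {{findFirst()}} is a short-circuiting terminal operation, traversal stops as soon as one required trait yields a satisfying plan, instead of visiting the tree once per required trait.

```java
import java.util.*;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (plain Java, not planner code): when only the first
// satisfying plan matters, a lazy findFirst() stops visiting as soon as one
// candidate trait succeeds.
public class FirstSatisfiedPlan {
    public static final AtomicInteger visits = new AtomicInteger();

    // Stand-in for the trait visitor: only trait "B" succeeds in this example.
    static Optional<String> visit(String trait) {
        visits.incrementAndGet();
        return trait.equals("B") ? Optional.of("plan-" + trait) : Optional.empty();
    }

    public static Optional<String> firstSatisfied(List<String> requiredTraits) {
        return requiredTraits.stream()
                .map(FirstSatisfiedPlan::visit)  // lazy: evaluated one element at a time
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();                    // short-circuits on the first success
    }

    public static void main(String[] args) {
        Optional<String> plan = firstSatisfied(List.of("A", "B", "C"));
        // "C" is never visited because "B" already produced a satisfying plan.
        System.out.println(plan.get() + " after " + visits.get() + " visits");
    }
}
```

The Scala snippet in the ticket uses {{flatMap}} over all required traits; the point of the optimization is exactly this eager-vs-lazy difference.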
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Description: In Calcite 1.33.0, C-style escape strings have been supported. We could leverage it to enhance our string literals usage. issue: https://issues.apache.org/jira/browse/CALCITE-5305 was: In Calcite 1.33.0, C-style escape strings have been supported. We should outline its usage in document after upgrading to Calcite 1.33.0 > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We could > leverage it to enhance our string literals usage. > issue: https://issues.apache.org/jira/browse/CALCITE-5305 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Component/s: Table SQL / API (was: Documentation) > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Summary: Upgrade to Calcite version to 1.33.0 (was: Add document about how to use C-style escape strings) > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31362) Add document about how to use C-style escape strings
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Description: In Calcite 1.33.0, C-style escape strings have been supported. We should outline its usage in document after upgrading to Calcite 1.33.0 > Add document about how to use C-style escape strings > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31362) Add document about how to use C-style escape strings
Aitozi created FLINK-31362: -- Summary: Add document about how to use C-style escape strings Key: FLINK-31362 URL: https://issues.apache.org/jira/browse/FLINK-31362 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31296) Add JoinConditionEqualityTransferRule to stream optimizer
[ https://issues.apache.org/jira/browse/FLINK-31296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695491#comment-17695491 ] Aitozi commented on FLINK-31296: But this will change the join key pairs in the Join operator, which may affect state compatibility; I'm not sure how to handle this implicit breakage > Add JoinConditionEqualityTransferRule to stream optimizer > - > > Key: FLINK-31296 > URL: https://issues.apache.org/jira/browse/FLINK-31296 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > I find that {{JoinConditionEqualityTransferRule}} is a rule common to batch > and stream mode, so it should be added to the stream optimizer, which will > bring a performance improvement in some cases. > Other rules may also need to be reviewed to see whether they can be aligned in the batch > and stream cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31296) Add JoinConditionEqualityTransferRule to stream optimizer
Aitozi created FLINK-31296: -- Summary: Add JoinConditionEqualityTransferRule to stream optimizer Key: FLINK-31296 URL: https://issues.apache.org/jira/browse/FLINK-31296 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi I find that {{JoinConditionEqualityTransferRule}} is a rule common to batch and stream mode, so it should be added to the stream optimizer, which will bring a performance improvement in some cases. Other rules may also need to be reviewed to see whether they can be aligned in the batch and stream cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
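For intuition, the equality transfer that such a rule performs can be modelled with a small union-find over join keys. This is an assumed, simplified semantics for illustration only, not the rule's actual implementation: pairwise-equal columns form equivalence classes, from which additional equi-conditions (e.g. {{b = c}} from {{a = b AND a = c}}) can be derived.

```java
import java.util.*;

// Sketch: join keys that are pairwise equal form equivalence classes; any
// pair inside a class is an implied equi-condition.
public class EqualityTransfer {
    private final Map<String, String> parent = new HashMap<>();

    String find(String x) {
        parent.putIfAbsent(x, x);
        if (!parent.get(x).equals(x)) parent.put(x, find(parent.get(x))); // path compression
        return parent.get(x);
    }

    void union(String a, String b) { parent.put(find(a), find(b)); }

    /** Groups columns into equivalence classes given (left, right) equal pairs. */
    public static Collection<Set<String>> classes(List<String[]> equalPairs) {
        EqualityTransfer uf = new EqualityTransfer();
        for (String[] p : equalPairs) uf.union(p[0], p[1]);
        Map<String, Set<String>> byRoot = new HashMap<>();
        for (String col : new ArrayList<>(uf.parent.keySet()))
            byRoot.computeIfAbsent(uf.find(col), k -> new TreeSet<>()).add(col);
        return byRoot.values();
    }

    public static void main(String[] args) {
        // a = b AND a = c  =>  one class {a, b, c}: the pair (b, c) is implied.
        System.out.println(classes(List.of(
                new String[]{"a", "b"}, new String[]{"a", "c"})));
    }
}
```

The state-compatibility concern in the comment above arises precisely because such derived conditions can change which columns end up as the join keys.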
[jira] [Updated] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode
[ https://issues.apache.org/jira/browse/FLINK-31260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31260: --- Component/s: Table SQL / Planner > PushLocalHashAggIntoScanRule should also work with union RelNode > > > Key: FLINK-31260 > URL: https://issues.apache.org/jira/browse/FLINK-31260 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > As discussed in > [comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784] > Currently, {{PushLocalHashAggIntoScanRule}} match for the Exchange -> > LocalHashAggregate -> Scan. As a result, the following pattern can not be > optimized > {code:java} > +- Union(all=[true], union=[type, sum$0]) > :- Union(all=[true], union=[type, sum$0]) > : :- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > : : +- TableSourceScan(table=[[default_catalog, default_database, > table1, project=[type, price], metadata=[]]], fields=[type, price]) > : +- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > : +- TableSourceScan(table=[[default_catalog, default_database, > table2, project=[type, price], metadata=[]]], fields=[type, price]) > +- LocalHashAggregate(groupBy=[type], select=[type, > Partial_SUM(price) AS sum$0]) > +- TableSourceScan(table=[[default_catalog, default_database, > table3, project=[type, price], metadata=[]]], fields=[type, price]) > {code} > We should extend the rule to support this pattern. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode
Aitozi created FLINK-31260: -- Summary: PushLocalHashAggIntoScanRule should also work with union RelNode Key: FLINK-31260 URL: https://issues.apache.org/jira/browse/FLINK-31260 Project: Flink Issue Type: Improvement Reporter: Aitozi As discussed in [comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784] Currently, {{PushLocalHashAggIntoScanRule}} match for the Exchange -> LocalHashAggregate -> Scan. As a result, the following pattern can not be optimized {code:java} +- Union(all=[true], union=[type, sum$0]) :- Union(all=[true], union=[type, sum$0]) : :- LocalHashAggregate(groupBy=[type], select=[type, Partial_SUM(price) AS sum$0]) : : +- TableSourceScan(table=[[default_catalog, default_database, table1, project=[type, price], metadata=[]]], fields=[type, price]) : +- LocalHashAggregate(groupBy=[type], select=[type, Partial_SUM(price) AS sum$0]) : +- TableSourceScan(table=[[default_catalog, default_database, table2, project=[type, price], metadata=[]]], fields=[type, price]) +- LocalHashAggregate(groupBy=[type], select=[type, Partial_SUM(price) AS sum$0]) +- TableSourceScan(table=[[default_catalog, default_database, table3, project=[type, price], metadata=[]]], fields=[type, price]) {code} We should extend the rule to support this pattern. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree
[ https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694386#comment-17694386 ] Aitozi commented on FLINK-31205: looking forward to your opinion CC [~godfreyhe] [~twalthr] [~snuyanzin] > do optimize for multi sink in a single relNode tree > > > Key: FLINK-31205 > URL: https://issues.apache.org/jira/browse/FLINK-31205 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > Flink supports multi sink usage, but it optimize the each sink in a > individual RelNode tree, this will miss some opportunity to do some cross > tree optimization, eg: > {code:java} > create table newX( > a int, > b bigint, > c varchar, > d varchar, > e varchar > ) with ( > 'connector' = 'values' > ,'enable-projection-push-down' = 'true' > insert into sink_table select a, b from newX > insert into sink_table select a, 1 from newX > {code} > It will produce the plan as below, this will cause the source be consumed > twice > {code:java} > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a, b], metadata=[]]], fields=[a, b]) > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- Calc(select=[a, 1 AS b]) >+- TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a], metadata=[]]], fields=[a]) > {code} > In this ticket, I propose to do a global optimization for the multi sink by > * Megre the multi sink(with same table) into a single relNode tree with an > extra union node > * After optimization, split the merged union back to the original multi sink > In my poc, after step 1, it will produce the plan as below, I think it will > do good for the global performacne > {code:java} > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- Union(all=[true], union=[a, b]) >:- 
TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1]) >+- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1]) > +- Reused(reference_id=[1]) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree
[ https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694372#comment-17694372 ] Aitozi commented on FLINK-31205: After some research, I found that there are better choices than using a union to get a single tree. {{Union}} can only cover the use case of multi-sink to the same table because the {{Union}} enforces the type consistency. We can add a new "virtual" RelNode, accepting the multi-sink as input. It can work as packing the multi-tree together so that, from the perspective of the optimizer, it can have the ability to do global optimization. In my POC, I add a new type RelNode named {{MultiSink}} before passing it to the calcite optimizer. The MultiSink does not do any transformation on the inputs. After logical optimization, the plan is {code:java} LogicalMultiSink :- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b]) : +- LogicalProject(inputs=[0..1]) : +- LogicalTableScan(table=[[default_catalog, default_database, newX]]) +- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- LogicalProject(inputs=[0], exprs=[[1:BIGINT]]) +- LogicalTableScan(table=[[default_catalog, default_database, newX]]) {code} After physical optimization, the plan is {code:java} MultiSink :- Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) : +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b]) +- Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1]) +- Calc(select=[a AS $f0, 1:BIGINT AS $f1]) +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b]) {code} Before transforming to the ExecNode, we remove the {{MultiSink}} (which is only intended to work during the optimizing phase), then the final result can be {code:java} TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])(reuse_id=[1]) Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- Reused(reference_id=[1]) Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1]) +- Calc(select=[a AS $f0, 1 AS $f1]) +- Reused(reference_id=[1]) {code} With the new RelNode, single-tree optimization is possible. We can do more things during the single tree optimization, e.g., introduce the cost model for the CTE to decide whether to inline/reuse and so on. > do optimize for multi sink in a single relNode tree > > > Key: FLINK-31205 > URL: https://issues.apache.org/jira/browse/FLINK-31205 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > Flink supports multi sink usage, but it optimize the each sink in a > individual RelNode tree, this will miss some opportunity to do some cross > tree optimization, eg: > {code:java} > create table newX( > a int, > b bigint, > c varchar, > d varchar, > e varchar > ) with ( > 'connector' = 'values' > ,'enable-projection-push-down' = 'true' > insert into sink_table select a, b from newX > insert into sink_table select a, 1 from newX > {code} > It will produce the plan as below, this will cause the source be consumed > twice > {code:java} > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a, b], metadata=[]]], fields=[a, b]) > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- Calc(select=[a, 1 AS b]) >+- TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a], metadata=[]]], fields=[a]) > {code} > In this ticket, I propose to do a global optimization for the multi sink by > * Megre the multi sink(with same table) into a single relNode tree with an > extra union node > * After optimization, split the merged union back to the original multi sink > In my poc, after step 1, it 
will produce the plan as below, I think it will > do good for the global performacne > {code:java} > Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) > +- Union(all=[true], union=[a, b]) >:- TableSourceScan(table=[[default_catalog, default_database, newX, > project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1]) >+- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1]) > +- Reused(reference_id=[1]) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
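The wrap/optimize/unwrap life cycle of the virtual node described in the comment above can be sketched with toy types. All names here are hypothetical (not Flink classes): a virtual root packs several sink trees into one tree so the optimizer sees a single root, and is stripped again before translation to ExecNodes.

```java
import java.util.*;

// Toy sketch of the proposed "virtual" multi-sink root. The real MultiSink in
// the POC is a RelNode that performs no transformation on its inputs.
public class MultiSinkSketch {
    public interface Node { }
    public record Scan(String table) implements Node { }
    public record Sink(String table, Node input) implements Node { }
    public record MultiSink(List<Node> inputs) implements Node { }

    /** Packs several sink trees under one root so the optimizer sees one tree. */
    public static Node wrap(List<Node> sinkTrees) {
        return sinkTrees.size() == 1 ? sinkTrees.get(0) : new MultiSink(sinkTrees);
    }

    /** Strips the virtual root again before translating to ExecNodes. */
    public static List<Node> unwrap(Node root) {
        return root instanceof MultiSink m ? m.inputs() : List.of(root);
    }

    public static void main(String[] args) {
        Node a = new Sink("sink_table", new Scan("newX"));
        Node b = new Sink("sink_table", new Scan("newX"));
        Node packed = wrap(List.of(a, b));          // single tree for the optimizer
        System.out.println(unwrap(packed).size());  // restored to two sink trees
    }
}
```

Between {{wrap}} and {{unwrap}}, a real optimizer would run its rules over the single packed tree, which is what enables the cross-tree reuse shown in the plans above.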
[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree
Aitozi created FLINK-31205: -- Summary: do optimize for multi sink in a single relNode tree Key: FLINK-31205 URL: https://issues.apache.org/jira/browse/FLINK-31205 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi Flink supports multi-sink usage, but it optimizes each sink in an individual RelNode tree, which misses opportunities for cross-tree optimization, e.g.: {code:java} create table newX( a int, b bigint, c varchar, d varchar, e varchar ) with ( 'connector' = 'values', 'enable-projection-push-down' = 'true' ) insert into sink_table select a, b from newX insert into sink_table select a, 1 from newX {code} It will produce the plan below, which causes the source to be consumed twice {code:java} Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b]) Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- Calc(select=[a, 1 AS b]) +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a]) {code} In this ticket, I propose to do a global optimization for the multiple sinks by * Merging the multiple sinks (to the same table) into a single RelNode tree with an extra union node * After optimization, splitting the merged union back into the original multiple sinks In my PoC, after step 1, it produces the plan below, which I think will be good for global performance {code:java} Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- Union(all=[true], union=[a, b]) :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1]) +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1]) +- Reused(reference_id=[1]) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy
[ https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691568#comment-17691568 ] Aitozi commented on FLINK-30551: In my PR, only the table's with-properties are passed to the open method. But I think it would be convenient to also pass the Flink conf to the open method, because some custom metastore configs are job-level and can be set in the Flink conf. What about keeping both by adding an interface like {code:java} default void open(Configuration tableOptions, Configuration flinkConf) {} {code} > Add open method to PartitionCommitPolicy > > > Key: FLINK-30551 > URL: https://issues.apache.org/jira/browse/FLINK-30551 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > Currently, {{PartitionCommitPolicy}} does not have an open hook, so a > custom partition commit policy has no appropriate entry point for > its init work. > So I propose to add an {{open}} method to make this work. -- This message was sent by Atlassian Jira (v8.20.10#820010)
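The proposed two-argument signature can be mocked up to show why a {{default}} method keeps existing policies source-compatible. The {{Configuration}} stand-in below is a plain map, not Flink's class, and {{MetastorePolicy}} and its option key are invented for the example:

```java
import java.util.*;

// Sketch of the proposed interface evolution. A default open(...) means
// existing PartitionCommitPolicy implementations compile unchanged, while new
// ones can read both table-level and job-level options.
public class PartitionCommitPolicySketch {
    // Minimal stand-in for org.apache.flink.configuration.Configuration.
    public static class Configuration extends HashMap<String, String> { }

    public interface PartitionCommitPolicy {
        // Default body: pre-existing implementations need no change.
        default void open(Configuration tableOptions, Configuration flinkConf) { }
        void commit(String partition);
    }

    public static class MetastorePolicy implements PartitionCommitPolicy {
        public String endpoint;
        @Override
        public void open(Configuration tableOptions, Configuration flinkConf) {
            // Job-level settings (e.g. a custom metastore endpoint) come from
            // the Flink conf; table-level settings from the table options.
            endpoint = flinkConf.getOrDefault("metastore.endpoint", "default");
        }
        @Override
        public void commit(String partition) { /* commit against the endpoint */ }
    }

    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.put("metastore.endpoint", "thrift://ms:9083");
        MetastorePolicy policy = new MetastorePolicy();
        policy.open(new Configuration(), flinkConf);
        System.out.println(policy.endpoint);
    }
}
```

This mirrors the rationale in the comment: table-scoped options and job-scoped Flink conf are distinct sources, and the default body avoids breaking existing custom policies.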
[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691082#comment-17691082 ] Aitozi commented on FLINK-31120: Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46301=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=0c940707-2659-5648-cbe6-a1ad63045f0a > ConcurrentModificationException occurred in StringFunctionsITCase.test > -- > > Key: FLINK-31120 > URL: https://issues.apache.org/jira/browse/FLINK-31120 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 > {code} > Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 10.725 s <<< FAILURE! - in > org.apache.flink.table.planner.functions.StringFunctionsITCase > Feb 17 04:51:25 [ERROR] > org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] > Time elapsed: 4.367 s <<< ERROR! 
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute > sql > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) > Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates
[ https://issues.apache.org/jira/browse/FLINK-30570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17656052#comment-17656052 ] Aitozi commented on FLINK-30570: I opened a PR to solve this problem and am looking forward to a review, thanks > RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition > predicates > > > Key: FLINK-30570 > URL: https://issues.apache.org/jira/browse/FLINK-30570 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a > partition predicate and will be evaluated to false when compiling the SQL. > This has two problems: first, it should not be recognized as a > partition predicate; second, a nondeterministic function should never pass the partition pruner -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates
[ https://issues.apache.org/jira/browse/FLINK-30570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30570: --- Description: Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a partition predicate and will be evaluated to false when compiling the SQL. This has two problems: first, it should not be recognized as a partition predicate; second, a nondeterministic function should never pass the partition pruner was: Currently, the condition {{where rand(1) > 0.0125}} will be recognized as a partition predicate and will be evaluated to false when compiling the SQL. This has two problems: first, it should not be recognized as a partition predicate; second, a nondeterministic function should never pass the partition pruner > RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition > predicates > > > Key: FLINK-30570 > URL: https://issues.apache.org/jira/browse/FLINK-30570 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a > partition predicate and will be evaluated to false when compiling the SQL. > This has two problems: first, it should not be recognized as a > partition predicate; second, a nondeterministic function should never pass the partition pruner -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates
Aitozi created FLINK-30570: -- Summary: RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates Key: FLINK-30570 URL: https://issues.apache.org/jira/browse/FLINK-30570 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Aitozi Currently, the condition {{where rand(1) > 0.0125}} will be recognized as a partition predicate and will be evaluated to false when compiling the SQL. This has two problems: first, it should not be recognized as a partition predicate; second, a nondeterministic function should never pass the partition pruner -- This message was sent by Atlassian Jira (v8.20.10#820010)
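The guard the ticket asks for can be sketched as a recursive determinism check over a toy expression tree. This is illustrative only: the {{Call}} record and the hard-coded function list are invented for the example, while the real planner would likely consult something like Calcite's {{SqlOperator#isDeterministic}} on the RexNode tree.

```java
import java.util.*;

// Sketch of the missing guard: a predicate only qualifies for partition
// pruning if every call in its expression tree is deterministic, so a
// condition like rand(1) < 0.0001 must be rejected.
public class PartitionPredicateGuard {
    public record Call(String name, List<Call> operands) { }

    // Illustrative list; the real check would ask the operator itself.
    static final Set<String> NONDETERMINISTIC = Set.of("rand", "rand_integer", "uuid");

    public static boolean isDeterministic(Call expr) {
        if (NONDETERMINISTIC.contains(expr.name())) return false;
        for (Call op : expr.operands())
            if (!isDeterministic(op)) return false;  // any nondeterministic leaf taints the tree
        return true;
    }

    public static void main(String[] args) {
        Call randPredicate = new Call("<", List.of(
                new Call("rand", List.of()), new Call("0.0001", List.of())));
        // false: the pruner must not evaluate this at compile time.
        System.out.println(isDeterministic(randPredicate));
    }
}
```

Evaluating such a predicate once at compile time would freeze a value that should differ per record, which is exactly the bug described above.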
[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy
[ https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653949#comment-17653949 ] Aitozi commented on FLINK-30551: cc [~gaoyunhaii], what do you think of this? > Add open method to PartitionCommitPolicy > > > Key: FLINK-30551 > URL: https://issues.apache.org/jira/browse/FLINK-30551 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Aitozi >Priority: Major > > Currently, {{PartitionCommitPolicy}} does not have an open hook, so a > custom partition commit policy has no appropriate entry point for > its init work. > So I propose to add an {{open}} method to make this work. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30551) Add open method to PartitionCommitPolicy
Aitozi created FLINK-30551: -- Summary: Add open method to PartitionCommitPolicy Key: FLINK-30551 URL: https://issues.apache.org/jira/browse/FLINK-30551 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Aitozi Currently, the {{PartitionCommitPolicy}} does not have an open hook. A custom partition commit policy does not have an appropriate entry point for its init work. So I propose adding an {{open}} method to make this work. -- This message was sent by Atlassian Jira (v8.20.10#820010)
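The proposed lifecycle can be sketched as follows: the framework calls {{open}} once for one-time initialization before any {{commit}} call. The class and config names here are hypothetical illustrations, not Flink's actual API.

```python
# Hypothetical sketch of the proposed open() lifecycle hook.

class PartitionCommitPolicy:
    def open(self, config):
        """One-time initialization entry point (the proposed addition)."""
        pass

    def commit(self, partition):
        raise NotImplementedError

class MyCustomPolicy(PartitionCommitPolicy):
    def __init__(self):
        self.client = None

    def open(self, config):
        # Init work that previously had no appropriate entry point,
        # e.g. creating a client from user configuration.
        self.client = {"endpoint": config["endpoint"]}

    def commit(self, partition):
        assert self.client is not None, "open() must be called before commit()"
        return f"committed {partition} via {self.client['endpoint']}"

# The framework would drive the lifecycle: open() once, then commit() per partition.
policy = MyCustomPolicy()
policy.open({"endpoint": "metastore://example"})
```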
[jira] [Closed] (FLINK-29085) Add the name for test as hint for the current test case in BuiltInFunctionTestBase
[ https://issues.apache.org/jira/browse/FLINK-29085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi closed FLINK-29085. -- Resolution: Won't Fix > Add the name for test as hint for the current test case in > BuiltInFunctionTestBase > -- > > Key: FLINK-29085 > URL: https://issues.apache.org/jira/browse/FLINK-29085 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Aitozi >Priority: Minor > > When running tests that extend {{BuiltInFunctionTestBase}}, I found it hard > to distinguish the failing tests. I think it would be easy to add a name > prefix for the {{TestItem}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-29066) Reconsider the runtime property of the BuiltInFunctionDefinition
[ https://issues.apache.org/jira/browse/FLINK-29066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi closed FLINK-29066. -- Resolution: Won't Fix > Reconsider the runtime property of the BuiltInFunctionDefinition > > > Key: FLINK-29066 > URL: https://issues.apache.org/jira/browse/FLINK-29066 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > I was a bit confused when implementing the internal built-in functions while > dealing with the runtime property. Currently, there are three types of > runtime property: > 1) runtimeClass, which means Flink provides a class to define the runtime > implementation > 2) runtimeProvided, which means the runtime class is code generated > 3) runtimeDeferred, which means it uses Calcite's sql operator for the > codegen mapping > After some research, I found that we have 4 situations to deal with: > 1) non new-stack operators. > 2) new stack with its own runtime class provided, e.g. {{IFNULL}} -> runtimeClass > 3) new stack translated to a sql operator to provide the runtime call gen, > e.g. {{IS_NOT_TRUE}} -> runtimeDeferred > 4) new stack that cannot be mapped to a Calcite operator (mainly Flink internal > functions) and has no runtime class, which needs mapping to the runtime callgen, e.g. > {{CURRENT_WATERMARK}} -> runtimeProvided -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task
[ https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17648099#comment-17648099 ] Aitozi commented on FLINK-30198: Thanks, all guys, for your input. I agree that vertex-level tuning will be more complex. I think a pluggable {{VertexParallelismDecider}} is a good choice. Maybe we can also provide some information about the vertex, e.g. the vertex type {{Calc, Join, Local/Global Aggregate...}}, to the interface to let users make more suitable choices. > Support AdaptiveBatchScheduler to set per-task size for reducer task > - > > Key: FLINK-30198 > URL: https://issues.apache.org/jira/browse/FLINK-30198 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Aitozi >Priority: Major > > When we use AdaptiveBatchScheduler in our case, we found that it can work > well in most cases, but there is a limit that, there is only one global > parameter for per task data size by > {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. > However, in a map-reduce architecture, the reducer tasks usually have > more complex computation logic such as aggregate/sort/join operators. So I > think it will be nicer if we can set the reducer and mapper task's data size > per task individually. > Then, how to distinguish the reducer task? > IMO, we can let the parallelism decider know whether the vertex has hash > edge inputs. If yes, it should be a reducer task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
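The pluggable decider discussed above could, as a rough sketch, pick a different data-volume-per-task for reducer vertices, identified by having at least one hash (shuffle) input edge. The function name and the two separate volume parameters are hypothetical illustrations, not Flink's actual {{VertexParallelismDecider}} interface.

```python
# Hypothetical sketch: decide vertex parallelism from its input data volume,
# using a smaller per-task volume for reducer vertices (hash inputs) than for
# mapper vertices. Both per-task volumes are invented config values.
import math

def decide_parallelism(vertex_input_bytes, input_edge_types,
                       mapper_bytes_per_task, reducer_bytes_per_task,
                       max_parallelism=1024):
    # A hash input edge implies a shuffle, so treat the vertex as a reducer.
    is_reducer = "hash" in input_edge_types
    per_task = reducer_bytes_per_task if is_reducer else mapper_bytes_per_task
    return min(max_parallelism, max(1, math.ceil(vertex_input_bytes / per_task)))
```

For the same input volume, a reducer vertex then gets a higher parallelism than a mapper vertex, reflecting its heavier aggregate/sort/join logic.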
[jira] [Commented] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641636#comment-17641636 ] Aitozi commented on FLINK-30242: > which scenario do you meet this problem? I met this when we ran the batch job with flink-1.15. There is an unnecessary shuffle between the map node and local hash aggregate, and this will impact the performance. It's not an urgent need. We have already added a rule to support the local hash aggregate and union transpose. And it's a common optimization rule. So, it would be nice to contribute it upstream. If you have already done the work, you can take this ticket. If not, I can prepare a PR for this. BTW, I have posted two images to describe our use-case problem. Thanks. > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Attachments: screenshot-1.png, screenshot-2.png > > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30242: --- Attachment: screenshot-2.png > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Attachments: screenshot-1.png, screenshot-2.png > > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30242: --- Attachment: (was: screenshot-1.png) > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Attachments: screenshot-1.png > > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30242: --- Attachment: screenshot-1.png > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Attachments: screenshot-1.png > > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30242: --- Attachment: screenshot-1.png > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > Attachments: screenshot-1.png > > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30242) Push localHashAggregate pass the union node
[ https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1764#comment-1764 ] Aitozi commented on FLINK-30242: Hi [~Runking], Sorry, I had not noticed that. If you have already done the work, you can take this ticket :) BTW, can you share your presentation link? I'd like to take a look > Push localHashAggregate pass the union node > --- > > Key: FLINK-30242 > URL: https://issues.apache.org/jira/browse/FLINK-30242 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > The local hash aggregate after union will have an extra shuffle stage. We can > swap it with the union node so the local hash aggregate can chain with the > mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30242) Push localHashAggregate pass the union node
Aitozi created FLINK-30242: -- Summary: Push localHashAggregate pass the union node Key: FLINK-30242 URL: https://issues.apache.org/jira/browse/FLINK-30242 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi The local hash aggregate after union will have an extra shuffle stage. We can swap it with the union node so the local hash aggregate can chain with the mapper stage saving the unnecessary shuffle, especially in the batch job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
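The rewrite described in the ticket can be sketched as a transposition rule: instead of {{LocalAgg(Union(a, b))}}, produce {{Union(LocalAgg(a), LocalAgg(b))}}, so each local aggregate can chain with its own mapper stage. The tiny tuple-based plan representation below is invented for illustration and is not Flink's planner API.

```python
# Hypothetical sketch of a "push local aggregate past union" rule on a toy
# plan representation: nodes are tuples like ("local_agg", child) and
# ("union", child1, child2, ...).

def push_local_agg_past_union(plan):
    if plan[0] == "local_agg" and isinstance(plan[1], tuple) and plan[1][0] == "union":
        union = plan[1]
        # Union(LocalAgg(a), LocalAgg(b), ...) replaces LocalAgg(Union(a, b, ...)),
        # removing the shuffle stage between the mappers and the local aggregate.
        return ("union",) + tuple(("local_agg", child) for child in union[1:])
    return plan  # rule does not match; leave the plan unchanged
```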
[jira] [Commented] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task
[ https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638331#comment-17638331 ] Aitozi commented on FLINK-30198: cc [~wanglijie] what do you think of this ? > Support AdaptiveBatchScheduler to set per-task size for reducer task > - > > Key: FLINK-30198 > URL: https://issues.apache.org/jira/browse/FLINK-30198 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Aitozi >Priority: Major > > When we use AdaptiveBatchScheduler in our case, we found that it can work > well in most cases, but there is a limit that, there is only one global > parameter for per task data size by > {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. > However, in a map-reduce architecture, the reducer tasks usually have > more complex computation logic such as aggregate/sort/join operators. So I > think it will be nicer if we can set the reducer and mapper task's data size > per task individually. > Then, how to distinguish the reducer task? > IMO, we can let the parallelism decider know whether the vertex has hash > edge inputs. If yes, it should be a reducer task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task
[ https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30198: --- Description: When we use AdaptiveBatchScheduler in our case, we found that it can work well in most cases, but there is a limit that, there is only one global parameter for per task data size by {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. However, in a map-reduce architecture, the reducer tasks usually have more complex computation logic such as aggregate/sort/join operators. So I think it will be nicer if we can set the reducer and mapper task's data size per task individually. Then, how to distinguish the reducer task, IMO, we can let the parallelism decider know whether the vertex has hash edge inputs. If yes, it should be a reducer task. was: When we use AdaptiveBatchScheduler in our case, we found that it can work well in most case, but there is a limit that, there is only one global parameter for per task data size by {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. However, in a map-reduce architecture, the reducer tasks are usually have more complex computation logic such as aggregate/sort/join operators. So I think it will be nicer we can set the reducer and mapper task's data size per task individually. Then, how to distinguish the reducer task, IMO, we can let the parallelism decider know whether the vertex have a hash edge inputs. If yes, it should be a reducer task. 
> Support AdaptiveBatchScheduler to set per-task size for reducer task > - > > Key: FLINK-30198 > URL: https://issues.apache.org/jira/browse/FLINK-30198 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Aitozi >Priority: Major > > When we use AdaptiveBatchScheduler in our case, we found that it can work > well in most cases, but there is a limit that, there is only one global > parameter for per task data size by > {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. > However, in a map-reduce architecture, the reducer tasks usually have > more complex computation logic such as aggregate/sort/join operators. So I > think it will be nicer if we can set the reducer and mapper task's data size > per task individually. > Then, how to distinguish the reducer task, IMO, we can let the parallelism > decider know whether the vertex has hash edge inputs. If yes, it should be > a reducer task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task
[ https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-30198: --- Description: When we use AdaptiveBatchScheduler in our case, we found that it can work well in most cases, but there is a limit that, there is only one global parameter for per task data size by {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. However, in a map-reduce architecture, the reducer tasks usually have more complex computation logic such as aggregate/sort/join operators. So I think it will be nicer if we can set the reducer and mapper task's data size per task individually. Then, how to distinguish the reducer task? IMO, we can let the parallelism decider know whether the vertex has hash edge inputs. If yes, it should be a reducer task. was: When we use AdaptiveBatchScheduler in our case, we found that it can work well in most case, but there is a limit that, there is only one global parameter for per task data size by {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. However, in a map-reduce architecture, the reducer tasks are usually have more complex computation logic such as aggregate/sort/join operators. So I think it will be nicer if we can set the reducer and mapper task's data size per task individually. Then, how to distinguish the reducer task, IMO, we can let the parallelism decider know whether the vertex have a hash edge inputs. If yes, it should be a reducer task. 
> Support AdaptiveBatchScheduler to set per-task size for reducer task > - > > Key: FLINK-30198 > URL: https://issues.apache.org/jira/browse/FLINK-30198 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Aitozi >Priority: Major > > When we use AdaptiveBatchScheduler in our case, we found that it can work > well in most cases, but there is a limit that, there is only one global > parameter for per task data size by > {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. > However, in a map-reduce architecture, the reducer tasks usually have > more complex computation logic such as aggregate/sort/join operators. So I > think it will be nicer if we can set the reducer and mapper task's data size > per task individually. > Then, how to distinguish the reducer task? > IMO, we can let the parallelism decider know whether the vertex has hash > edge inputs. If yes, it should be a reducer task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task
Aitozi created FLINK-30198: -- Summary: Support AdaptiveBatchScheduler to set per-task size for reducer task Key: FLINK-30198 URL: https://issues.apache.org/jira/browse/FLINK-30198 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Aitozi When we use AdaptiveBatchScheduler in our case, we found that it can work well in most cases, but there is a limit that, there is only one global parameter for per task data size by {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. However, in a map-reduce architecture, the reducer tasks usually have more complex computation logic such as aggregate/sort/join operators. So I think it will be nicer if we can set the reducer and mapper task's data size per task individually. Then, how to distinguish the reducer task, IMO, we can let the parallelism decider know whether the vertex has hash edge inputs. If yes, it should be a reducer task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29585) Migrate TableSchema to Schema for Hive connector
[ https://issues.apache.org/jira/browse/FLINK-29585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634112#comment-17634112 ] Aitozi commented on FLINK-29585: FYI, I'm trying to work on this now. I will ping you for review after I finish this, thanks > Migrate TableSchema to Schema for Hive connector > > > Key: FLINK-29585 > URL: https://issues.apache.org/jira/browse/FLINK-29585 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > > `TableSchema` is deprecated, we should migrate it to Schema -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic
[ https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633829#comment-17633829 ] Aitozi commented on FLINK-25113: Hi [~slinkydeveloper], [~luoyuxia], [~lsy] I have pushed a [PR|https://github.com/apache/flink/pull/21290] for this ticket; can you guys help review it? > Cleanup from Parquet and Orc the partition key handling logic > - > > Key: FLINK-25113 > URL: https://issues.apache.org/jira/browse/FLINK-25113 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Francesco Guardiani >Priority: Major > > After https://issues.apache.org/jira/browse/FLINK-24617 the partition key > handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We > should clean up this logic from orc and parquet formats, in order to simplify > it. Note: Hive still depends on this logic, but it should rather use > {{FileInfoExtractorBulkFormat}} or similar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30019) Remove the unused HiveTableFileInputFormat in hive connector
[ https://issues.apache.org/jira/browse/FLINK-30019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633826#comment-17633826 ] Aitozi commented on FLINK-30019: cc [~luoyuxia], [~lsy] can you help review this PR ? > Remove the unused HiveTableFileInputFormat in hive connector > > > Key: FLINK-30019 > URL: https://issues.apache.org/jira/browse/FLINK-30019 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Hive >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > As I see, after https://issues.apache.org/jira/browse/FLINK-19888 the hive > connector does not rely on the HiveTableFileInputFormat anymore, so it can be > safely removed now -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30019) Remove the unused HiveTableFileInputFormat in hive connector
Aitozi created FLINK-30019: -- Summary: Remove the unused HiveTableFileInputFormat in hive connector Key: FLINK-30019 URL: https://issues.apache.org/jira/browse/FLINK-30019 Project: Flink Issue Type: Technical Debt Components: Connectors / Hive Reporter: Aitozi As I see, after https://issues.apache.org/jira/browse/FLINK-19888 the hive connector does not rely on the HiveTableFileInputFormat anymore, so it can be safely removed now -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic
[ https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632146#comment-17632146 ] Aitozi commented on FLINK-25113: When I tried to work on this, I found that I can't simply break this into two separate pieces of work, because the partition keys in the parquet/orc formats will affect the hive source after using the {{FileInfoExtractorBulkFormat}}. So, I created a PR with these two commits to complete this work. > Cleanup from Parquet and Orc the partition key handling logic > - > > Key: FLINK-25113 > URL: https://issues.apache.org/jira/browse/FLINK-25113 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Francesco Guardiani >Priority: Major > > After https://issues.apache.org/jira/browse/FLINK-24617 the partition key > handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We > should clean up this logic from orc and parquet formats, in order to simplify > it. Note: Hive still depends on this logic, but it should rather use > {{FileInfoExtractorBulkFormat}} or similar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic
[ https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631679#comment-17631679 ] Aitozi commented on FLINK-25113: sorry for missing the ticket link :) https://issues.apache.org/jira/browse/FLINK-29980 > Cleanup from Parquet and Orc the partition key handling logic > - > > Key: FLINK-25113 > URL: https://issues.apache.org/jira/browse/FLINK-25113 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Francesco Guardiani >Priority: Major > > After https://issues.apache.org/jira/browse/FLINK-24617 the partition key > handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We > should clean up this logic from orc and parquet formats, in order to simplify > it. Note: Hive still depends on this logic, but it should rather use > {{FileInfoExtractorBulkFormat}} or similar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic
[ https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631632#comment-17631632 ] Aitozi commented on FLINK-25113: hi [~slinkydeveloper], I created a preceding ticket to improve the hive source to handle the partition keys. I'd like to work on it; can you help assign the ticket to me? > Cleanup from Parquet and Orc the partition key handling logic > - > > Key: FLINK-25113 > URL: https://issues.apache.org/jira/browse/FLINK-25113 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Francesco Guardiani >Priority: Major > > After https://issues.apache.org/jira/browse/FLINK-24617 the partition key > handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We > should clean up this logic from orc and parquet formats, in order to simplify > it. Note: Hive still depends on this logic, but it should rather use > {{FileInfoExtractorBulkFormat}} or similar. -- This message was sent by Atlassian Jira (v8.20.10#820010)