[jira] [Commented] (FLINK-32608) Improve source reusing with projection push down

2023-08-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753066#comment-17753066
 ] 

Aitozi commented on FLINK-32608:


I think this issue is a duplicate of 
https://issues.apache.org/jira/browse/FLINK-29088

> Improve source reusing with projection push down
> 
>
> Key: FLINK-32608
> URL: https://issues.apache.org/jira/browse/FLINK-32608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32699) select typeof(proctime()); throw exception in sql-client

2023-08-02 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750530#comment-17750530
 ] 

Aitozi commented on FLINK-32699:


Hi [~jackylau], sorry for not noticing this ticket. I have opened another 
ticket, https://issues.apache.org/jira/projects/FLINK/issues/FLINK-32711, about 
this. Could you also help review that one?

> select typeof(proctime()); throw exception in sql-client
> 
>
> Key: FLINK-32699
> URL: https://issues.apache.org/jira/browse/FLINK-32699
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jacky Lau
>Priority: Major
> Fix For: 1.18.0
>
>
> {code:java}
> Flink SQL> select typeof(proctime()); 
>  
> [ERROR] Could not execute SQL statement. Reason: 
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument 
> type 'TIMESTAMP_LTZ(3)'
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32659) DB connection may leak if exception is thrown in JdbcOutputFormat#close

2023-07-31 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749428#comment-17749428
 ] 

Aitozi commented on FLINK-32659:


Can someone help review the fix: 
https://github.com/apache/flink-connector-jdbc/pull/71 
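
For context, a minimal sketch of the leak pattern the ticket describes 
(hypothetical, simplified names; not the actual JdbcOutputFormat code): if 
closing the statement throws, the connection close must still be attempted, 
e.g. via try/finally.

{code:java}
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical, simplified illustration of the leak; not the actual Flink code.
class JdbcWriterSketch implements AutoCloseable {
    private final Connection connection;
    private final Statement statement;

    JdbcWriterSketch(Connection connection, Statement statement) {
        this.connection = connection;
        this.statement = statement;
    }

    @Override
    public void close() throws IOException {
        try {
            statement.close(); // may throw SQLException ...
        } catch (SQLException e) {
            throw new IOException("Failed to close statement", e);
        } finally {
            // ... but the connection close is still attempted, so it cannot
            // leak. Without this finally block, an exception above would skip
            // connection.close() and leak the DB connection.
            try {
                connection.close();
            } catch (SQLException e) {
                // log and swallow; the original exception (if any) propagates
            }
        }
    }
}
{code}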

> DB connection may leak if exception is thrown in JdbcOutputFormat#close
> ---
>
> Key: FLINK-32659
> URL: https://issues.apache.org/jira/browse/FLINK-32659
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: Aitozi
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32711) Type mismatch when proctime function used as parameter

2023-07-31 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749412#comment-17749412
 ] 

Aitozi commented on FLINK-32711:


I pushed a fix for this. Could anyone help review it?

> Type mismatch when proctime function used as parameter
> --
>
> Key: FLINK-32711
> URL: https://issues.apache.org/jira/browse/FLINK-32711
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> reproduce case:
> {code:sql}
> SELECT TYPEOF(PROCTIME())
> {code}
> this query will fail with 
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument 
> type 'TIMESTAMP_LTZ(3)'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32711) Type mismatch when proctime function used as parameter

2023-07-28 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-32711:
---
Issue Type: Bug  (was: Improvement)

> Type mismatch when proctime function used as parameter
> --
>
> Key: FLINK-32711
> URL: https://issues.apache.org/jira/browse/FLINK-32711
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Aitozi
>Priority: Major
>
> reproduce case:
> {code:sql}
> SELECT TYPEOF(PROCTIME())
> {code}
> this query will fail with 
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument 
> type 'TIMESTAMP_LTZ(3)'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32711) Type mismatch when proctime function used as parameter

2023-07-28 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748626#comment-17748626
 ] 

Aitozi commented on FLINK-32711:


It's caused by the hard-coded nullable result type in the PROCTIME_MATERIALIZE 
code generation.
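
A hedged illustration of the mismatch (not the actual planner code): PROCTIME() 
is declared as TIMESTAMP_LTZ(3) NOT NULL, but the generated materialization 
hard-codes a nullable result, so the argument check fails.

{code:java}
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;

public class NullabilityMismatchSketch {
    public static void main(String[] args) {
        // TIMESTAMP_LTZ(3) NOT NULL -- what the function declares
        LogicalType declared = new LocalZonedTimestampType(false, 3);
        // TIMESTAMP_LTZ(3) -- what the hard-coded codegen actually produces
        LogicalType generated = new LocalZonedTimestampType(true, 3);
        // The two types differ only in nullability, which is exactly the
        // "Mismatch of function's argument data type" reported above.
        System.out.println(declared.equals(generated)); // false
    }
}
{code}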

> Type mismatch when proctime function used as parameter
> --
>
> Key: FLINK-32711
> URL: https://issues.apache.org/jira/browse/FLINK-32711
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Aitozi
>Priority: Major
>
> reproduce case:
> {code:sql}
> SELECT TYPEOF(PROCTIME())
> {code}
> this query will fail with 
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument 
> type 'TIMESTAMP_LTZ(3)'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32711) Type mismatch when proctime function used as parameter

2023-07-28 Thread Aitozi (Jira)
Aitozi created FLINK-32711:
--

 Summary: Type mismatch when proctime function used as parameter
 Key: FLINK-32711
 URL: https://issues.apache.org/jira/browse/FLINK-32711
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Aitozi


reproduce case:

{code:sql}
SELECT TYPEOF(PROCTIME())
{code}

this query will fail with 

org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of function's 
argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument type 
'TIMESTAMP_LTZ(3)'.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32659) DB connection may leak if exception is thrown in JdbcOutputFormat#close

2023-07-24 Thread Aitozi (Jira)
Aitozi created FLINK-32659:
--

 Summary: DB connection may leak if exception is thrown in 
JdbcOutputFormat#close
 Key: FLINK-32659
 URL: https://issues.apache.org/jira/browse/FLINK-32659
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731832#comment-17731832
 ] 

Aitozi commented on FLINK-32320:


[~libenchao] Thanks for your attention. I just made a quick fix on the Calcite 
side when creating a new correlationId: if the scope and the identifier are the 
same, reuse the same correlationId as before. It works as expected. What do you 
think of this solution? 
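
A minimal sketch of the idea (hypothetical names; not the actual Calcite 
SqlToRelConverter code): cache correlation ids per (scope, identifier) so that 
converting the same table function again reuses the same id and the Correlate 
digests match.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: reuse the correlation id for the same scope/identifier.
class CorrelationIdReuseSketch {
    private final Map<String, String> idsByScopeAndIdentifier = new HashMap<>();
    private int nextId = 0;

    String correlId(String scope, String identifier) {
        return idsByScopeAndIdentifier.computeIfAbsent(
                scope + "#" + identifier, key -> "$cor" + nextId++);
    }
}
{code}

With this, both references to {{r}} in the test query would resolve to the same 
{{$cor0}}, so the two Correlate nodes get identical digests and the sub-plan 
can be reused.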

> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuseTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node cannot be reused due to the different correlation ids.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731823#comment-17731823
 ] 

Aitozi commented on FLINK-32320:


In production, multi-sink jobs are very common. If the table produced by a 
UDTF is queried multiple times, the function will be executed multiple times 
(as shown in the plan), which leads to bad performance.

After some research, I think the cause is that during the sqlToRel process, the 
table function SqlNode is converted via `toRel` multiple times, which leads to 
different correlationIds.


> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuseTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node cannot be reused due to the different correlation ids.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)
Aitozi created FLINK-32320:
--

 Summary: Same correlate can not be reused due to the different 
correlationId
 Key: FLINK-32320
 URL: https://issues.apache.org/jira/browse/FLINK-32320
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


As described in SubplanReuseTest


{code:java}
  @Test
  def testSubplanReuseOnCorrelate(): Unit = {
util.addFunction("str_split", new StringSplit())
val sqlQuery =
  """
|WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
|SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
  """.stripMargin
// TODO the sub-plan of Correlate should be reused,
// however the digests of Correlates are different
util.verifyExecPlan(sqlQuery)
  }
{code}

This will produce the plan 


{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, 
c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

The Correlate node cannot be reused due to the different correlation ids.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31704) Pulsar docs should be pulled from dedicated branch

2023-06-07 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730162#comment-17730162
 ] 

Aitozi commented on FLINK-31704:


Thanks, will try it.

> Pulsar docs should be pulled from dedicated branch
> --
>
> Key: FLINK-31704
> URL: https://issues.apache.org/jira/browse/FLINK-31704
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: 1.17.0
>Reporter: Danny Cranmer
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.1
>
>
> Pulsar docs are pulled from the {{main}} 
> [branch|https://github.com/apache/flink/blob/release-1.17/docs/setup_docs.sh#L49].
>  This is dangerous for final versions since we may include features in the 
> docs that are not supported. Update Pulsar to pull from a dedicated branch or 
> tag.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31704) Pulsar docs should be pulled from dedicated branch

2023-06-07 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730133#comment-17730133
 ] 

Aitozi commented on FLINK-31704:


Hi [~Weijie Guo] [~dannycranmer] I have a CI failure due to a missing branch:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49737=logs=c5d67f7d-375d-5407-4743-f9d0c4436a81=38411795-40c9-51fa-10b0-bd083cf9f5a5=27

The target 3.0.0-docs does not exist in flink-connector-pulsar. Should the 
branch name be changed?

> Pulsar docs should be pulled from dedicated branch
> --
>
> Key: FLINK-31704
> URL: https://issues.apache.org/jira/browse/FLINK-31704
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: 1.17.0
>Reporter: Danny Cranmer
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.1
>
>
> Pulsar docs are pulled from the {{main}} 
> [branch|https://github.com/apache/flink/blob/release-1.17/docs/setup_docs.sh#L49].
>  This is dangerous for final versions since we may include features in the 
> docs that are not supported. Update Pulsar to pull from a dedicated branch or 
> tag.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32249) A Java string should be used instead of a Calcite NlsString to construct the table and column comment attributes of CatalogTable

2023-06-04 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729175#comment-17729175
 ] 

Aitozi commented on FLINK-32249:


It does not reproduce in my local SQL client:


{code:java}
Flink SQL> CREATE TABLE s1 (
>  order_idSTRING comment '测试中文',
>  price   DECIMAL(32,2) comment _utf8'测试_utf8中文',
>  currencySTRING,
>  order_time  TIMESTAMP(3)
>  ) comment '测试中文table comment' WITH ('connector'='dategen');
[INFO] Execute statement succeed.

Flink SQL> 
> show create table s1;
+-------------------------------------------------------------+
|                                                      result |
+-------------------------------------------------------------+
| CREATE TABLE `default_catalog`.`default_database`.`s1` (
  `order_id` VARCHAR(2147483647) COMMENT '测试中文',
  `price` DECIMAL(32, 2) COMMENT '测试_utf8中文',
  `currency` VARCHAR(2147483647),
  `order_time` TIMESTAMP(3)
) COMMENT '测试中文table comment'
WITH (
  'connector' = 'dategen'
) |
+-------------------------------------------------------------+
1 row in set

Flink SQL> desc s1;
+------------+----------------+------+-----+--------+-----------+---------------+
|       name |           type | null | key | extras | watermark |       comment |
+------------+----------------+------+-----+--------+-----------+---------------+
|   order_id |         STRING | TRUE |     |        |           |      测试中文 |
|      price | DECIMAL(32, 2) | TRUE |     |        |           | 测试_utf8中文 |
|   currency |         STRING | TRUE |     |        |           |               |
| order_time |   TIMESTAMP(3) | TRUE |     |        |           |               |
+------------+----------------+------+-----+--------+-----------+---------------+
4 rows in set

Flink SQL> 

{code}
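
For reference, the underlying issue is about passing Calcite's {{NlsString}} 
instead of a plain Java string. A hedged sketch (simplified; not the actual 
planner code) of the difference:

{code:java}
import org.apache.calcite.util.NlsString;

public class NlsStringSketch {
    public static void main(String[] args) {
        NlsString comment = new NlsString("测试中文", "UTF-16LE", null);
        // SQL literal form, with charset prefix/escaping -- what leaks into
        // the comment when the NlsString itself is stringified:
        System.out.println(comment.toString());
        // Plain Java string -- what the CatalogTable should receive:
        System.out.println(comment.getValue());
    }
}
{code}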




> A Java string should be used instead of a Calcite NlsString to construct the 
> table and column comment attributes of CatalogTable
> 
>
> Key: FLINK-32249
> URL: https://issues.apache.org/jira/browse/FLINK-32249
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: lincoln lee
>Priority: Major
> Fix For: 1.18.0
>
>
> when Flink interacts with CatalogTable, it directly passes the Calcite's 
> NlsString comment as a string to the comment attribute of the schema and 
> column. In theory, a Java string should be passed here, otherwise the 
> CatalogTable implementers may encounter special character encoding issues, 
> e.g., an issue in apache paimon: 
> [https://github.com/apache/incubator-paimon/issues/1262]
> also tested in sql-client:
> {code}
> Flink SQL> CREATE TABLE s1 (
> >     order_id    STRING comment '测试中文',
> >     price       DECIMAL(32,2) comment _utf8'测试_utf8中文',
> >     currency    STRING,
> >     order_time  TIMESTAMP(3)
> > ) comment '测试中文table comment' WITH ('connector'='dategen');
> [INFO] Execute statement succeed.
> Flink SQL> show tables;
> ++
> | table name |
> ++
> |         s1 |
> ++
> 1 row in set
> Flink SQL> desc s1;
> +------------+----------------+------+-----+--------+-----------+--------------------------+
> |       name |           type | null | key | extras | watermark |                  comment |
> +------------+----------------+------+-----+--------+-----------+--------------------------+
> |   order_id |         STRING | TRUE |     |        |           |  u&'\6d4b\8bd5\4e2d\6587 |
> |      price | DECIMAL(32, 2) | TRUE |     |        |           |      _UTF8'测试_utf8中文 |
> |   currency |         STRING | TRUE |     |        |           |                          |
> | order_time |   TIMESTAMP(3) | TRUE |     |        |           |                          |
> 

[jira] [Commented] (FLINK-32246) javax.management.InstanceAlreadyExistsException

2023-06-02 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728658#comment-17728658
 ] 

Aitozi commented on FLINK-32246:


Where do you run this SQL? In sql-client? Could you provide a demo case to 
reproduce it?

> javax.management.InstanceAlreadyExistsException
> ---
>
> Key: FLINK-32246
> URL: https://issues.apache.org/jira/browse/FLINK-32246
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.2
>Reporter: jeff-zou
>Priority: Major
>
> Flink SQL throws an 
> exception(javax.management.InstanceAlreadyExistsException) when trying to 
> perform multiple sink operations on the same kafka source .
>  
> sql example:
> {code:java}
> create table kafka_source() with ('connector'='kafka');
> insert into sink_table1 select * from kafka_source;
> insert into sink_table2 select * from kafka_source; {code}
> The Exception as below:
> {code:java}
> javax.management.InstanceAlreadyExistsException: 
> kafka.admin.client:type=app-info,id=*
>  
> java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
>  
> java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>  
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>  
> org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:500)
>  
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
>  org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
>  org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
>  
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
>  
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
>  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
>  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
>  
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>  
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  java.base/java.lang.Thread.run(Thread.java:829) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-05-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725311#comment-17725311
 ] 

Aitozi commented on FLINK-31828:


Could any SQL folks help verify this fix?

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
>  Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List<Map<String, String>> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job in case of even simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT NOT NULL>>

2023-04-25 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17716538#comment-17716538
 ] 

Aitozi commented on FLINK-31835:


[~jark] The PR has passed CI, and I think the current solution will not cause 
compatibility problems, since it only fixes the conversion class according to 
the nullability when creating the CollectionDataType. Could you help review it?

> DataTypeHint don't support Row<ARRAY<INT NOT NULL>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Priority: Major
>  Labels: pull-request-available
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT NOT NULL>>

2023-04-25 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17716209#comment-17716209
 ] 

Aitozi commented on FLINK-31835:


Yes, the reason is shown above. I have pushed a PR to try to solve this, but it 
needs some discussion to avoid breaking compatibility. 

> DataTypeHint don't support Row<ARRAY<INT NOT NULL>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Priority: Major
>  Labels: pull-request-available
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT NOT NULL>>

2023-04-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715474#comment-17715474
 ] 

Aitozi commented on FLINK-31835:


My first thought is that the conversion class for an atomic type, e.g. 
{{IntType}}, should respect the nullability. So when an {{IntType}} is copied 
from nullable to not null, its default conversion class should change from 
{{Integer}} to {{int}}.

In the DataType layer, {{AtomicDataType}} and {{CollectionDataType}} should 
also respect the {{nullable()}} and {{notNull()}} calls: the conversion class 
of the data type should be updated after these calls.

I have verified this locally and it solves the problem. What do you think of 
this solution, [~jark]?
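
A minimal sketch of the intended behavior, using the public DataTypes API 
(today both arrays report {{Integer[]}}; under the proposal the NOT NULL 
variant would report {{int[]}}):

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class ConversionClassSketch {
    public static void main(String[] args) {
        DataType nullableInts = DataTypes.ARRAY(DataTypes.INT());
        DataType notNullInts = DataTypes.ARRAY(DataTypes.INT().notNull());

        // Today: both print class [Ljava.lang.Integer; (i.e. Integer[]).
        // Proposed: the NOT NULL variant would default to class [I (i.e. int[]).
        System.out.println(nullableInts.getConversionClass());
        System.out.println(notNullInts.getConversionClass());
    }
}
{code}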

> DataTypeHint don't support Row<ARRAY<INT NOT NULL>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Priority: Major
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT NOT NULL>>

2023-04-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715466#comment-17715466
 ] 

Aitozi commented on FLINK-31835:


When creating the Array {{CollectionDataType}}, the array's element type is 
used to construct the array conversion class. But the conversion class is 
{{Integer}} for both {{INT NOT NULL}} and {{INT}}, so the conversion class for 
{{ARRAY<INT NOT NULL>}} becomes {{Integer[]}}:


{code:java}
if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) {
    return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
}
{code}
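
This also shows why the element conversion class matters: 
{{java.lang.reflect.Array}} derives the array class directly from it.

{code:java}
import java.lang.reflect.Array;

public class ArrayClassSketch {
    public static void main(String[] args) {
        // Array.newInstance(Integer.class, 0) yields Integer[] ...
        System.out.println(Array.newInstance(Integer.class, 0).getClass()); // class [Ljava.lang.Integer;
        // ... while Array.newInstance(int.class, 0) yields int[].
        System.out.println(Array.newInstance(int.class, 0).getClass());     // class [I
    }
}
{code}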




> DataTypeHint don't support Row<ARRAY<INT NOT NULL>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Priority: Major
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT NOT NULL>>

2023-04-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715464#comment-17715464
 ] 

Aitozi commented on FLINK-31835:


In this case, ARRAY<INT NOT NULL> will still map to 
{{ArrayObjectArrayConverter}}, which I think is not expected.

> DataTypeHint don't support Row<ARRAY<INT NOT NULL>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Priority: Major
>
> Using DataTypeHint("Row>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-04-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715451#comment-17715451
 ] 

Aitozi commented on FLINK-31828:


Hi [~netvl], you could also use this in your case:
{code:java}
final var schema =
    Schema.newBuilder()
        .column("key", DataTypes.STRING())
        .column(
            "values",
            DataTypes.ARRAY(
                    DataTypes.MAP(
                        DataTypes.STRING(),
                        DataTypes.STRING()))
                .bridgedTo(List.class))
        .build(); {code}

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
>  Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List<Map<String, String>> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job in case of even simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-04-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715403#comment-17715403
 ] 

Aitozi commented on FLINK-31828:


[~twalthr] could you help review this fix :)? I see you reviewed the last 
commit for this part.

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
>  Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List<Map<String, String>> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job in case of even simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-04-22 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715356#comment-17715356
 ] 

Aitozi commented on FLINK-31828:


I have pushed a fix for it. Before this fix, the demo fails with an 
EOFException when printing the raw type (as shown in the pull request).

After the fix, in this example, the user can define the schema as below:

{code:java}
final var schema =
    Schema.newBuilder()
        .column("key", DataTypes.STRING())
        .column("values", DataTypes.RAW(java.util.List.class))
        .build(); {code}

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
>  Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List<Map<String, String>> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job in case of even simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-04-21 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715111#comment-17715111
 ] 

Aitozi commented on FLINK-31828:


Hi [~netvl], thanks for this detailed bug report.

I have reproduced your problem, and I also spent some time digging into how to 
use the RAW type to declare the List type in your case. I found that there's a 
bug in the cast rule (it uses the wrong serializer), so it fails with an 
EOFException as you mentioned (hopefully the same error you saw).

I will prepare a PR to solve this bug.
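
For intuition, a self-contained illustration of how a wrong serializer surfaces 
as an EOFException (plain java.io, not the actual Flink cast-rule code): bytes 
written for one type are read back as a larger one, so the reader runs off the 
end of the data.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class WrongSerializerSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(42); // "serialize" with one format (4 bytes)
        }
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            in.readLong(); // "deserialize" with another (8 bytes)
        } catch (EOFException e) {
            System.out.println("EOFException, as in the reported failure: " + e);
        }
    }
}
{code}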

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List<Map<String, String>> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job in case of even simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-20 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815
 ] 

Aitozi edited comment on FLINK-29692 at 4/21/23 4:12 AM:
-

Hi, sorry for jumping into this discussion. I want to share two thoughts about 
this feature.
 * As [~jark] mentioned, early/late fire is already supported in the group 
window aggregation. Since the window TVF is a feature-rich version of the group 
window aggregation, I think this ability should be aligned.
 * There is a difference between early/late fire and the cumulate window TVF: 
early/late fire relates to the window trigger, while the cumulate window 
relates to the window assigner. We could use a cumulate window to simulate the 
same functionality, but it may bring overhead, as [~charles-tan] said.

So, +1 from my side to support the emit strategy (early/late fire) in window 
TVFs.


was (Author: aitozi):
Hi, sorry for jumping into this discussion. I want to share two thoughts about 
this feature.
 * As [~jark] mentioned, early/late fire is already supported in the group 
window aggregation. Since the window TVF is a feature-rich version of the group 
window aggregation, I think this ability should be aligned.
 * There is a difference between early/late fire and the cumulate window TVF: 
early/late fire relates to the window trigger, while the cumulate window 
relates to the window assigner. We could use a cumulate window to simulate the 
same functionality, but it may bring overhead, as [~charles-tan] said.

So, +1 from my side to support the emit strategy in window TVFs.

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.3
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) conflicts with 2). Without early/late fires, we had to 
> compromise: either live with fresh but incorrect data, or tolerate excess 
> latency for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-20 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815
 ] 

Aitozi commented on FLINK-29692:


Hi, sorry for jumping into this discussion. I want to share two thoughts about 
this feature.
 * As [~jark] mentioned, early/late fire is already supported in the group 
window aggregation. Since the window TVF is a feature-rich version of the group 
window aggregation, I think this ability should be aligned.
 * There is a difference between early/late fire and the cumulate window TVF: 
early/late fire relates to the window trigger, while the cumulate window 
relates to the window assigner. We could use a cumulate window to simulate the 
same functionality, but it may bring overhead, as [~charles-tan] said.

So, +1 from my side to support the emit strategy in window TVFs.

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.3
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) conflicts with 2). Without early/late fires, we had to 
> compromise: either live with fresh but incorrect data, or tolerate excess 
> latency for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-18 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713864#comment-17713864
 ] 

Aitozi commented on FLINK-31755:


CC [~lincoln.86xy] [~snuyanzin] 

> ROW function can not work with RewriteIntersectAllRule
> --
>
> Key: FLINK-31755
> URL: https://issues.apache.org/jira/browse/FLINK-31755
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Reproduce case:
> {code:java}
> create table row_sink (
>   `b` ROW
> ) with (
>   'connector' = 'values'
> )
> util.verifyRelPlanInsert(
> "INSERT INTO row_sink " +
>   "SELECT ROW(a, b) FROM complex_type_src intersect all " +
>   "SELECT ROW(c, d) FROM complex_type_src ")
> {code}
> It fails with 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> Difference:
> EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>   ... 68 more
> {code}
> The reason is:
> the ROW function generates the {{FULLY_QUALIFIED}} struct kind, but after the 
> {{RewriteIntersectAllRule}} optimization it produces 
> {{PEEK_FIELDS_NO_EXPAND}}, so the volcano planner complains about a type 
> mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31774) Add document for delete and update statement

2023-04-13 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712195#comment-17712195
 ] 

Aitozi commented on FLINK-31774:


Hi [~luoyuxia] The PR is ready for review; please take a look when you are free.

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> I do not find any documentation about the usage of the DELETE and UPDATE 
> statements in the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy

2023-04-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711683#comment-17711683
 ] 

Aitozi commented on FLINK-31790:


CC [~luoyuxia] 

> Filesystem batch sink should also respect to the PartitionCommitPolicy
> --
>
> Key: FLINK-31790
> URL: https://issues.apache.org/jira/browse/FLINK-31790
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{PartitionCommitPolicy}} only takes effect in the streaming 
> file sink and the hive file sink. The filesystem sink in batch mode should 
> also respect the commit policy.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31790:
--

 Summary: Filesystem batch sink should also respect to the 
PartitionCommitPolicy
 Key: FLINK-31790
 URL: https://issues.apache.org/jira/browse/FLINK-31790
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Aitozi


Currently, the {{PartitionCommitPolicy}} only takes effect in the streaming 
file sink and the hive file sink. The filesystem sink in batch mode should also 
respect the commit policy.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31787) Add the explicit ROW constructor to the system function doc

2023-04-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711427#comment-17711427
 ] 

Aitozi commented on FLINK-31787:


See the discussion in this ticket: https://issues.apache.org/jira/browse/FLINK-18027

> Add the explicit ROW constructor to the system function doc
> ---
>
> Key: FLINK-31787
> URL: https://issues.apache.org/jira/browse/FLINK-31787
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aitozi
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31787) Add the explicit ROW constructor to the system function doc

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31787:
--

 Summary: Add the explicit ROW constructor to the system function 
doc
 Key: FLINK-31787
 URL: https://issues.apache.org/jira/browse/FLINK-31787
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement

2023-04-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711306#comment-17711306
 ] 

Aitozi commented on FLINK-31344:


[~luoyuxia] I have finished this ticket. Could you help take a look when you 
are free?

> Support to update nested columns in update statement
> 
>
> Key: FLINK-31344
> URL: https://issues.apache.org/jira/browse/FLINK-31344
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently, it throws an exception when using an update statement to update a 
> nested column.
> For the following SQL:
>  
> {code:java}
> create table t (s ROW<`a` INT>) with (xxx);
> update t set s.a = 1;{code}
> It'll throw the exception:
> {code:java}
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> at line 1, column 15.
> Was expecting:
>     "=" ...
>     
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306)
>     at 
> org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198)
>     ... 33 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711265#comment-17711265
 ] 

Aitozi commented on FLINK-31533:


After some experimentation, it seems easy to support this feature. It doesn't 
have to touch the syntax part, because CTAS reuses (part of) the 
SqlCreateTable syntax. So the SQL below can already be parsed:
{code:java}
 CREATE TABLE MyCtasTable 
 PARTITIONED BY (`a`) 
 WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv'
 ) AS
 SELECT * FROM MyTable{code}
We can't define this today only because it's manually banned in 
{{SqlCreateTableAs#validate}}.

In my POC, only a minor change is needed to support the PARTITIONED BY 
definition, without touching the public API/syntax. Do you think we still need 
a FLIP for this or not, [~luoyuxia]?

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-12 Thread Aitozi (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31533 ]


Aitozi deleted comment on FLINK-31533:


was (Author: aitozi):
Yes, it's indeed need to revisit the parser of the ctas syntax. I will draw a 
POC for this and open a discussion in dev mailing 

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711203#comment-17711203
 ] 

Aitozi commented on FLINK-31533:


Yes, the parser of the CTAS syntax indeed needs to be revisited. I will draft 
a POC for this and open a discussion on the dev mailing list.

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200
 ] 

Aitozi edited comment on FLINK-18027 at 4/12/23 5:43 AM:
-

The content was modified

from

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]

I think the content below was removed accidentally.  CC [~sjwiesman]

{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved.


was (Author: aitozi):
The content is modified

from 

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]
 

 

I think the content below is removed accidently. 

 
{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}
 

 

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> 

[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200
 ] 

Aitozi commented on FLINK-18027:


The content was modified

from

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]

I think the content below was removed accidentally.

{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved; a usage sketch follows below.
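
For reference, a minimal sketch of what the restored doc entry could 
illustrate (the literal values here are made up):
{code:java}
-- explicit ROW constructor: ROW(value1 [, value2]*)
SELECT ROW(1, 'Hello', TRUE);
{code}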

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at 

[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711040#comment-17711040
 ] 

Aitozi commented on FLINK-18027:


[~libenchao] do you know why the explicit ROW constructor was removed from the 
doc? (The git history of the file seems hard to trace.)

Is the explicit ROW call discouraged? If not, I think we should add it back.

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) 
> ... 5 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711033#comment-17711033
 ] 

Aitozi commented on FLINK-31774:


I think we should add a description for these two statements, similar to the 
INSERT statement; a minimal sketch follows below. CC [~luoyuxia] I'm willing 
to work on this.
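
For example, the new doc sections could open with minimal statements like the 
following (the table name {{Orders}} and its columns are hypothetical, just 
for illustration):
{code:java}
-- hypothetical examples for the new DELETE / UPDATE sections
DELETE FROM Orders WHERE order_id = 1;

UPDATE Orders SET price = 99.9 WHERE order_id = 2;
{code}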

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> I could not find any documentation about the usage of the DELETE and UPDATE 
> statements in the SQL section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31774:
---
Parent: FLINK-30648
Issue Type: Sub-task  (was: Improvement)

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> I could not find any documentation about the usage of the DELETE and UPDATE 
> statements in the SQL section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)
Aitozi created FLINK-31774:
--

 Summary: Add document for delete and update statement
 Key: FLINK-31774
 URL: https://issues.apache.org/jira/browse/FLINK-31774
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi


I could not find any documentation about the usage of the DELETE and UPDATE 
statements in the SQL section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711002#comment-17711002
 ] 

Aitozi commented on FLINK-31533:


[~luoyuxia] I think it's a useful feature and I'd like to support it. Can I 
take this ticket?

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710866#comment-17710866
 ] 

Aitozi commented on FLINK-18027:


Now that the bundled Calcite version is 1.29, complex expressions in the ROW 
constructor are supported (verified; see below). This issue can be closed now. 
CC [~libenchao] [~jark]
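
For reference, this is the reproduce query from the description, which now 
parses on a build with Calcite 1.29 bundled:
{code:java}
INSERT INTO my_sink
SELECT ROW(my_row.a, my_row.b)
FROM my_source;
{code}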

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) 
> ... 5 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-10 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710396#comment-17710396
 ] 

Aitozi commented on FLINK-31301:


[~lincoln.86xy], I opened a PR for this, could you help review it? Thanks.

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently an error will be raised when use nested columns in column list of 
> insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement

2023-04-09 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710023#comment-17710023
 ] 

Aitozi commented on FLINK-31344:


Thanks for your reply. I totally agree with you. I have basically verified the 
first step; can you assign this ticket to me?

> Support to update nested columns in update statement
> 
>
> Key: FLINK-31344
> URL: https://issues.apache.org/jira/browse/FLINK-31344
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>
> Currently, it'll throw exception while using update statement to update 
> nested column;
> For the following sql:
>  
> {code:java}
> create table (t ROW<`a` INT>) with (xxx);
> update t set s.a = 1;{code}
> It'll throw the exception:
> {code:java}
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> at line 1, column 15.
> Was expecting:
>     "=" ...
>     
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306)
>     at 
> org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198)
>     ... 33 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31344) Support to update nested columns in update statement

2023-04-09 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709961#comment-17709961
 ] 

Aitozi commented on FLINK-31344:


Hi [~luoyuxia], while solving FLINK-31301 I noticed this ticket. After 
spending some time investigating it, I think we can support updating nested 
columns with the following steps (see the sketch after this list):
 - Modify the SqlParser to accept compound identifiers in the UPDATE SET 
clause
 - Modify the {{SupportsRowLevelUpdate}} interface, which would work better 
with {{int[][]}} instead of {{Column}} (this gives finer-grained control over 
the required columns).

WDYT?
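
A sketch of the statements the extended parser would need to accept (the 
first one is taken from the issue description below; the deeper-nesting 
variant and its schema are hypothetical):
{code:java}
-- simple nested column (from the issue description)
UPDATE t SET s.a = 1;

-- deeper nesting and mixed targets should also parse (hypothetical schema)
UPDATE t SET s.a = 1, s.b.c = 'x' WHERE id = 42;
{code}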

> Support to update nested columns in update statement
> 
>
> Key: FLINK-31344
> URL: https://issues.apache.org/jira/browse/FLINK-31344
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>
> Currently, it'll throw exception while using update statement to update 
> nested column;
> For the following sql:
>  
> {code:java}
> create table (t ROW<`a` INT>) with (xxx);
> update t set s.a = 1;{code}
> It'll throw the exception:
> {code:java}
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> at line 1, column 15.
> Was expecting:
>     "=" ...
>     
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:46382)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:46190)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:14389)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:4121)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2998)
>     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:306)
>     at 
> org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:198)
>     ... 33 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31755:
---
Description: 
Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

The ROW function generates the {{FULLY_QUALIFIED}} type, but after the 
{{RewriteIntersectAllRule}} optimization it produces 
{{PEEK_FIELDS_NO_EXPAND}}, so the volcano planner complains about a type 
mismatch.

  was:
Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It will fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
{{RewriteIntersectAllRule}} optimization, it will produce the 
{{PEEK_FIELDS_NO_EXPAND}}. So the volcan planner complains with type mismatch.


> ROW function can not work with RewriteIntersectAllRule
> --
>
> Key: FLINK-31755
> URL: https://issues.apache.org/jira/browse/FLINK-31755
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> Reproduce case:
> {code:java}
> create table row_sink (
>   `b` ROW
> ) with (
>   'connector' = 'values'
> )
> util.verifyRelPlanInsert(
> "INSERT INTO row_sink " +
>   "SELECT ROW(a, b) FROM complex_type_src intersect all " +
>   "SELECT ROW(c, d) FROM complex_type_src ")
> {code}
> It fails with 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> Difference:
> EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>   ... 68 more
> {code}
> The reason is:
> The ROW function generates the {{FULLY_QUALIFIED}} type, but after the 
> {{RewriteIntersectAllRule}} optimization it produces 
> {{PEEK_FIELDS_NO_EXPAND}}, so the volcano planner complains about a type 
> mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread Aitozi (Jira)
Aitozi created FLINK-31755:
--

 Summary: ROW function can not work with RewriteIntersectAllRule
 Key: FLINK-31755
 URL: https://issues.apache.org/jira/browse/FLINK-31755
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

The ROW function generates the {{FULLY_QUALIFIED}} type, but after the 
{{RewriteIntersectAllRule}} optimization it produces 
{{PEEK_FIELDS_NO_EXPAND}}, so the volcano planner complains about a type 
mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-05 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708948#comment-17708948
 ] 

Aitozi commented on FLINK-31301:


Hello [~lincoln.86xy], I want to contribute to this ticket and have basically 
finished it. Can you assign this ticket to me?

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Priority: Major
>
> Currently an error will be raised when use nested columns in column list of 
> insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-03 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708014#comment-17708014
 ] 

Aitozi commented on FLINK-31301:


Hello [~lincoln.86xy], I'm interested in this ticket and spent some time 
looking into it. I think we can support inserting partial nested columns by 
analysing the target row type to derive the RowType from the RelDataType. Then 
we can construct the target row as {{row(b, cast(null as int), row(cast(null as 
varchar), c))}} to make up the complete row; a sketch follows below.
WDYT?
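
A sketch of the idea on the example from this issue's description (the 
NULL-cast types are assumptions for illustration, not the real schema):
{code:java}
-- user statement with partial nested columns
INSERT INTO nested_type_sink (a, b.b1, c.c2, f)
SELECT a, b.b1, c.c2, f FROM nested_type_src;

-- could be rewritten internally into a complete row, e.g.
INSERT INTO nested_type_sink
SELECT a,
       ROW(b.b1, CAST(NULL AS INT)),
       ROW(CAST(NULL AS VARCHAR), c.c2),
       f
FROM nested_type_src;
{code}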

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Priority: Major
>
> Currently an error will be raised when use nested columns in column list of 
> insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31550) Replace deprecated TableSchema with Schema in OperationConverterUtils

2023-03-28 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17705828#comment-17705828
 ] 

Aitozi commented on FLINK-31550:


Is this stale? {{OperationConverterUtils}} does not use {{TableSchema}} anymore.

> Replace deprecated TableSchema with Schema in OperationConverterUtils
> -
>
> Key: FLINK-31550
> URL: https://issues.apache.org/jira/browse/FLINK-31550
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31507) Move execution logic of ShowOperation out from TableEnvironmentImpl

2023-03-19 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702245#comment-17702245
 ] 

Aitozi commented on FLINK-31507:


Hello [~jark], may I help with this refactoring?

> Move execution logic of ShowOperation out from TableEnvironmentImpl
> ---
>
> Key: FLINK-31507
> URL: https://issues.apache.org/jira/browse/FLINK-31507
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>
> This should implement {{ExecutableOperation}} for all the {{ShowOperation}}s 
> to move the execution logic out from 
> {{TableEnvironmentImpl#executeInternal()}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31497) Drop the deprecated CatalogViewImpl

2023-03-17 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31497:
---
Parent: FLINK-21394
Issue Type: Sub-task  (was: Technical Debt)

> Drop the deprecated CatalogViewImpl 
> 
>
> Key: FLINK-31497
> URL: https://issues.apache.org/jira/browse/FLINK-31497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> After https://issues.apache.org/jira/browse/FLINK-29585,
> CatalogViewImpl is no longer used in the Flink project, so we may be able to 
> drop it now. cc [~snuyanzin]
> But we may have to check whether it is used in other connectors' repositories.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31497) Drop the deprecated CatalogViewImpl

2023-03-17 Thread Aitozi (Jira)
Aitozi created FLINK-31497:
--

 Summary: Drop the deprecated CatalogViewImpl 
 Key: FLINK-31497
 URL: https://issues.apache.org/jira/browse/FLINK-31497
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Aitozi


After https://issues.apache.org/jira/browse/FLINK-29585,

CatalogViewImpl is no longer used in the Flink project, so we may be able to 
drop it now. cc [~snuyanzin]

But we may have to check whether it is used in other connectors' repositories.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31426) Upgrade the deprecated UniqueConstraint to the new one

2023-03-17 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31426:
---
Parent: FLINK-29072
Issue Type: Sub-task  (was: Technical Debt)

> Upgrade the deprecated UniqueConstraint to the new one 
> ---
>
> Key: FLINK-31426
> URL: https://issues.apache.org/jira/browse/FLINK-31426
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> https://github.com/apache/flink/pull/21522#discussion_r1133642525



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31426) Upgrade the deprecated UniqueConstraint to the new one

2023-03-13 Thread Aitozi (Jira)
Aitozi created FLINK-31426:
--

 Summary: Upgrade the deprecated UniqueConstraint to the new one 
 Key: FLINK-31426
 URL: https://issues.apache.org/jira/browse/FLINK-31426
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Hive, Table SQL / Planner
Reporter: Aitozi


https://github.com/apache/flink/pull/21522#discussion_r1133642525



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode

2023-03-10 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698902#comment-17698902
 ] 

Aitozi commented on FLINK-31260:


[~zhengyiweng] Thanks for your attention. I think the Exchange in the rule 
pattern can be removed. I'd like to revisit this issue after 
[https://github.com/apache/flink/pull/22001] is merged, since that PR will 
generate the pattern above.

> PushLocalHashAggIntoScanRule should also work with union RelNode
> 
>
> Key: FLINK-31260
> URL: https://issues.apache.org/jira/browse/FLINK-31260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As discussed in 
> [comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784], 
> {{PushLocalHashAggIntoScanRule}} currently matches the pattern Exchange -> 
> LocalHashAggregate -> Scan. As a result, the following pattern cannot be 
> optimized:
> {code:java}
>   +- Union(all=[true], union=[type, sum$0])
>  :- Union(all=[true], union=[type, sum$0])
>  :  :- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
>  :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
> table1, project=[type, price], metadata=[]]], fields=[type, price])
>  :  +- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
>  : +- TableSourceScan(table=[[default_catalog, default_database, 
> table2, project=[type, price], metadata=[]]], fields=[type, price])
>  +- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
> +- TableSourceScan(table=[[default_catalog, default_database, 
> table3, project=[type, price], metadata=[]]], fields=[type, price])
> {code}
> We should extend the rule to support this pattern.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31390) Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary traversals.

2023-03-09 Thread Aitozi (Jira)
Aitozi created FLINK-31390:
--

 Summary: Optimize the FlinkChangelogModeInferenceProgram by 
avoiding unnecessary traversals.
 Key: FLINK-31390
 URL: https://issues.apache.org/jira/browse/FLINK-31390
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


We can avoid unnecessary traversals of the RelNode tree, since we are only 
interested in the first satisfying plan.

 

FlinkChangelogModeInferenceProgram
{code:java}
val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
val finalRoot = requiredUpdateKindTraits.flatMap {
  requiredUpdateKindTrait =>
updateKindTraitVisitor.visit(rootWithModifyKindSet, 
requiredUpdateKindTrait)
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Description: 
Calcite 1.33.0 supports C-style escape strings. We could leverage this to 
enhance our string literal usage; a hedged example follows below.

issue: https://issues.apache.org/jira/browse/CALCITE-5305
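
A sketch of the syntax, assuming the PostgreSQL-style {{E'...'}} prefix that 
CALCITE-5305 introduces (the exact literal form should be checked against the 
Calcite release):
{code:java}
-- 'Hello' and 'World' separated by a real newline character
SELECT E'Hello\nWorld';
{code}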


  was:
In Calcite 1.33.0, C-style escape strings have been supported. We should 
outline its usage in document after upgrading to Calcite 1.33.0



> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> Calcite 1.33.0 supports C-style escape strings. We could leverage this to 
> enhance our string literal usage.
> issue: https://issues.apache.org/jira/browse/CALCITE-5305



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Component/s: Table SQL / API
 (was: Documentation)

> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline their usage in the documentation after upgrading to Calcite 1.33.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Summary: Upgrade to Calcite version to 1.33.0  (was: Add document about how 
to use C-style escape strings)

> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline their usage in the documentation after upgrading to Calcite 1.33.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Add document about how to use C-style escape strings

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Description: 
In Calcite 1.33.0, C-style escape strings have been supported. We should 
outline their usage in the documentation after upgrading to Calcite 1.33.0.


> Add document about how to use C-style escape strings
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline their usage in the documentation after upgrading to Calcite 1.33.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31362) Add document about how to use C-style escape strings

2023-03-07 Thread Aitozi (Jira)
Aitozi created FLINK-31362:
--

 Summary: Add document about how to use C-style escape strings
 Key: FLINK-31362
 URL: https://issues.apache.org/jira/browse/FLINK-31362
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31296) Add JoinConditionEqualityTransferRule to stream optimizer

2023-03-01 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695491#comment-17695491
 ] 

Aitozi commented on FLINK-31296:


But this will affect the join key pair in the Join operator, which may break 
state compatibility. I'm not sure how to handle this implicit breaking change 
(see the illustration below).
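
A hypothetical illustration of the kind of rewrite the rule performs (table 
and column names are made up):
{code:java}
-- before the rule, the Join operator derives two equi-join key pairs from:
SELECT * FROM A JOIN B ON A.x = B.y AND A.x = B.z;

-- after the rule, the condition can roughly be rewritten to
--   A.x = B.y AND B.y = B.z
-- so B.y = B.z becomes a filter below the join and the Join operator keeps a
-- single key pair, which changes its state layout across an upgrade
{code}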

> Add JoinConditionEqualityTransferRule to stream optimizer
> -
>
> Key: FLINK-31296
> URL: https://issues.apache.org/jira/browse/FLINK-31296
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> I find that {{JoinConditionEqualityTransferRule}} is a rule common to batch 
> and stream mode, so it should be added to the stream optimizer, which will 
> bring a performance improvement in some cases.
> Maybe other rules also need to be reviewed to see whether they can be aligned 
> between the batch and stream cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31296) Add JoinConditionEqualityTransferRule to stream optimizer

2023-03-01 Thread Aitozi (Jira)
Aitozi created FLINK-31296:
--

 Summary: Add JoinConditionEqualityTransferRule to stream optimizer
 Key: FLINK-31296
 URL: https://issues.apache.org/jira/browse/FLINK-31296
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


I find that {{JoinConditionEqualityTransferRule}} is a rule common to batch 
and stream mode, so it should be added to the stream optimizer, which will bring 
a performance improvement in some cases.

Maybe other rules also need to be reviewed to see whether they can be aligned 
between the batch and stream cases.
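
As a concrete illustration (a sketch with hypothetical tables t1/t2/t3), the 
rule targets conjunctions of equalities that share a common term:

{code:sql}
-- From t1.k = t2.k AND t1.k = t3.k the rule can derive the implied equality
-- t2.k = t3.k, which may enable better join-key choices downstream.
SELECT *
FROM t1
JOIN t2 ON t1.k = t2.k
JOIN t3 ON t1.k = t3.k;
{code}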



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode

2023-02-28 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31260:
---
Component/s: Table SQL / Planner

> PushLocalHashAggIntoScanRule should also work with union RelNode
> 
>
> Key: FLINK-31260
> URL: https://issues.apache.org/jira/browse/FLINK-31260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As discussed in 
> [comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784], 
> {{PushLocalHashAggIntoScanRule}} currently matches the pattern Exchange -> 
> LocalHashAggregate -> Scan. As a result, the following pattern cannot be 
> optimized:
> {code:java}
>   +- Union(all=[true], union=[type, sum$0])
>  :- Union(all=[true], union=[type, sum$0])
>  :  :- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
>  :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
> table1, project=[type, price], metadata=[]]], fields=[type, price])
>  :  +- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
>  : +- TableSourceScan(table=[[default_catalog, default_database, 
> table2, project=[type, price], metadata=[]]], fields=[type, price])
>  +- LocalHashAggregate(groupBy=[type], select=[type, 
> Partial_SUM(price) AS sum$0])
> +- TableSourceScan(table=[[default_catalog, default_database, 
> table3, project=[type, price], metadata=[]]], fields=[type, price])
> {code}
> We should extend the rule to support this pattern.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode

2023-02-28 Thread Aitozi (Jira)
Aitozi created FLINK-31260:
--

 Summary: PushLocalHashAggIntoScanRule should also work with union 
RelNode
 Key: FLINK-31260
 URL: https://issues.apache.org/jira/browse/FLINK-31260
 Project: Flink
  Issue Type: Improvement
Reporter: Aitozi


As discussed in 
[comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784], 
{{PushLocalHashAggIntoScanRule}} currently matches the pattern Exchange -> 
LocalHashAggregate -> Scan. As a result, the following pattern cannot be 
optimized:


{code:java}
  +- Union(all=[true], union=[type, sum$0])
 :- Union(all=[true], union=[type, sum$0])
 :  :- LocalHashAggregate(groupBy=[type], select=[type, 
Partial_SUM(price) AS sum$0])
 :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
table1, project=[type, price], metadata=[]]], fields=[type, price])
 :  +- LocalHashAggregate(groupBy=[type], select=[type, 
Partial_SUM(price) AS sum$0])
 : +- TableSourceScan(table=[[default_catalog, default_database, 
table2, project=[type, price], metadata=[]]], fields=[type, price])
 +- LocalHashAggregate(groupBy=[type], select=[type, Partial_SUM(price) 
AS sum$0])
+- TableSourceScan(table=[[default_catalog, default_database, 
table3, project=[type, price], metadata=[]]], fields=[type, price])

{code}

We should extend the rule to support this pattern.
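
For reference, a query shape that could produce the plan above (a sketch; 
table1/table2/table3 and the columns are taken from the plan):

{code:sql}
-- An aggregation over a UNION ALL of scans: each branch gets its own
-- LocalHashAggregate, which the extended rule should push into the scan.
SELECT type, SUM(price)
FROM (
  SELECT type, price FROM table1
  UNION ALL
  SELECT type, price FROM table2
  UNION ALL
  SELECT type, price FROM table3
) t
GROUP BY type;
{code}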




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-27 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694386#comment-17694386
 ] 

Aitozi commented on FLINK-31205:


Looking forward to your opinions. CC [~godfreyhe] [~twalthr] [~snuyanzin]

> do optimize for multi sink in a single relNode tree 
> 
>
> Key: FLINK-31205
> URL: https://issues.apache.org/jira/browse/FLINK-31205
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> Flink supports multi-sink usage, but it optimizes each sink in an 
> individual RelNode tree, which misses opportunities for cross-tree 
> optimization, e.g.:
> {code:java}
> create table newX(
>   a int,
>   b bigint,
>   c varchar,
>   d varchar,
>   e varchar
> ) with (
>   'connector' = 'values'
>   ,'enable-projection-push-down' = 'true'
> )
> insert into sink_table select a, b from newX
> insert into sink_table select a, 1 from newX
> {code}
> It will produce the plan below, which causes the source to be consumed 
> twice:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a, b], metadata=[]]], fields=[a, b])
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
>+- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose to do a global optimization for the multi sink by: 
> * Merging the multi sinks (writing to the same table) into a single RelNode 
> tree with an extra union node
> * After optimization, splitting the merged union back into the original multi 
> sinks
> In my POC, after step 1, it produces the plan below, which I think will 
> benefit global performance:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
>:- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
>+- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
>   +- Reused(reference_id=[1])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-27 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694372#comment-17694372
 ] 

Aitozi commented on FLINK-31205:


After some research, I found that there are better choices than using a union 
to get a single tree. {{Union}} can only cover the use case of multiple sinks 
writing to the same table, because {{Union}} enforces type consistency.

We can add a new "virtual" RelNode that accepts the multiple sinks as inputs. It 
packs the multiple trees together so that, from the perspective of the 
optimizer, global optimization becomes possible.

In my POC, I add a new RelNode type named {{MultiSink}} before passing the tree 
to the Calcite optimizer.
The MultiSink does not apply any transformation to its inputs.

After logical optimization, the plan is

{code:java}
LogicalMultiSink
:- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, 
b])
:  +- LogicalProject(inputs=[0..1])
: +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
+- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, 
b])
   +- LogicalProject(inputs=[0], exprs=[[1:BIGINT]])
  +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
{code}

After physical optimization, the plan is 

{code:java}
MultiSink
:- Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
:  +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])
+- Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
   +- Calc(select=[a AS $f0, 1:BIGINT AS $f1])
  +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])
{code}

Before translating to ExecNodes, we remove the {{MultiSink}} (which is only 
intended to work during the optimization phase); the final result is then:

{code:java}
TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])(reuse_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
+- Calc(select=[a AS $f0, 1 AS $f1])
   +- Reused(reference_id=[1])
{code}

With the new RelNode, single-tree optimization becomes possible, and we can do 
more during it, e.g., introduce a cost model for CTEs to decide whether to 
inline or reuse them, and so on.
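
For context, a minimal sketch of the multi-sink usage this targets, written 
with Flink SQL's statement set syntax (table names taken from the ticket 
description):

{code:sql}
-- Both INSERTs are submitted as one job; the proposed MultiSink packing lets
-- the optimizer see them as a single tree and reuse the scan of newX.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO sink_table SELECT a, b FROM newX;
  INSERT INTO sink_table SELECT a, 1 FROM newX;
END;
{code}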

> do optimize for multi sink in a single relNode tree 
> 
>
> Key: FLINK-31205
> URL: https://issues.apache.org/jira/browse/FLINK-31205
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> Flink supports multi-sink usage, but it optimizes each sink in an 
> individual RelNode tree, which misses opportunities for cross-tree 
> optimization, e.g.:
> {code:java}
> create table newX(
>   a int,
>   b bigint,
>   c varchar,
>   d varchar,
>   e varchar
> ) with (
>   'connector' = 'values'
>   ,'enable-projection-push-down' = 'true'
> )
> insert into sink_table select a, b from newX
> insert into sink_table select a, 1 from newX
> {code}
> It will produce the plan below, which causes the source to be consumed 
> twice:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a, b], metadata=[]]], fields=[a, b])
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
>+- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose to do a global optimization for the multi sink by: 
> * Merging the multi sinks (writing to the same table) into a single RelNode 
> tree with an extra union node
> * After optimization, splitting the merged union back into the original multi 
> sinks
> In my POC, after step 1, it produces the plan below, which I think will 
> benefit global performance:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
>:- TableSourceScan(table=[[default_catalog, default_database, newX, 
> project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
>+- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
>   +- Reused(reference_id=[1])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-23 Thread Aitozi (Jira)
Aitozi created FLINK-31205:
--

 Summary: do optimize for multi sink in a single relNode tree 
 Key: FLINK-31205
 URL: https://issues.apache.org/jira/browse/FLINK-31205
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


Flink supports multi-sink usage, but it optimizes each sink in an individual 
RelNode tree, which misses opportunities for cross-tree optimization, e.g.:


{code:java}
create table newX(
  a int,
  b bigint,
  c varchar,
  d varchar,
  e varchar
) with (
  'connector' = 'values'
  ,'enable-projection-push-down' = 'true'
)


insert into sink_table select a, b from newX
insert into sink_table select a, 1 from newX
{code}

It will produce the plan below, which causes the source to be consumed twice:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
   +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a], metadata=[]]], fields=[a])

{code}

In this ticket, I propose to do a global optimization for the multi sink by:
* Merging the multi sinks (writing to the same table) into a single RelNode 
tree with an extra union node
* After optimization, splitting the merged union back into the original multi 
sinks

In my POC, after step 1, it produces the plan below, which I think will benefit 
global performance:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
   :- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
   +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
  +- Reused(reference_id=[1])
{code}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy

2023-02-21 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691568#comment-17691568
 ] 

Aitozi commented on FLINK-30551:


In my PR, only the table's properties are passed to the open method. But I 
think it would also be convenient to pass the Flink conf to the open method, 
because some custom metastore configs are job-level and can be set in the 
Flink conf. What about keeping both by adding an interface like:

{code:java}
default void open(Configuration tableOptions, Configuration flinkConf) {}
{code}
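
For illustration, a sketch of how a custom policy could use such a hook. The 
two-argument {{open}} is the proposal above, not an existing API; 
{{MyMetaStoreClient}} is hypothetical, and the {{PartitionCommitPolicy}} 
package may differ across Flink versions:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.PartitionCommitPolicy;

public class MyCommitPolicy implements PartitionCommitPolicy {

    private MyMetaStoreClient client;

    @Override
    public void open(Configuration tableOptions, Configuration flinkConf) {
        // Init work that previously had no entry point: read a job-level
        // setting from the Flink conf and create the client once.
        String uri = flinkConf.getString("my.metastore.uri", "thrift://localhost:9083");
        client = new MyMetaStoreClient(uri);
    }

    @Override
    public void commit(Context context) throws Exception {
        // Register the committed partition in the custom metastore.
        client.commitPartition(context);
    }

    /** Hypothetical client, stubbed so the sketch is self-contained. */
    private static final class MyMetaStoreClient {
        MyMetaStoreClient(String uri) { /* connect to the custom metastore */ }

        void commitPartition(PartitionCommitPolicy.Context context) {
            /* add the partition described by the context */
        }
    }
}
{code}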


> Add open method to PartitionCommitPolicy
> 
>
> Key: FLINK-30551
> URL: https://issues.apache.org/jira/browse/FLINK-30551
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently, {{PartitionCommitPolicy}} does not have an open hook, so a custom 
> partition commit policy has no appropriate entry point for its init work.
> So I propose to add an {{open}} method to make this work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-20 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691082#comment-17691082
 ] 

Aitozi commented on FLINK-31120:


Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46301=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=0c940707-2659-5648-cbe6-a1ad63045f0a

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates

2023-01-09 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17656052#comment-17656052
 ] 

Aitozi commented on FLINK-30570:


I opened a PR to solve this problem; looking forward to a review, thanks.

> RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition 
> predicates
> 
>
> Key: FLINK-30570
> URL: https://issues.apache.org/jira/browse/FLINK-30570
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a 
> partition predicate and evaluated to false when compiling the SQL.
> This has two problems: it should not be recognized as a partition predicate 
> in the first place, and a nondeterministic function should never pass the 
> partition pruner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates

2023-01-05 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30570:
---
Description: 
Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a 
partition predicate and evaluated to false when compiling the SQL.
This has two problems: it should not be recognized as a partition predicate in 
the first place, and a nondeterministic function should never pass the 
partition pruner.

  was:
Currently, the condition {{where rand(1) > 0.0125}} will be recognized as a 
partition predicates and will be evaluated to false when compiling the SQL. It 
has two problem. 
First, it should not be recognized as a partition predicates, and the 
nondeterministic function should never pass the partition pruner 


> RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition 
> predicates
> 
>
> Key: FLINK-30570
> URL: https://issues.apache.org/jira/browse/FLINK-30570
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> Currently, the condition {{where rand(1) < 0.0001}} will be recognized as a 
> partition predicate and evaluated to false when compiling the SQL.
> This has two problems: it should not be recognized as a partition predicate 
> in the first place, and a nondeterministic function should never pass the 
> partition pruner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates

2023-01-05 Thread Aitozi (Jira)
Aitozi created FLINK-30570:
--

 Summary: RexNodeExtractor#isSupportedPartitionPredicate generates 
unexpected partition predicates
 Key: FLINK-30570
 URL: https://issues.apache.org/jira/browse/FLINK-30570
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Currently, the condition {{where rand(1) > 0.0125}} will be recognized as a 
partition predicate and evaluated to false when compiling the SQL.
This has two problems: it should not be recognized as a partition predicate in 
the first place, and a nondeterministic function should never pass the 
partition pruner.
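
To illustrate (a sketch; part_table is a hypothetical table partitioned by dt):

{code:sql}
-- dt = '2023-01-01' is a legitimate partition predicate and may prune
-- partitions at compile time; rand(1) < 0.0001 is nondeterministic and must be
-- evaluated at runtime per row, never folded away by the partition pruner.
SELECT * FROM part_table
WHERE dt = '2023-01-01' AND rand(1) < 0.0001;
{code}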



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy

2023-01-03 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653949#comment-17653949
 ] 

Aitozi commented on FLINK-30551:


cc [~gaoyunhaii] what do you think of this?

> Add open method to PartitionCommitPolicy
> 
>
> Key: FLINK-30551
> URL: https://issues.apache.org/jira/browse/FLINK-30551
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>
> Currently, {{PartitionCommitPolicy}} does not have an open hook, so a custom 
> partition commit policy has no appropriate entry point for its init work.
> So I propose to add an {{open}} method to make this work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30551) Add open method to PartitionCommitPolicy

2023-01-03 Thread Aitozi (Jira)
Aitozi created FLINK-30551:
--

 Summary: Add open method to PartitionCommitPolicy
 Key: FLINK-30551
 URL: https://issues.apache.org/jira/browse/FLINK-30551
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Aitozi


Currently, {{PartitionCommitPolicy}} does not have an open hook, so a custom 
partition commit policy has no appropriate entry point for its init work.
So I propose to add an {{open}} method to make this work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29085) Add the name for test as hint for the current test case in BuiltInFunctionTestBase

2022-12-17 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi closed FLINK-29085.
--
Resolution: Won't Fix

> Add the name for test as hint for the current test case in 
> BuiltInFunctionTestBase
> --
>
> Key: FLINK-29085
> URL: https://issues.apache.org/jira/browse/FLINK-29085
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Aitozi
>Priority: Minor
>
> When running tests that extend {{BuiltInFunctionTestBase}}, I found it hard 
> to distinguish the failing tests. I think it would be easy to add a name 
> prefix for the {{TestItem}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29066) Reconsider the runtime property of the BuiltInFunctionDefinition

2022-12-17 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi closed FLINK-29066.
--
Resolution: Won't Fix

> Reconsider the runtime property of the BuiltInFunctionDefinition
> 
>
> Key: FLINK-29066
> URL: https://issues.apache.org/jira/browse/FLINK-29066
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> I was a bit confused, when implementing the internal built-in functions, by 
> how to deal with the runtime property. Currently, there are three kinds of 
> runtime property:
> 1) runtimeClass, which means Flink provides a class to define the runtime 
> implementation
> 2) runtimeProvided, which means the runtime class is code generated 
> 3) runtimeDeferred, which means it uses Calcite's SQL operator for the 
> codegen mapping
> After some research, I found that we have 4 situations to deal with:
> 1) non new-stack operators
> 2) new stack with its own runtime class provided, eg: {{IFNULL}} -> runtimeClass
> 3) new stack translated to a SQL operator to provide the runtime call gen, 
> eg: {{IS_NOT_TRUE}} -> runtimeDeferred
> 4) new stack that cannot map to a Calcite operator (mainly Flink internal 
> functions) and has no runtime class; it needs to map to the runtime call gen, 
> eg: {{CURRENT_WATERMARK}} -> runtimeProvided



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-12-15 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17648099#comment-17648099
 ] 

Aitozi commented on FLINK-30198:


Thanks all for your input. I agree that vertex-level tuning will be more 
complex, and I think a pluggable {{VertexParallelismDecider}} is a good choice. 
Maybe we can also provide some information about the vertex to the interface, 
e.g. the vertex type ({{Calc, Join, Local/Global Aggregate...}}), to let users 
make more suitable choices.
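
For reference, a configuration sketch: the first key is the existing global 
knob quoted in the description; the second, reducer-specific key does not exist 
and only illustrates what this ticket proposes:

{code}
# flink-conf.yaml (sketch)
jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task: 1gb
# Hypothetical knob for vertices with hash-edge inputs (proposed here):
jobmanager.adaptive-batch-scheduler.avg-data-volume-per-reducer-task: 512mb
{code}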

> Support AdaptiveBatchScheduler to set per-task size for reducer task 
> -
>
> Key: FLINK-30198
> URL: https://issues.apache.org/jira/browse/FLINK-30198
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> When we use AdaptiveBatchScheduler in our case, we found that it works 
> well in most cases, but there is a limitation: there is only one global 
> parameter for the per-task data size, 
> {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 
> However, in a map-reduce architecture, the reducer tasks usually have 
> more complex computation logic, such as aggregate/sort/join operators. So I 
> think it would be nicer if we could set the reducer and mapper tasks' data 
> sizes individually.
> Then, how do we distinguish the reducer task?
> IMO, we can let the parallelism decider know whether the vertex has hash 
> edge inputs. If yes, it should be a reducer task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30242) Push localHashAggregate past the union node

2022-11-30 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641636#comment-17641636
 ] 

Aitozi commented on FLINK-30242:


> which scenario do you meet this problem?

I met this when we ran batch jobs with flink-1.15. There is an unnecessary 
shuffle between the map node and the local hash aggregate, which impacts 
performance.
It's not an urgent need: we have already added a rule to support the local hash 
aggregate and union transpose, and since it's a common optimization rule, it 
would be nice to contribute it upstream.
If you have already done the work, you can take this ticket. If not, I can 
prepare a PR for this.

BTW, I posted two images describing the problem in our use case.

Thanks.

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30242) Push localHashAggregate past the union node

2022-11-30 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30242:
---
Attachment: screenshot-2.png

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30242) Push localHashAggregate past the union node

2022-11-30 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30242:
---
Attachment: (was: screenshot-1.png)

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30242) Push localHashAggregate past the union node

2022-11-30 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30242:
---
Attachment: screenshot-1.png

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30242) Push localHashAggregate past the union node

2022-11-30 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30242:
---
Attachment: screenshot-1.png

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30242) Push localHashAggregate past the union node

2022-11-29 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1764#comment-1764
 ] 

Aitozi commented on FLINK-30242:


Hi [~Runking], sorry, I had not noticed that. If you have already done the 
work, you can take this ticket :)

BTW, can you share your presentation link? I'd like to take a look.

> Push localHashAggregate past the union node
> ---
>
> Key: FLINK-30242
> URL: https://issues.apache.org/jira/browse/FLINK-30242
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> The local hash aggregate after a union has an extra shuffle stage. We can 
> swap it with the union node so that the local hash aggregate can chain with 
> the mapper stage, saving the unnecessary shuffle, especially in batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30242) Push localHashAggregate past the union node

2022-11-29 Thread Aitozi (Jira)
Aitozi created FLINK-30242:
--

 Summary: Push localHashAggregate past the union node
 Key: FLINK-30242
 URL: https://issues.apache.org/jira/browse/FLINK-30242
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


The local hash aggregate after a union has an extra shuffle stage. We can swap 
it with the union node so that the local hash aggregate can chain with the 
mapper stage, saving the unnecessary shuffle, especially in batch jobs.
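
A minimal sketch of the affected query shape (src1/src2 are hypothetical 
source tables):

{code:sql}
-- Today the LocalHashAggregate sits above the Union behind an extra shuffle;
-- swapping it below the Union lets each branch's local aggregate chain
-- directly with its mapper stage.
SELECT k, SUM(v)
FROM (
  SELECT k, v FROM src1
  UNION ALL
  SELECT k, v FROM src2
) t
GROUP BY k;
{code}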



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-11-24 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638331#comment-17638331
 ] 

Aitozi commented on FLINK-30198:


cc [~wanglijie] what do you think of this?

> Support AdaptiveBatchScheduler to set per-task size for reducer task 
> -
>
> Key: FLINK-30198
> URL: https://issues.apache.org/jira/browse/FLINK-30198
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> When we use AdaptiveBatchScheduler in our case, we found that it works 
> well in most cases, but there is a limitation: there is only one global 
> parameter for the per-task data size, 
> {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 
> However, in a map-reduce architecture, the reducer tasks usually have 
> more complex computation logic, such as aggregate/sort/join operators. So I 
> think it would be nicer if we could set the reducer and mapper tasks' data 
> sizes individually.
> Then, how do we distinguish the reducer task?
> IMO, we can let the parallelism decider know whether the vertex has hash 
> edge inputs. If yes, it should be a reducer task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-11-24 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30198:
---
Description: 
When we use AdaptiveBatchScheduler in our case, we found that it works well 
in most cases, but there is a limitation: there is only one global parameter 
for the per-task data size, 
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 

However, in a map-reduce architecture, the reducer tasks usually have more 
complex computation logic, such as aggregate/sort/join operators. So I think it 
would be nicer if we could set the reducer and mapper tasks' data sizes 
individually.

Then, how do we distinguish the reducer task? IMO, we can let the parallelism 
decider know whether the vertex has hash edge inputs. If yes, it should be a 
reducer task.

  was:
When we use AdaptiveBatchScheduler in our case, we found that it can work well 
in most case, but there is a limit that, there is only one global parameter for 
per task data size by 
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 

However, in a map-reduce architecture, the reducer tasks are usually have more 
complex computation logic such as aggregate/sort/join operators. So I think it 
will be nicer we can set the reducer and mapper task's data size per task 
individually.

Then, how to distinguish the reducer task, IMO, we can let the parallelism 
decider know whether the vertex have a hash edge inputs. If yes, it should be a 
reducer task.


> Support AdaptiveBatchScheduler to set per-task size for reducer task 
> -
>
> Key: FLINK-30198
> URL: https://issues.apache.org/jira/browse/FLINK-30198
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> When we use AdaptiveBatchScheduler in our case, we found that it works 
> well in most cases, but there is a limitation: there is only one global 
> parameter for the per-task data size, 
> {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 
> However, in a map-reduce architecture, the reducer tasks usually have 
> more complex computation logic, such as aggregate/sort/join operators. So I 
> think it would be nicer if we could set the reducer and mapper tasks' data 
> sizes individually.
> Then, how do we distinguish the reducer task?
> IMO, we can let the parallelism decider know whether the vertex has hash 
> edge inputs. If yes, it should be a reducer task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-11-24 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-30198:
---
Description: 
When we use AdaptiveBatchScheduler in our case, we found that it works well 
in most cases, but there is a limitation: there is only one global parameter 
for the per-task data size, 
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 

However, in a map-reduce architecture, the reducer tasks usually have more 
complex computation logic, such as aggregate/sort/join operators. So I think it 
would be nicer if we could set the reducer and mapper tasks' data sizes 
individually.

Then, how do we distinguish the reducer task?
IMO, we can let the parallelism decider know whether the vertex has hash edge 
inputs. If yes, it should be a reducer task.

  was:
When we use AdaptiveBatchScheduler in our case, we found that it can work well 
in most case, but there is a limit that, there is only one global parameter for 
per task data size by 
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 

However, in a map-reduce architecture, the reducer tasks are usually have more 
complex computation logic such as aggregate/sort/join operators. So I think it 
will be nicer if we can set the reducer and mapper task's data size per task 
individually.

Then, how to distinguish the reducer task, IMO, we can let the parallelism 
decider know whether the vertex have a hash edge inputs. If yes, it should be a 
reducer task.


> Support AdaptiveBatchScheduler to set per-task size for reducer task 
> -
>
> Key: FLINK-30198
> URL: https://issues.apache.org/jira/browse/FLINK-30198
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> When we use AdaptiveBatchScheduler in our case, we found that it works 
> well in most cases, but there is a limitation: there is only one global 
> parameter for the per-task data size, 
> {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 
> However, in a map-reduce architecture, the reducer tasks usually have 
> more complex computation logic, such as aggregate/sort/join operators. So I 
> think it would be nicer if we could set the reducer and mapper tasks' data 
> sizes individually.
> Then, how do we distinguish the reducer task?
> IMO, we can let the parallelism decider know whether the vertex has hash 
> edge inputs. If yes, it should be a reducer task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-11-24 Thread Aitozi (Jira)
Aitozi created FLINK-30198:
--

 Summary: Support AdaptiveBatchScheduler to set per-task size for 
reducer task 
 Key: FLINK-30198
 URL: https://issues.apache.org/jira/browse/FLINK-30198
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Aitozi


When we use AdaptiveBatchScheduler in our case, we found that it works well 
in most cases, but there is a limitation: there is only one global parameter 
for the per-task data size, 
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}. 

However, in a map-reduce architecture, the reducer tasks usually have more 
complex computation logic, such as aggregate/sort/join operators. So I think it 
would be nicer if we could set the reducer and mapper tasks' data sizes 
individually.

Then, how do we distinguish the reducer task? IMO, we can let the parallelism 
decider know whether the vertex has hash edge inputs. If yes, it should be a 
reducer task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29585) Migrate TableSchema to Schema for Hive connector

2022-11-14 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634112#comment-17634112
 ] 

Aitozi commented on FLINK-29585:


FYI, I'm working on this now. I will ping you for review after I finish, 
thanks.
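
For reference, a minimal sketch of the migration direction (illustrative 
column names, not the actual connector change):

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class SchemaMigrationSketch {
    // Old style (deprecated):
    //   TableSchema.builder().field("id", DataTypes.BIGINT()).build()
    // New style: the Schema builder from org.apache.flink.table.api.
    public static Schema newStyle() {
        return Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .column("name", DataTypes.STRING())
                .build();
    }
}
{code}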

> Migrate TableSchema to Schema for Hive connector
> 
>
> Key: FLINK-29585
> URL: https://issues.apache.org/jira/browse/FLINK-29585
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
>
> `TableSchema` is deprecated, we should migrate it to Schema



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2022-11-14 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633829#comment-17633829
 ] 

Aitozi commented on FLINK-25113:


Hi [~slinkydeveloper], [~luoyuxia], [~lsy], I have pushed a 
[PR|https://github.com/apache/flink/pull/21290] for this ticket. Can you help 
review it?

> Cleanup from Parquet and Orc the partition key handling logic
> -
>
> Key: FLINK-25113
> URL: https://issues.apache.org/jira/browse/FLINK-25113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Francesco Guardiani
>Priority: Major
>
> After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
> handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We 
> should clean up this logic from the ORC and Parquet formats in order to 
> simplify it. Note: Hive still depends on this logic, but it should rather use 
> {{FileInfoExtractorBulkFormat}} or similar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30019) Remove the unused HiveTableFileInputFormat in hive connector

2022-11-14 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633826#comment-17633826
 ] 

Aitozi commented on FLINK-30019:


cc [~luoyuxia], [~lsy] can you help review this PR?

> Remove the unused HiveTableFileInputFormat in hive connector
> 
>
> Key: FLINK-30019
> URL: https://issues.apache.org/jira/browse/FLINK-30019
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Hive
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> As far as I can see, after https://issues.apache.org/jira/browse/FLINK-19888 
> the Hive connector no longer relies on HiveTableFileInputFormat, so it can be 
> safely removed now.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30019) Remove the unused HiveTableFileInputFormat in hive connector

2022-11-14 Thread Aitozi (Jira)
Aitozi created FLINK-30019:
--

 Summary: Remove the unused HiveTableFileInputFormat in hive 
connector
 Key: FLINK-30019
 URL: https://issues.apache.org/jira/browse/FLINK-30019
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Hive
Reporter: Aitozi


As far as I can see, after https://issues.apache.org/jira/browse/FLINK-19888 
the Hive connector no longer relies on HiveTableFileInputFormat, so it can be 
safely removed now.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2022-11-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632146#comment-17632146
 ] 

Aitozi commented on FLINK-25113:


When I tried to work on this, I found that I can't simply break it into two 
separate pieces of work, because the partition keys in the Parquet/ORC formats 
affect the Hive source after switching to {{FileInfoExtractorBulkFormat}}. So I 
created a PR with these two commits to complete this work.

> Cleanup from Parquet and Orc the partition key handling logic
> -
>
> Key: FLINK-25113
> URL: https://issues.apache.org/jira/browse/FLINK-25113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Francesco Guardiani
>Priority: Major
>
> After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
> handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We 
> should clean up this logic from the ORC and Parquet formats in order to 
> simplify it. Note: Hive still depends on this logic, but it should rather use 
> {{FileInfoExtractorBulkFormat}} or similar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2022-11-10 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631679#comment-17631679
 ] 

Aitozi commented on FLINK-25113:


sorry for missing the ticket link :) 
https://issues.apache.org/jira/browse/FLINK-29980

> Cleanup from Parquet and Orc the partition key handling logic
> -
>
> Key: FLINK-25113
> URL: https://issues.apache.org/jira/browse/FLINK-25113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Francesco Guardiani
>Priority: Major
>
> After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
> handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We 
> should clean up this logic from the ORC and Parquet formats in order to 
> simplify it. Note: Hive still depends on this logic, but it should rather use 
> {{FileInfoExtractorBulkFormat}} or similar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2022-11-10 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631632#comment-17631632
 ] 

Aitozi commented on FLINK-25113:


Hi [~slinkydeveloper], I created a preceding ticket to improve the Hive source 
to handle the partition keys. I'd like to work on it; can you help assign the 
ticket to me?

> Cleanup from Parquet and Orc the partition key handling logic
> -
>
> Key: FLINK-25113
> URL: https://issues.apache.org/jira/browse/FLINK-25113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Francesco Guardiani
>Priority: Major
>
> After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
> handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We 
> should clean up this logic from the ORC and Parquet formats in order to 
> simplify it. Note: Hive still depends on this logic, but it should rather use 
> {{FileInfoExtractorBulkFormat}} or similar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

