[jira] [Commented] (FLINK-24924) TO_TIMESTAMP and TO_DATE should fail

2021-11-16 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444896#comment-17444896
 ] 

Kurt Young commented on FLINK-24924:


I would second [~TsReaper] 's concern, and I remember we have faced such a 
dilemma before. Imagine an online job that has been running for a couple of 
weeks and suddenly fails because of some unexpected dirty data. What choices 
do users have to quickly resume the job? 

> TO_TIMESTAMP and TO_DATE should fail
> 
>
> Key: FLINK-24924
> URL: https://issues.apache.org/jira/browse/FLINK-24924
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Priority: Major
>
> In a similar fashion to what is described in 
> https://issues.apache.org/jira/browse/FLINK-24385, TO_TIMESTAMP and TO_DATE 
> should fail instead of returning {{null}}.
> For these two functions in particular, a parsing failure could lead to very 
> unexpected behavior; for example, it could produce records with a null 
> rowtime.
> We should change these functions to fail by default when parsing generates an 
> error. We can let users handle errors themselves by using TRY_CAST for the 
> same functionality:
> {code:sql}
> -- This fails when input is invalid
> TO_TIMESTAMP(input)
> -- Behaves the same as above
> CAST(input AS TIMESTAMP)
> -- This returns null when input is invalid
> TRY_CAST(input AS TIMESTAMP)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24717) Push down partitions before filters

2021-11-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17437101#comment-17437101
 ] 

Kurt Young commented on FLINK-24717:


assigned to you [~xuyangzhong]

> Push down partitions before filters
> ---
>
> Key: FLINK-24717
> URL: https://issues.apache.org/jira/browse/FLINK-24717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, we push filters before partitions. This means that an 
> {{applyFilters}} implementation needs to contain partition logic to extract 
> the partition predicate. Furthermore, if {{applyFilters}} consumes all 
> filters (no remaining predicates), {{applyPartitions}} is never called.
> We should execute the {{PushPartitionIntoTableSourceScanRule}} first and 
> check for side effects of this change.
> See 
> {{org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRuleTest#testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected}}
>  for an example of using the new test infrastructure.
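> For context, here is a minimal, hypothetical sketch (only the ability 
> interfaces and their signatures are Flink's; the class name and its behavior 
> are illustrative assumptions) of why the current rule order is problematic 
> for a source that supports both push-downs:
> {code:java}
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Optional;
> import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
> import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
> import org.apache.flink.table.expressions.ResolvedExpression;
> 
> // Hypothetical source implementing both abilities.
> public class MyTableSource implements SupportsFilterPushDown, SupportsPartitionPushDown {
> 
>     @Override
>     public Result applyFilters(List<ResolvedExpression> filters) {
>         // With filter push-down applied first, partition predicates arrive
>         // here mixed into the ordinary filters, so this method has to
>         // recognize them itself. Accepting all filters leaves no remaining
>         // predicates ...
>         return Result.of(filters, Collections.emptyList());
>     }
> 
>     @Override
>     public Optional<List<Map<String, String>>> listPartitions() {
>         // Let the planner read the partitions from the catalog in this sketch.
>         return Optional.empty();
>     }
> 
>     @Override
>     public void applyPartitions(List<Map<String, String>> remainingPartitions) {
>         // ... and then this method is never called by the planner.
>     }
> }
> {code}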



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24717) Push down partitions before filters

2021-11-01 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-24717:
--

Assignee: xuyang

> Push down partitions before filters
> ---
>
> Key: FLINK-24717
> URL: https://issues.apache.org/jira/browse/FLINK-24717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, we push filters before partitions. This means that an 
> {{applyFilters}} implementation needs to contain partition logic to extract 
> the partition predicate. Furthermore, if {{applyFilters}} consumes all 
> filters (no remaining predicates), {{applyPartitions}} is never called.
> We should execute the {{PushPartitionIntoTableSourceScanRule}} first and 
> check for side effects of this change.
> See 
> {{org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRuleTest#testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected}}
>  for an example of using the new test infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24717) Push down partitions before filters

2021-11-01 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24717:
---
Fix Version/s: 1.15.0

> Push down partitions before filters
> ---
>
> Key: FLINK-24717
> URL: https://issues.apache.org/jira/browse/FLINK-24717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, we push filters before partitions. This means that an 
> {{applyFilters}} implementation needs to contain partition logic to extract 
> the partition predicate. Furthermore, if {{applyFilters}} consumes all 
> filters (no remaining predicates), {{applyPartitions}} is never called.
> We should execute the {{PushPartitionIntoTableSourceScanRule}} first and 
> check for side effects of this change.
> See 
> {{org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRuleTest#testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected}}
>  for an example of using the new test infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24536) flink sql where condition does not support the != syntax

2021-11-01 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24536:
---
Fix Version/s: (was: 1.14.1)
   1.15.0

> flink sql where condition does not support the != syntax
> --
>
> Key: FLINK-24536
> URL: https://issues.apache.org/jira/browse/FLINK-24536
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: simenliuxing
>Assignee: liwei li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> sql:
>  
> {code:java}
> CREATE TABLE source
> (
>  id INT,
>  name STRING,
>  money DECIMAL(32, 2),
>  dateone timestamp,
>  age bigint,
>  datethree timestamp,
>  datesix timestamp(6),
>  datenigth timestamp(9),
>  dtdate date,
>  dttime time
> ) WITH (
>  'connector' = 'datagen'
>  ,'rows-per-second' = '1'
>  );
> CREATE TABLE sink
> (
>  id bigint,
>  name STRING
> ) WITH (
>  'connector' = 'print'
>  );
> insert into sink
> select sum(id) as id, name
> from source
> where name != 'aa'
> group by name;
> {code}
>  
> exception:
>  
> {code:java}
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Bang equal '!=' 
> is not allowed under the current SQL conformance level
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:462)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:225)
>     at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
>     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
>     at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
>     at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
>     ... 22 more
> Caused by: org.apache.calcite.runtime.CalciteException: Bang equal '!=' is 
> not allowed under the current SQL conformance level
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.BinaryRowOperator(FlinkSqlParserImpl.java:31759)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:19802)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:19553)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.WhereOpt(FlinkSqlParserImpl.java:14370)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7836)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:704)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:19536)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18982)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:578)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5596)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3404)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3980)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:273)
>     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
>     ... 24 more{code}
>  
> It is OK when I use the following syntax:
> where name <> 'aa'
> Why is the '!=' syntax not supported, and will it be supported later?
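> For reference, the workaround rewritten in full (the same query as above 
> with '!=' replaced by the standard '<>' operator, which the default 
> conformance level accepts):
> {code:sql}
> insert into sink
> select sum(id) as id, name
> from source
> where name <> 'aa'
> group by name;
> {code}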



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24704) Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

2021-11-01 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24704:
---
Priority: Critical  (was: Minor)

> Exception occurs when the input record loses monotonicity on the sort key 
> field of UpdatableTopNFunction
> 
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when an input retract record's sort key 
> is lower than the old sort key; this is because it breaks the monotonicity 
> on the sort key field which is guaranteed by the SQL semantics. It is highly 
> likely that an upstream stateful operator has a shorter state TTL than the 
> stream's records, so the stale record was cleared by the state TTL. 
> A reproducing case is shown below:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by 
> word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the 
> downstream retract rank operator will fail with the following exception:
>  
> {quote}java.lang.IllegalArgumentException 
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) 
> at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at 
> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with the RetractableTopNFunction: continue processing 
> (with possibly incorrect results) by default, or allow configuring a 
> failover once 
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24704) Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

2021-11-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436653#comment-17436653
 ] 

Kurt Young commented on FLINK-24704:


I changed the priority to critical, and we'd better also backport this fix 
to 1.14.

> Exception occurs when the input record loses monotonicity on the sort key 
> field of UpdatableTopNFunction
> 
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> An IllegalArgumentException occurs when an input retract record's sort key 
> is lower than the old sort key; this is because it breaks the monotonicity 
> on the sort key field which is guaranteed by the SQL semantics. It is highly 
> likely that an upstream stateful operator has a shorter state TTL than the 
> stream's records, so the stale record was cleared by the state TTL. 
> A reproducing case is shown below:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by 
> word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the 
> downstream retract rank operator will fail with the following exception:
>  
> {quote}java.lang.IllegalArgumentException 
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) 
> at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at 
> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with the RetractableTopNFunction: continue processing 
> (with possibly incorrect results) by default, or allow configuring a 
> failover once 
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24704) Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

2021-11-01 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24704:
---
Fix Version/s: 1.14.1

> Exception occurs when the input record loses monotonicity on the sort key 
> field of UpdatableTopNFunction
> 
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> An IllegalArgumentException occurs when an input retract record's sort key 
> is lower than the old sort key; this is because it breaks the monotonicity 
> on the sort key field which is guaranteed by the SQL semantics. It is highly 
> likely that an upstream stateful operator has a shorter state TTL than the 
> stream's records, so the stale record was cleared by the state TTL. 
> A reproducing case is shown below:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by 
> word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the 
> downstream retract rank operator will fail with the following exception:
>  
> {quote}java.lang.IllegalArgumentException 
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) 
> at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at 
> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with the RetractableTopNFunction: continue processing 
> (with possibly incorrect results) by default, or allow configuring a 
> failover once 
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24691) FLINK SQL SUM() causes a precision error

2021-10-31 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24691:
---
Priority: Critical  (was: Major)

> FLINK SQL SUM() causes a precision error
> 
>
> Key: FLINK-24691
> URL: https://issues.apache.org/jira/browse/FLINK-24691
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Zongwen Li
>Assignee: Marios Trivyzas
>Priority: Critical
> Fix For: 1.15.0, 1.14.1
>
> Attachments: image-2021-10-29-15-49-14-419.png
>
>
> code: 
> {code:java}
> tableEnv.executeSql("select sum(cast( 1.03520274 as decimal(32, 8))) as num 
> ").print();
> {code}
> expected:
> 1.03520274
> actual:
> 1.03520270
>  
> This function behaves correctly in version 1.13.x.
> I found the code that causes the inconsistency: the
> org.apache.flink.table.types.logical.utils.LogicalTypeMerging#adjustPrecisionScale
>  method was added in 1.14. Because sum() generates the maximum precision (38) 
> by default, it hits a special case which reduces the scale by 1.
>   !image-2021-10-29-15-49-14-419.png!
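> To make the failure mode concrete, here is a hypothetical, simplified 
> sketch (an illustrative assumption, not the actual LogicalTypeMerging code) 
> of how an adjustment step that reserves headroom at the maximum precision 
> can silently trade away one digit of scale:
> {code:java}
> // Hypothetical precision/scale adjustment. SUM over DECIMAL(32, 8)
> // derives the maximum precision, DECIMAL(38, 8); if the adjustment then
> // reserves an extra integral digit, the scale drops from 8 to 7 and
> // 1.03520274 is stored as 1.0352027 (printed as 1.03520270).
> static int[] adjustPrecisionScale(int precision, int scale) {
>     final int maxPrecision = 38;
>     if (precision < maxPrecision) {
>         return new int[] {precision, scale}; // nothing to adjust
>     }
>     int integralDigits = precision - scale;
>     // Keep the integral digits plus one digit of headroom; the scale
>     // silently shrinks by one -- the "special circumstances" above.
>     int adjustedScale = Math.max(Math.min(scale, maxPrecision - integralDigits - 1), 0);
>     return new int[] {maxPrecision, adjustedScale};
> }
> {code}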



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24691) FLINK SQL SUM() causes a precision error

2021-10-31 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24691:
---
Fix Version/s: 1.14.1
   1.15.0

> FLINK SQL SUM() causes a precision error
> 
>
> Key: FLINK-24691
> URL: https://issues.apache.org/jira/browse/FLINK-24691
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Zongwen Li
>Assignee: Marios Trivyzas
>Priority: Major
> Fix For: 1.15.0, 1.14.1
>
> Attachments: image-2021-10-29-15-49-14-419.png
>
>
> code: 
> {code:java}
> tableEnv.executeSql("select sum(cast( 1.03520274 as decimal(32, 8))) as num 
> ").print();
> {code}
> expected:
> 1.03520274
> actual:
> 1.03520270
>  
> This function behaves correctly in version 1.13.x.
> I found the code that causes the inconsistency: the
> org.apache.flink.table.types.logical.utils.LogicalTypeMerging#adjustPrecisionScale
>  method was added in 1.14. Because sum() generates the maximum precision (38) 
> by default, it hits a special case which reduces the scale by 1.
>   !image-2021-10-29-15-49-14-419.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24704) Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

2021-10-31 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436583#comment-17436583
 ] 

Kurt Young commented on FLINK-24704:


Thanks for reporting this [~lincoln.86xy]. Will this lead to incorrect 
results or an endless exception-failover loop?

> Exception occurs when the input record loses monotonicity on the sort key 
> field of UpdatableTopNFunction
> 
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when an input retract record's sort key 
> is lower than the old sort key; this is because it breaks the monotonicity 
> on the sort key field which is guaranteed by the SQL semantics. It is highly 
> likely that an upstream stateful operator has a shorter state TTL than the 
> stream's records, so the stale record was cleared by the state TTL. 
> A reproducing case is shown below:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by 
> word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the 
> downstream retract rank operator will fail with the following exception:
>  
> {quote}java.lang.IllegalArgumentException 
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) 
> at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at 
> org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at 
> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with the RetractableTopNFunction: continue processing 
> (with possibly incorrect results) by default, or allow configuring a 
> failover once FLINK-24666 is addressed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24708) `ConvertToNotInOrInRule` has a bug which leads to wrong result

2021-10-31 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24708:
---
Fix Version/s: 1.13.4
   1.14.1

> `ConvertToNotInOrInRule` has a bug which leads to wrong result
> --
>
> Key: FLINK-24708
> URL: https://issues.apache.org/jira/browse/FLINK-24708
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Critical
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
> Attachments: image-2021-10-29-23-59-48-074.png
>
>
> A user reported this bug on the mailing list; I paste the content here.
> We are in the process of upgrading from Flink 1.9.3 to 1.13.3. We have 
> noticed that statements with either WHERE UPPER(field) or LOWER(field) in 
> combination with an IN clause do not always evaluate correctly. 
>  
> The following test case highlights this problem.
>  
>  
> {code:java}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class TestCase {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> TestData testData = new TestData();
> testData.setField1("bcd");
> DataStream<TestData> stream = env.fromElements(testData);
> stream.print();  // To prevent 'No operators' error
> final StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env);
> tableEnvironment.createTemporaryView("testTable", stream, 
> Schema.newBuilder().build());
> // Fails because abcd is larger than abc
> tableEnvironment.executeSql("select *, '1' as run from testTable 
> WHERE lower(field1) IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // Succeeds because lower was removed
> tableEnvironment.executeSql("select *, '2' as run from testTable 
> WHERE field1 IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // These 4 succeed because the smallest literal is before abcd
> tableEnvironment.executeSql("select *, '3' as run from testTable 
> WHERE lower(field1) IN ('abc', 'abcd', 'bcd', 'cde')").print();
> tableEnvironment.executeSql("select *, '4' as run from testTable 
> WHERE lower(field1) IN ('abc', 'bcd', 'abhi', 'cde')").print();
> tableEnvironment.executeSql("select *, '5' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abcd', 'abc', 'bcd')").print();
> tableEnvironment.executeSql("select *, '6' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abc', 'abcd', 'bcd')").print();
> // Fails because smallest is not first
> tableEnvironment.executeSql("select *, '7' as run from testTable 
> WHERE lower(field1) IN ('cdef', 'abce', 'abcd', 'ab', 'bcd')").print();
> // Succeeds
> tableEnvironment.executeSql("select *, '8' as run from testTable 
> WHERE lower(field1) IN ('ab', 'cdef', 'abce', 'abcdefgh', 'bcd')").print();
> env.execute("TestCase");
> }
> public static class TestData {
> private String field1;
> public String getField1() {
> return field1;
> }
> public void setField1(String field1) {
> this.field1 = field1;
> }
> }
> }
> {code}
>  
> The job produces the following output:
> Empty set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   2 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   3 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   4 |
> +----+--------+-----+
> 1 row in set
> 

[jira] [Updated] (FLINK-24708) `ConvertToNotInOrInRule` has a bug which leads to wrong result

2021-10-31 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24708:
---
Fix Version/s: 1.15.0

> `ConvertToNotInOrInRule` has a bug which leads to wrong result
> --
>
> Key: FLINK-24708
> URL: https://issues.apache.org/jira/browse/FLINK-24708
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: image-2021-10-29-23-59-48-074.png
>
>
> A user reported this bug on the mailing list; I paste the content here.
> We are in the process of upgrading from Flink 1.9.3 to 1.13.3. We have 
> noticed that statements with either WHERE UPPER(field) or LOWER(field) in 
> combination with an IN clause do not always evaluate correctly. 
>  
> The following test case highlights this problem.
>  
>  
> {code:java}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class TestCase {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> TestData testData = new TestData();
> testData.setField1("bcd");
> DataStream<TestData> stream = env.fromElements(testData);
> stream.print();  // To prevent 'No operators' error
> final StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env);
> tableEnvironment.createTemporaryView("testTable", stream, 
> Schema.newBuilder().build());
> // Fails because abcd is larger than abc
> tableEnvironment.executeSql("select *, '1' as run from testTable 
> WHERE lower(field1) IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // Succeeds because lower was removed
> tableEnvironment.executeSql("select *, '2' as run from testTable 
> WHERE field1 IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // These 4 succeed because the smallest literal is before abcd
> tableEnvironment.executeSql("select *, '3' as run from testTable 
> WHERE lower(field1) IN ('abc', 'abcd', 'bcd', 'cde')").print();
> tableEnvironment.executeSql("select *, '4' as run from testTable 
> WHERE lower(field1) IN ('abc', 'bcd', 'abhi', 'cde')").print();
> tableEnvironment.executeSql("select *, '5' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abcd', 'abc', 'bcd')").print();
> tableEnvironment.executeSql("select *, '6' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abc', 'abcd', 'bcd')").print();
> // Fails because smallest is not first
> tableEnvironment.executeSql("select *, '7' as run from testTable 
> WHERE lower(field1) IN ('cdef', 'abce', 'abcd', 'ab', 'bcd')").print();
> // Succeeds
> tableEnvironment.executeSql("select *, '8' as run from testTable 
> WHERE lower(field1) IN ('ab', 'cdef', 'abce', 'abcdefgh', 'bcd')").print();
> env.execute("TestCase");
> }
> public static class TestData {
> private String field1;
> public String getField1() {
> return field1;
> }
> public void setField1(String field1) {
> this.field1 = field1;
> }
> }
> }
> {code}
>  
> The job produces the following output:
> Empty set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   2 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   3 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   4 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | 

[jira] [Updated] (FLINK-24708) `ConvertToNotInOrInRule` has a bug which leads to wrong result

2021-10-31 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24708:
---
Priority: Critical  (was: Major)

> `ConvertToNotInOrInRule` has a bug which leads to wrong result
> --
>
> Key: FLINK-24708
> URL: https://issues.apache.org/jira/browse/FLINK-24708
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Critical
> Fix For: 1.15.0
>
> Attachments: image-2021-10-29-23-59-48-074.png
>
>
> A user reported this bug on the mailing list; I paste the content here.
> We are in the process of upgrading from Flink 1.9.3 to 1.13.3. We have 
> noticed that statements with either WHERE UPPER(field) or LOWER(field) in 
> combination with an IN clause do not always evaluate correctly. 
>  
> The following test case highlights this problem.
>  
>  
> {code:java}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class TestCase {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> TestData testData = new TestData();
> testData.setField1("bcd");
> DataStream<TestData> stream = env.fromElements(testData);
> stream.print();  // To prevent 'No operators' error
> final StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env);
> tableEnvironment.createTemporaryView("testTable", stream, 
> Schema.newBuilder().build());
> // Fails because abcd is larger than abc
> tableEnvironment.executeSql("select *, '1' as run from testTable 
> WHERE lower(field1) IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // Succeeds because lower was removed
> tableEnvironment.executeSql("select *, '2' as run from testTable 
> WHERE field1 IN ('abcd', 'abc', 'bcd', 'cde')").print();
> // These 4 succeed because the smallest literal is before abcd
> tableEnvironment.executeSql("select *, '3' as run from testTable 
> WHERE lower(field1) IN ('abc', 'abcd', 'bcd', 'cde')").print();
> tableEnvironment.executeSql("select *, '4' as run from testTable 
> WHERE lower(field1) IN ('abc', 'bcd', 'abhi', 'cde')").print();
> tableEnvironment.executeSql("select *, '5' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abcd', 'abc', 'bcd')").print();
> tableEnvironment.executeSql("select *, '6' as run from testTable 
> WHERE lower(field1) IN ('cde', 'abc', 'abcd', 'bcd')").print();
> // Fails because smallest is not first
> tableEnvironment.executeSql("select *, '7' as run from testTable 
> WHERE lower(field1) IN ('cdef', 'abce', 'abcd', 'ab', 'bcd')").print();
> // Succeeds
> tableEnvironment.executeSql("select *, '8' as run from testTable 
> WHERE lower(field1) IN ('ab', 'cdef', 'abce', 'abcdefgh', 'bcd')").print();
> env.execute("TestCase");
> }
> public static class TestData {
> private String field1;
> public String getField1() {
> return field1;
> }
> public void setField1(String field1) {
> this.field1 = field1;
> }
> }
> }
> {code}
>  
> The job produces the following output:
> Empty set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   2 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   3 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op | field1 | run |
> +----+--------+-----+
> | +I |    bcd |   4 |
> +----+--------+-----+
> 1 row in set
> +----+--------+-----+
> | op |   

[jira] [Updated] (FLINK-23015) Implement streaming window Deduplicate operator

2021-10-24 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23015:
---
Fix Version/s: 1.15.0

> Implement streaming window Deduplicate operator
> ---
>
> Key: FLINK-23015
> URL: https://issues.apache.org/jira/browse/FLINK-23015
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21327) Support window TVF in batch mode

2021-10-24 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21327:
---
Fix Version/s: 1.15.0

> Support window TVF in batch mode
> 
>
> Key: FLINK-21327
> URL: https://issues.apache.org/jira/browse/FLINK-21327
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Assignee: JING ZHANG
>Priority: Major
> Fix For: 1.15.0
>
>
> As a batch and streaming unified engine, we should also support running 
> window TVFs in batch mode. Then users can use one query in streaming mode to 
> produce data in real time and use the same query in batch mode to backfill 
> data for a specific day.
> The implementation for batch should be straightforward and simple. We can 
> just introduce a physical node and an exec node for the window TVF. 
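> As an illustration, a minimal query of the kind that should run in both 
> modes (the table and column names are made up; the TUMBLE function syntax 
> follows FLIP-145):
> {code:sql}
> -- Tumbling window TVF: the same query serves real-time (streaming)
> -- execution and backfill (batch) execution.
> SELECT window_start, window_end, SUM(price) AS total_price
> FROM TABLE(
>   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end;
> {code}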



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23865) Class cast error caused by nested Pojo in legacy outputConversion

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23865:
---
Component/s: (was: Table SQL / Planner)
 Table SQL / API

> Class cast error caused by nested Pojo in legacy outputConversion
> -
>
> Key: FLINK-23865
> URL: https://issues.apache.org/jira/browse/FLINK-23865
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.2
>Reporter: zoucao
>Priority: Major
>
> code:
> {code:java}
> Table table = tbEnv.fromValues(DataTypes.ROW(
> DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", 
> STRING()))),
> DataTypes.FIELD("b", STRING()),
> DataTypes.FIELD("a", INT())),
> Row.of(Row.of("str-c"), "str-b", 1));
> DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);
> -
> public static class Pojo{
> public InnerPojo innerPojo;
> public String b;
> public int a;
> public Pojo() {
> }
> }
> public static class InnerPojo {
> public String c;
> public InnerPojo() {
> }
> }{code}
> error:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType 
> cannot be cast to org.apache.flink.table.types.logical.RowType
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
> {code}
> The fields of PojoTypeInfo are in alphabetical order, so in 
> `expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' should each use 
> their own index; but currently we use the pojo field index to look up 
> 'queryLogicalType', which causes the field type mismatch. It should be fixed 
> like:
> {code:java}
> val queryIndex = queryLogicalType.getFieldIndex(name)
> val nestedLogicalType = 
> queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18996) Avoid disorder for time interval join

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-18996:
---
Fix Version/s: 1.15.0

> Avoid disorder for time interval join
> -
>
> Key: FLINK-18996
> URL: https://issues.apache.org/jira/browse/FLINK-18996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Benchao Li
>Priority: Critical
>  Labels: auto-deprioritized-major, stale-critical
> Fix For: 1.15.0
>
>
> Currently, the time interval join will produce data with rowtime later than 
> the watermark. If we use the rowtime again downstream, e.g. in a window 
> aggregation, we'll lose some data.
>  
> reported from user-zh: 
> [http://apache-flink.147419.n8.nabble.com/Re-flink-interval-join-tc4458.html#none]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22826:
---
Fix Version/s: 1.15.0

> flink sql1.13.1 causes data loss based on change log stream data join
> -
>
> Key: FLINK-22826
> URL: https://issues.apache.org/jira/browse/FLINK-22826
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.13.1
>Reporter: 徐州州
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-blocker
> Fix For: 1.15.0
>
>
> {code:java}
> insert into dwd_order_detail
> select
>ord.Id,
>ord.Code,
>Status,
>  concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id  
> as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd'))  as uuids,
>  TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd')) as As_Of_Date
> from
> orders ord
> left join order_extend oed on  ord.Id=oed.OrderId and oed.IsDeleted=0 and 
> oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS 
> TIMESTAMP)
> or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> ) and ord.IsDeleted=0;
> {code}
> My upsert-kafka table uses uuids as its PRIMARY KEY.
> This is the logic of my Kafka-based canal-json stream join that writes to 
> upsert-kafka tables. I confirm that version 1.12 also has this problem; I 
> just upgraded from 1.12 to 1.13.
> I looked up one user's order data: order number XJ0120210531004794 appears 
> in the original canal-json table as +U, which is normal.
> {code:java}
> | +U | XJ0120210531004794 |  50 |
> | +U | XJ0120210531004672 |  50 |
> {code}
> But after being written to upsert-kafka via the join, the data consumed from 
> upsert-kafka is:
> {code:java}
> | +I | XJ0120210531004794 |  50 |
> | -U | XJ0120210531004794 |  50 |
> {code}
> The two records for this order in the orders and order_extend tables have not 
> changed since they were created. The -U status caused my data loss: the 
> record was not counted, and the final result was wrong.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17337) Send UPDATE messages instead of INSERT and DELETE in streaming join operator

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-17337:
---
Fix Version/s: 1.15.0

> Send UPDATE messages instead of INSERT and DELETE in streaming join operator
> 
>
> Key: FLINK-17337
> URL: https://issues.apache.org/jira/browse/FLINK-17337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0
>
>
> Currently, the streaming join operator always sends INSERT and DELETE 
> messages for simplification if it's not an inner join. However, we can send 
> UPDATE_BEFORE and UPDATE_AFTER messages instead of INSERT and DELETE. For 
> example, when we receive the right record "b", we can send {{UB[a, null]}} 
> and {{UA[a,b]}} instead of {{D[a,null]}}, {{I[a,b]}}. This is an 
> optimization, because UB can be omitted in some cases to reduce IO cost and 
> computation. 
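> A minimal sketch of the message kinds involved (assuming only Flink's 
> {{Row.ofKind}} helper and the {{RowKind}} enum; this is an illustration, not 
> the operator's actual code):
> {code:java}
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
> 
> public class UpdatePairExample {
>     public static void main(String[] args) {
>         // Proposed: when the right record "b" arrives for the existing
>         // left record "a", emit an update pair ...
>         Row updateBefore = Row.ofKind(RowKind.UPDATE_BEFORE, "a", null);
>         Row updateAfter = Row.ofKind(RowKind.UPDATE_AFTER, "a", "b");
> 
>         // ... instead of the delete/insert pair sent today for the
>         // same change.
>         Row delete = Row.ofKind(RowKind.DELETE, "a", null);
>         Row insert = Row.ofKind(RowKind.INSERT, "a", "b");
> 
>         System.out.println(updateBefore + " / " + updateAfter);
>         System.out.println(delete + " / " + insert);
>     }
> }
> {code}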



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23379) interval left join null value result out of order

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23379:
---
Fix Version/s: 1.15.0

>  interval left join null value result out of order
> --
>
> Key: FLINK-23379
> URL: https://issues.apache.org/jira/browse/FLINK-23379
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: waywtdcc
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: image-2021-07-15-16-53-59-228.png
>
>
> * Scenario:
>  The person main table is interval-left-joined with an associated message 
> information table.
>  A main-table record that finds no match in the message information table is 
> emitted later than a subsequent record that does find a match.
>  When a normal output and a null-value output share the same primary key, 
> they arrive out of order: the null-value output comes later than the normal 
> output, resulting in incorrect results.
> Input:
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 
> 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 
> 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
>  +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
>  +I(chencc2,28,2021-03-26T19:02:47,null,null)
>  The second record's time (19:02) is earlier than the first record's, but 
> it is output later, causing data update errors.
>  
>  * Code:
> {code:java}
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
>          String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `age` INT,\n" +
>                  "  proctime as PROCTIME(),\n" +
>                  "  `ts` TIMESTAMP(3),\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_test2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroa115',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql);
>         tableEnv.executeSql("drop table if exists 
> persons_message_table_kafka2");
>          String kafka_source_sql2 = "CREATE TABLE 
> persons_message_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `message` STRING,\n" +
>                  "  `ts` TIMESTAMP(3) ," +
>  //                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_extra_message2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroud2e313',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql2);
>         tableEnv.executeSql("" +
>                  "CREATE TEMPORARY VIEW result_data_view " +
>                  " as " +
>                  " select " +
>                  " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as 
> string) as ts2 " +
>                  " from persons_table_kafka2 t1 " +
>                  " left  join persons_message_table_kafka2 t2 on t1.name = 
> t2.name and t1.ts between " +
>                          " t2.ts and t2.ts +  INTERVAL '3' MINUTE"
>                  );
>         Table resultTable = tableEnv.from("result_data_view");
>          DataStream<RowData> rowDataDataStream = 
> tableEnv.toAppendStream(resultTable, RowData.class);
>          rowDataDataStream.print();
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23740) SQL Full Outer Join bug

2021-10-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23740:
---
Fix Version/s: 1.15.0

> SQL Full Outer Join bug
> ---
>
> Key: FLINK-23740
> URL: https://issues.apache.org/jira/browse/FLINK-23740
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.1, 1.13.2
>Reporter: Fu Kai
>Priority: Critical
> Fix For: 1.15.0
>
>
> Hi team,
> We encountered an issue with FULL OUTER JOIN in Flink SQL, which happens 
> occasionally, at very low probability: join output records cannot be 
> correctly updated. We cannot locate the root cause for now by glancing at the 
> SQL join logic in 
> [StreamingJoinOperator.|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L198]
>  It cannot be stably reproduced, and it does happen with massive data volume.
> The reason we suspect a FULL OUTER join problem instead of something else 
> like LEFT OUTER join is that the issue only arises after we introduced FULL 
> OUTER into the join flow. The query we are using is like the following. There 
> are two join code pieces below: the first one contains solely left joins 
> (though nested), and no issue was detected; the second one contains both 
> left and full outer joins (nested as well), and the problem is that sometimes 
> updates from the left table A (and other tables before the full outer join 
> operator) are not reflected in the final output. We suspect it could be the 
> introduction of full outer join that caused the problem, although at a very 
> low probability (~10 out of ~30 million). 
> The root cause of the bug could be something else; the suspicion of FULL 
> OUTER join is based on the results of our current experiments and 
> observations.
> {code:java}
> create table A(
> k1 int,
> k2 int,
> k3 int,
> k4 int,
> k5 int,
> PRIMARY KEY (k1, k2, k3, k4, k5) NOT ENFORCED
> ) WITH ();
> create table B(
> k1 int,
> k2 int,
> k3 int,
> PRIMARY KEY (k1, k2, k3) NOT ENFORCED
> ) WITH ();
> create table C(
> k1 int,
> k2 int,
> k3 int,
> PRIMARY KEY (k1, k2, k3) NOT ENFORCED
> ) WITH ();
> create table D(
> k1 int,
> k2 int,
> PRIMARY KEY (k1, k2) NOT ENFORCED
> ) WITH ();
> -- query with left join, no issue detected
> select * from A 
> left outer join 
> (select * from B
> left outer join C
> on 
> B.k1 = C.k1 and
> B.k2 = C.k2 and
> B.k3 = C.k3
> ) as BC
> on
> A.k1 = BC.k1 and
> A.k2 = BC.k2 and
> A.k3 = BC.k3
> left outer join D
> on 
> A.k1 = D.k1 and
> A.k2 = D.k2
> ;
> -- query with full outer join combined with left outer join; record updates
> -- from left table A sometimes cannot be updated in the final output
> select * from A 
> left outer join 
> (select * from B
> full outer join C
> on 
> B.k1 = C.k1 and
> B.k2 = C.k2 and
> B.k3 = C.k3
> ) as BC
> on
> A.k1 = BC.k1 and
> A.k2 = BC.k2 and
> A.k3 = BC.k3
> left outer join D
> on 
> A.k1 = D.k1 and
> A.k2 = D.k2
> ;
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23997) Improvement for SQL windowing table-valued function

2021-10-20 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23997:
---
Fix Version/s: 1.15.0

> Improvement for SQL windowing table-valued function
> ---
>
> Key: FLINK-23997
> URL: https://issues.apache.org/jira/browse/FLINK-23997
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: JING ZHANG
>Priority: Major
> Fix For: 1.15.0
>
>
> This is an umbrella issue for follow-up issues related to windowing 
> table-valued functions.
> FLIP-145: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24459) Performance improvement of file sink on Nexmark

2021-10-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24459:
---
Fix Version/s: 1.15.0

> Performance improvement of file sink on Nexmark
> ---
>
> Key: FLINK-24459
> URL: https://issues.apache.org/jira/browse/FLINK-24459
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.0
>Reporter: Alexander Trushev
>Assignee: Alexander Trushev
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: after.jfr.zip, after_cpu.png, after_mem.png, 
> before.jfr.zip, before_cpu.png, before_mem.png
>
>
> h3. Context
> {{PartitionPathUtils.escapePathName}} is a pretty simple method that takes 
> {{String}}, allocates {{StringBuilder}}, appends original or escaped chars, 
> and outputs the result {{String}}.
> Filesystem sink calls the method several times for each element to determine 
> bucket id. Because of this, it is a hot spot on a workload that writes 
> intensively to filesystem, such as [nexmark 
> q10|https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/resources/queries/q10.sql].
>  On my local machine, escaping the chars takes 9.53% of CPU samples and 
> 17.8% of memory allocations of the whole TaskManager process.
> h3. Proposal
> {{PartitionPathUtils.escapePathName}} improvements
> # Use more efficient {{Integer.toHexString}} instead of {{String.format}}
> # Do not allocate new string when there is no escapable char in the original 
> string
> # Allocate {{StringBuilder}} depending on the original string length instead 
> of the default value
> h3. Benefit
> Experiment on local machine.
> 1 TaskManager with 6 slots. Job parallelism 6. Nexmark default configuration 
> + object reuse option.
> Before: flink-1.14.0
> After: flink-1.14.0 + patch with the improvements
> || Nexmark q10 || Before || After ||
> | CPU samples of escapePathName() (% of all) | 9.53 | 1.64 |
> | Memory allocations by escapePathName() (% of all) | 17.8 | 2.98 |
> | Throughput/Cores (K/s) | 107.64 | 119.42 |
> Diff: CPU *-7.89*%, Memory *-14.82*%, Throughput *+10.9*%
> Profiling reports are in the attachment.
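> A minimal sketch of how the three improvements fit together (hypothetical 
> code, not the merged patch; the helper {{needsEscaping}} stands in for the 
> real escape-character check):
> {code:java}
> static String escapePathName(String path) {
>     // Improvement 2: scan first and return the original string untouched
>     // when nothing needs escaping (the common case).
>     int firstToEscape = -1;
>     for (int i = 0; i < path.length(); i++) {
>         if (needsEscaping(path.charAt(i))) {
>             firstToEscape = i;
>             break;
>         }
>     }
>     if (firstToEscape < 0) {
>         return path;
>     }
>     // Improvement 3: size the builder from the input length instead of
>     // relying on StringBuilder's default capacity.
>     StringBuilder sb = new StringBuilder(path.length() + 16);
>     sb.append(path, 0, firstToEscape);
>     for (int i = firstToEscape; i < path.length(); i++) {
>         char c = path.charAt(i);
>         if (needsEscaping(c)) {
>             // Improvement 1: Integer.toHexString instead of String.format.
>             sb.append('%');
>             if (c < 0x10) {
>                 sb.append('0');
>             }
>             sb.append(Integer.toHexString(c).toUpperCase());
>         } else {
>             sb.append(c);
>         }
>     }
>     return sb.toString();
> }
> 
> // Placeholder predicate: the real method escapes a specific set of
> // characters that are unsafe in partition paths.
> static boolean needsEscaping(char c) {
>     return c == '/' || c == ':' || c == '%';
> }
> {code}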



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-24459) Performance improvement of file sink on Nexmark

2021-10-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-24459.
--
Resolution: Fixed

merged: 624372376a09df53e23bb615e5293a6caa296c47

> Performance improvement of file sink on Nexmark
> ---
>
> Key: FLINK-24459
> URL: https://issues.apache.org/jira/browse/FLINK-24459
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.0
>Reporter: Alexander Trushev
>Assignee: Alexander Trushev
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: after.jfr.zip, after_cpu.png, after_mem.png, 
> before.jfr.zip, before_cpu.png, before_mem.png
>
>
> h3. Context
> {{PartitionPathUtils.escapePathName}} is a pretty simple method that takes 
> {{String}}, allocates {{StringBuilder}}, appends original or escaped chars, 
> and outputs the result {{String}}.
> Filesystem sink calls the method several times for each element to determine 
> bucket id. Because of this, it is a hot spot on a workload that writes 
> intensively to filesystem, such as [nexmark 
> q10|https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/resources/queries/q10.sql].
>  On my local machine, escaping the chars takes 9.53% of CPU samples and 
> 17.8% of memory allocations of the whole TaskManager process.
> h3. Proposal
> {{PartitionPathUtils.escapePathName}} improvements
> # Use more efficient {{Integer.toHexString}} instead of {{String.format}}
> # Do not allocate new string when there is no escapable char in the original 
> string
> # Allocate {{StringBuilder}} depending on the original string length instead 
> of the default value
> h3. Benefit
> Experiment on local machine.
> 1 TaskManager with 6 slots. Job parallelism 6. Nexmark default configuration 
> + object reuse option.
> Before: flink-1.14.0
> After: flink-1.14.0 + patch with the improvements
> || Nexmark q10 || Before || After ||
> | CPU samples of escapePathName() (% of all) | 9.53 | 1.64 |
> | Memory allocations by escapePathName() (% of all) | 17.8 | 2.98 |
> | Throughput/Cores (K/s) | 107.64 | 119.42 |
> Diff: CPU *-7.89*%, Memory *-14.82*%, Throughput *+10.9*%
> Profiling reports are in the attachment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24459) Performance improvement of file sink on Nexmark

2021-10-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24459:
---
Affects Version/s: 1.14.0

> Performance improvement of file sink on Nexmark
> ---
>
> Key: FLINK-24459
> URL: https://issues.apache.org/jira/browse/FLINK-24459
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.0
>Reporter: Alexander Trushev
>Assignee: Alexander Trushev
>Priority: Minor
>  Labels: pull-request-available
> Attachments: after.jfr.zip, after_cpu.png, after_mem.png, 
> before.jfr.zip, before_cpu.png, before_mem.png
>
>
> h3. Context
> {{PartitionPathUtils.escapePathName}} is a pretty simple method that takes 
> {{String}}, allocates {{StringBuilder}}, appends original or escaped chars, 
> and outputs the result {{String}}.
> Filesystem sink calls the method several times for each element to determine 
> the bucket id. Because of this, it is a hot spot on a workload that writes 
> intensively to the filesystem, such as [nexmark 
> q10|https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/resources/queries/q10.sql].
>  On my local machine, escaping chars accounts for 9.53% of CPU samples and 
> 17.8% of memory allocations of the whole TaskManager process.
> h3. Proposal
> {{PartitionPathUtils.escapePathName}} improvements:
> # Use the more efficient {{Integer.toHexString}} instead of {{String.format}}
> # Do not allocate a new string when there is no escapable char in the original 
> string
> # Allocate the {{StringBuilder}} based on the original string length instead 
> of the default capacity
> h3. Benefit
> Experiment on local machine.
> 1 TaskManager with 6 slots. Job parallelism 6. Nexmark default configuration 
> + object reuse option.
> Before: flink-1.14.0
> After: flink-1.14.0 + patch with the improvements
> || Nexmark q10 || Before || After ||
> | CPU samples of escapePathName() (% of all) | 9.53 | 1.64 |
> | Memory allocations by escapePathName() (% of all) | 17.8 | 2.98 |
> | Throughput/Cores (K/s) | 107.64 | 119.42 |
> Diff: CPU *-7.89*%, Memory *-14.82*%, Throughput *+10.9*%
> Profiling reports are in the attachment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24459) Performance improvement of file sink on Nexmark

2021-10-10 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426946#comment-17426946
 ] 

Kurt Young commented on FLINK-24459:


Very nice improvement! [~trushev]

> Performance improvement of file sink on Nexmark
> ---
>
> Key: FLINK-24459
> URL: https://issues.apache.org/jira/browse/FLINK-24459
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Alexander Trushev
>Assignee: Alexander Trushev
>Priority: Minor
>  Labels: pull-request-available
> Attachments: after.jfr.zip, after_cpu.png, after_mem.png, 
> before.jfr.zip, before_cpu.png, before_mem.png
>
>
> h3. Context
> {{PartitionPathUtils.escapePathName}} is a pretty simple method that takes 
> {{String}}, allocates {{StringBuilder}}, appends original or escaped chars, 
> and outputs the result {{String}}.
> Filesystem sink calls the method several times for each element to determine 
> the bucket id. Because of this, it is a hot spot on a workload that writes 
> intensively to the filesystem, such as [nexmark 
> q10|https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/resources/queries/q10.sql].
>  On my local machine, escaping chars accounts for 9.53% of CPU samples and 
> 17.8% of memory allocations of the whole TaskManager process.
> h3. Proposal
> {{PartitionPathUtils.escapePathName}} improvements:
> # Use the more efficient {{Integer.toHexString}} instead of {{String.format}}
> # Do not allocate a new string when there is no escapable char in the original 
> string
> # Allocate the {{StringBuilder}} based on the original string length instead 
> of the default capacity
> h3. Benefit
> Experiment on local machine.
> 1 TaskManager with 6 slots. Job parallelism 6. Nexmark default configuration 
> + object reuse option.
> Before: flink-1.14.0
> After: flink-1.14.0 + patch with the improvements
> || Nexmark q10 || Before || After ||
> | CPU samples of escapePathName() (% of all) | 9.53 | 1.64 |
> | Memory allocations by escapePathName() (% of all) | 17.8 | 2.98 |
> | Throughput/Cores (K/s) | 107.64 | 119.42 |
> Diff: CPU *-7.89*%, Memory *-14.82*%, Throughput *+10.9*%
> Profiling reports are in the attachment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-09-09 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412400#comment-17412400
 ] 

Kurt Young commented on FLINK-24013:


Would you like to open a PR? I can help review it. [~chesnay]

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-09-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408054#comment-17408054
 ] 

Kurt Young commented on FLINK-24013:


I see no harm in adjusting the xml to your format; at least my IDEA doesn't 
complain either after I changed my vcs.xml to yours.

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-09-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408036#comment-17408036
 ] 

Kurt Young commented on FLINK-24013:


Found 3 differences:
 # no license header. If this is fine with the rat plugin, we can also remove it
 # an indent issue, which we can adjust too
 # an extra line: ``; I have no idea where it comes from, do you have any clues?

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-09-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408008#comment-17408008
 ] 

Kurt Young commented on FLINK-24013:


What does it look like after being modified automatically?

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-09-01 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407995#comment-17407995
 ] 

Kurt Young commented on FLINK-24013:


Mine works well. Do you have any personal configuration in your vcs.xml?

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I

2021-08-31 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407735#comment-17407735
 ] 

Kurt Young commented on FLINK-24054:


Your opinions also make sense in some cases. That's exactly what I tried to say 
in the first place: whether to improve this issue needs more discussion and 
thought. I didn't understand why we categorized this one as a bug immediately 
and marked it as a release blocker. 

> Let SinkUpsertMaterializer emit +U instead of only +I
> -
>
> Key: FLINK-24054
> URL: https://issues.apache.org/jira/browse/FLINK-24054
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Currently, {{SinkUpsertMaterializer}} is not able to emit +U's but will 
> always emit +I's. Thus, resulting changelogs are incorrect strictly speaking 
> and only valid when treating +U and +I as similar changes in downstream 
> operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I

2021-08-31 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407218#comment-17407218
 ] 

Kurt Young commented on FLINK-24054:


The example you gave is a test, whose behavior could be changed and adapted 
easily. 

I think we need to think more about actual external systems, just like you said: 
the Kafka sink and others. 

> Let SinkUpsertMaterializer emit +U instead of only +I
> -
>
> Key: FLINK-24054
> URL: https://issues.apache.org/jira/browse/FLINK-24054
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> Currently, {{SinkUpsertMaterializer}} is not able to emit +U's but will 
> always emit +I's. Thus, resulting changelogs are incorrect strictly speaking 
> and only valid when treating +U and +I as similar changes in downstream 
> operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I

2021-08-31 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407171#comment-17407171
 ] 

Kurt Young commented on FLINK-24054:


In most cases, +I and -D are easier to handle, and they are equivalent to +U and -U.

Is it worth treating this as a blocker bug? I'm not even sure this is a bug.

> Let SinkUpsertMaterializer emit +U instead of only +I
> -
>
> Key: FLINK-24054
> URL: https://issues.apache.org/jira/browse/FLINK-24054
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> Currently, {{SinkUpsertMaterializer}} is not able to emit +U's but will 
> always emit +I's. Thus, resulting changelogs are incorrect strictly speaking 
> and only valid when treating +U and +I as similar changes in downstream 
> operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-08-27 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-24013.
--
Resolution: Fixed

merged via f2b0e4345564f420ff652f757f716426cb538d45

Thanks for the contribution [~gaoyajun02]

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-08-27 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-24013:
--

Assignee: gaoyajun02

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-08-27 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24013:
---
Fix Version/s: (was: 1.14.1)
   1.14.0

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-08-27 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24013:
---
Affects Version/s: (was: 1.14.1)
   1.14.0

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.0
>Reporter: gaoyajun02
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.1
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24013) Add IssueNavigationLink for IDEA

2021-08-27 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405608#comment-17405608
 ] 

Kurt Young commented on FLINK-24013:


Good idea, do you want to open a PR for this? [~gaoyajun02]

> Add IssueNavigationLink for IDEA
> 
>
> Key: FLINK-24013
> URL: https://issues.apache.org/jira/browse/FLINK-24013
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.14.1
>Reporter: gaoyajun02
>Priority: Minor
> Fix For: 1.14.1
>
>
> just like SPARK-35223



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22994:
---
Component/s: (was: Table SQL / Planner)
 Table SQL / Runtime

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. Background
> Flink maintains udf results as BinaryData, like BinaryStringData. When 
> invoking nested udfs like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialized and deserialized.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with nested udf invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with nested udf invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22015) SQL filter containing OR and IS NULL will produce an incorrect result.

2021-06-11 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22015:
---
Fix Version/s: 1.12.5

> SQL filter containing OR and IS NULL will produce an incorrect result.
> --
>
> Key: FLINK-22015
> URL: https://issues.apache.org/jira/browse/FLINK-22015
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.5
>
>
> Add the following test case to {{CalcITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   checkResult(
> """
>   |WITH myView AS (SELECT a, CASE
>   |  WHEN a = 1 THEN '1'
>   |  ELSE CAST(NULL AS STRING)
>   |  END AS s
>   |FROM SmallTable3)
>   |SELECT a FROM myView WHERE s = '2' OR s IS NULL
>   |""".stripMargin,
> Seq(row(2), row(3)))
> }
> {code}
> However if we remove the {{s = '2'}} the result will be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22015) SQL filter containing OR and IS NULL will produce an incorrect result.

2021-06-11 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361592#comment-17361592
 ] 

Kurt Young commented on FLINK-22015:


also fixed in 1.12.5: faf7cc43beebce3fee528ec5637e9387b95bec99

> SQL filter containing OR and IS NULL will produce an incorrect result.
> --
>
> Key: FLINK-22015
> URL: https://issues.apache.org/jira/browse/FLINK-22015
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.5
>
>
> Add the following test case to {{CalcITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   checkResult(
> """
>   |WITH myView AS (SELECT a, CASE
>   |  WHEN a = 1 THEN '1'
>   |  ELSE CAST(NULL AS STRING)
>   |  END AS s
>   |FROM SmallTable3)
>   |SELECT a FROM myView WHERE s = '2' OR s IS NULL
>   |""".stripMargin,
> Seq(row(2), row(3)))
> }
> {code}
> However if we remove the {{s = '2'}} the result will be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22472) The real partition data produced time is behind meta(_SUCCESS) file produced

2021-05-25 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-22472:
--

Assignee: luoyuxia

> The real partition data produced time is behind meta(_SUCCESS) file produced
> 
>
> Key: FLINK-22472
> URL: https://issues.apache.org/jira/browse/FLINK-22472
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Connectors / Hive
>Reporter: Leonard Xu
>Assignee: luoyuxia
>Priority: Major
> Attachments: image-2021-05-25-14-27-40-563.png
>
>
> I tested writing some data to a csv file with the flink filesystem connector, 
> but after the success file was produced, the data file was still uncommitted, 
> which was very weird to me.
> {code:java}
> bang@mac db1.db $ll 
> /var/folders/55/cw682b314gn8jhfh565hp7q0gp/T/junit8642959834366044048/junit484868942580135598/test-partition-time-commit/d\=2020-05-03/e\=12/
> total 8
> drwxr-xr-x  4 bang  staff  128  4 25 19:57 ./
> drwxr-xr-x  8 bang  staff  256  4 25 19:57 ../
> -rw-r--r--  1 bang  staff   12  4 25 19:57 
> .part-b703d4b9-067a-4dfe-935e-3afc723aed56-0-4.inprogress.b7d9cf09-0f72-4dce-8591-b61b1d23ae9b
> -rw-r--r--  1 bang  staff0  4 25 19:57 _MY_SUCCESS
> {code}
>  
> After some debugging I found I have to set the {{sink.rolling-policy.file-size}} 
> or {{sink.rolling-policy.rollover-interval}} parameter; the default values of 
> the two parameters are pretty big (128M and 30min). That's not convenient for 
> test/demo. I think we can improve this.
>  
> As the doc [1] describes, for row formats (csv, json) you can set the 
> {{sink.rolling-policy.file-size}} or 
> {{sink.rolling-policy.rollover-interval}} parameter in the connector 
> properties, together with {{execution.checkpointing.interval}} in 
> flink-conf.yaml, if you don't want to wait a long period before observing 
> that the data exists in the file system.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#rolling-policy
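> For a quick test/demo, a hedged sketch of that setup with the Table API (the 
> class, table name, path and values are made up for illustration):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class RollingPolicyDemo {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // row formats (csv, json) commit in-progress files on checkpoints
>         env.enableCheckpointing(10_000L);
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.executeSql(
>             "CREATE TABLE fs_sink (a STRING, d STRING, e STRING) PARTITIONED BY (d, e) WITH ("
>                 + " 'connector' = 'filesystem',"
>                 + " 'path' = 'file:///tmp/fs_sink',"
>                 + " 'format' = 'csv',"
>                 + " 'sink.rolling-policy.file-size' = '1MB',"
>                 + " 'sink.rolling-policy.rollover-interval' = '10 s'"
>                 + ")");
>     }
> }
> {code}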



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22472) The real partition data produced time is behind meta(_SUCCESS) file produced

2021-05-25 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350842#comment-17350842
 ] 

Kurt Young commented on FLINK-22472:


[~luoyuxia] would you like to take this issue?

> The real partition data produced time is behind meta(_SUCCESS) file produced
> 
>
> Key: FLINK-22472
> URL: https://issues.apache.org/jira/browse/FLINK-22472
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Connectors / Hive
>Reporter: Leonard Xu
>Priority: Major
> Attachments: image-2021-05-25-14-27-40-563.png
>
>
> I tested writing some data to a csv file with the flink filesystem connector, 
> but after the success file was produced, the data file was still uncommitted, 
> which was very weird to me.
> {code:java}
> bang@mac db1.db $ll 
> /var/folders/55/cw682b314gn8jhfh565hp7q0gp/T/junit8642959834366044048/junit484868942580135598/test-partition-time-commit/d\=2020-05-03/e\=12/
> total 8
> drwxr-xr-x  4 bang  staff  128  4 25 19:57 ./
> drwxr-xr-x  8 bang  staff  256  4 25 19:57 ../
> -rw-r--r--  1 bang  staff   12  4 25 19:57 
> .part-b703d4b9-067a-4dfe-935e-3afc723aed56-0-4.inprogress.b7d9cf09-0f72-4dce-8591-b61b1d23ae9b
> -rw-r--r--  1 bang  staff0  4 25 19:57 _MY_SUCCESS
> {code}
>  
> After some debugging I found I have to set the {{sink.rolling-policy.file-size}} 
> or {{sink.rolling-policy.rollover-interval}} parameter; the default values of 
> the two parameters are pretty big (128M and 30min). That's not convenient for 
> test/demo. I think we can improve this.
>  
> As the doc [1] describes, for row formats (csv, json) you can set the 
> {{sink.rolling-policy.file-size}} or 
> {{sink.rolling-policy.rollover-interval}} parameter in the connector 
> properties, together with {{execution.checkpointing.interval}} in 
> flink-conf.yaml, if you don't want to wait a long period before observing 
> that the data exists in the file system.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#rolling-policy



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15440) Enable savepoint support for Table & SQL program

2021-05-14 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17344371#comment-17344371
 ] 

Kurt Young commented on FLINK-15440:


{quote}if I change the SQL/Table query structure I will not be able to restore 
from the savepoint anymore (even if the change could have been compatible), am 
I correct ?
{quote}
Yes, you are right. [~Mathieu Druart]

> Enable savepoint support for Table & SQL program
> 
>
> Key: FLINK-15440
> URL: https://issues.apache.org/jira/browse/FLINK-15440
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Kurt Young
>Priority: Major
>  Labels: stale-major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15440) Enable savepoint support for Table & SQL program

2021-05-13 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17344224#comment-17344224
 ] 

Kurt Young commented on FLINK-15440:


[~Mathieu Druart] IIRC this issue can be addressed by setting the savepoint path 
before submitting the SQL job, making the job continue from the last savepoint 
(a rough sketch below). 

Setting uids for SQL operators might be another issue, which IMO is not within 
this one's scope. 
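A rough sketch of what I mean (hedged: the option name follows 
{{SavepointConfigOptions}}; whether it propagates from the TableConfig depends 
on the Flink version and deployment, and the path and table names are made up):
{code:java}
// assumes a TableEnvironment `tEnv`
tEnv.getConfig().getConfiguration().setString(
    "execution.savepoint.path", "hdfs:///savepoints/savepoint-ab12cd");
// re-submit the same SQL; the new job restores from the savepoint above
tEnv.executeSql("INSERT INTO sink_t SELECT * FROM source_t");
{code}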

> Enable savepoint support for Table & SQL program
> 
>
> Key: FLINK-15440
> URL: https://issues.apache.org/jira/browse/FLINK-15440
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Kurt Young
>Priority: Major
>  Labels: stale-major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22632) CatalogFunctionImpl.isGeneric should use ContextClassLoader

2021-05-11 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22632:
---
Component/s: (was: Table SQL / API)
 Connectors / Hive

> CatalogFunctionImpl.isGeneric should use ContextClassLoader
> ---
>
> Key: FLINK-22632
> URL: https://issues.apache.org/jira/browse/FLINK-22632
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.2, 1.12.3
>Reporter: Adrian Zhong
>Priority: Major
>  Labels: easyfix
> Fix For: 1.13.0
>
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Hello, community.
> I'm using the Hive catalog and trying to load a UDF through HTTP.
> I registered a UDF, then I submitted a FlinkSQL job with the command line:
> {code:java}
> flink run ... -C "http://someHost/plusTwoFunc.jar" ...
> {code}
> When the job started, it threw an exception:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: com.slankka.flink.udf.PlusTwoFunc
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at 
> org.apache.flink.table.catalog.CatalogFunctionImpl.isGeneric(CatalogFunctionImpl.java:72)
> {code}
> I know that user code is loaded by {{SafetyNetWrapperClassLoader}} and 
> {{FlinkUserCodeClassLoader}}: {{SafetyNetWrapperClassLoader}} is a facade 
> classloader, and {{FlinkUserCodeClassLoader}} is the real classloader whose 
> classpath contains the jar dependency. But loading the udf fails in 
> {{CatalogFunctionImpl}}:
> {code:java}
> @Override
> public boolean isGeneric() {
> if (functionLanguage == FunctionLanguage.PYTHON) {
> return true;
> }
> try {
> Class c = Class.forName(className); //ClassNotFound
> if (UserDefinedFunction.class.isAssignableFrom(c)) {
> return true;
> }
> } catch (ClassNotFoundException e) {
> throw new RuntimeException(String.format("Can't resolve udf class 
> %s", className), e);
> }
> return false;
> }
> {code}
>  When I change it to:
> {code:java}
> Thread.currentThread().getContextClassLoader().loadClass(className);
> {code}
> then it works fine.
> My proposal is that CatalogFunctionImpl.isGeneric should use the context 
> classloader, that is, the {{FlinkUserCodeClassLoader}} proxied by 
> {{SafetyNetWrapperClassLoader}}.
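> A minimal sketch of that proposal (hedged: it mirrors the snippet above; the 
> actual patch may differ):
> {code:java}
> @Override
> public boolean isGeneric() {
>     if (functionLanguage == FunctionLanguage.PYTHON) {
>         return true;
>     }
>     try {
>         // the context classloader is the FlinkUserCodeClassLoader (proxied by
>         // SafetyNetWrapperClassLoader), whose classpath contains jars shipped
>         // via "flink run -C ..."
>         Class<?> c = Thread.currentThread().getContextClassLoader().loadClass(className);
>         return UserDefinedFunction.class.isAssignableFrom(c);
>     } catch (ClassNotFoundException e) {
>         throw new RuntimeException(
>             String.format("Can't resolve udf class %s", className), e);
>     }
> }
> {code}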



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15376) support "CREATE TABLE AS" in Flink SQL

2021-04-30 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-15376:
--

Assignee: Danny Chen

> support "CREATE TABLE AS" in Flink SQL
> --
>
> Key: FLINK-15376
> URL: https://issues.apache.org/jira/browse/FLINK-15376
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Danny Chen
>Priority: Major
>  Labels: auto-unassigned
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11864) Let compressed channel reader/writer reuse the logic of AsynchronousFileIOChannel

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-11864:
---
Component/s: (was: Runtime / Task)
 Table SQL / Runtime

> Let compressed channel reader/writer reuse the logic of 
> AsynchronousFileIOChannel
> -
>
> Key: FLINK-11864
> URL: https://issues.apache.org/jira/browse/FLINK-11864
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Kurt Young
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> This is a follow up issue of 
> [Flink-11863|https://issues.apache.org/jira/browse/FLINK-11863]. The 
> introduced `CompressedBlockChannelReader` and `CompressedBlockChannelWriter` 
> should reuse the logic of `AsynchronousFileIOChannel` by extending from it. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-4864) Shade Calcite dependency in flink-table

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-4864.
-
  Assignee: Jark Wu
Resolution: Fixed

> Shade Calcite dependency in flink-table
> ---
>
> Key: FLINK-4864
> URL: https://issues.apache.org/jira/browse/FLINK-4864
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The Table API has a dependency on Apache Calcite.
> A user reported version conflicts when having their own Calcite 
> dependency in the classpath.
> The solution would be to shade away the Calcite dependency (Calcite's 
> transitive dependencies are already shaded).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21923) SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the same time

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21923:
---
Fix Version/s: (was: 1.13.0)
   1.13.1

> SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the 
> same time
> ---
>
> Key: FLINK-21923
> URL: https://issues.apache.org/jira/browse/FLINK-21923
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1
>
>
> SplitAggregateRule optimizes one-layer aggregations into two-layer aggregations 
> to improve computing performance under data skew.
> In the partial phase, avg will be translated into count and sum. If count 
> already exists in the original SQL at this time, the engine will remove the 
> duplicate count and then add a Project to calculate and restore the optimized 
> count result value.
> {code:java}
> relBuilder.aggregate(
>   relBuilder.groupKey(fullGroupSet, 
> ImmutableList.of[ImmutableBitSet](fullGroupSet)),
>   newPartialAggCalls)
> relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
>   .setPartialFinalType(PartialFinalType.PARTIAL)
> {code}
> so `relBuilder.peek()` will return a `FlinkLogicalCalc`, not a 
> `FlinkLogicalAggregate`,
> and then an exception like the following is thrown: 
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be 
> cast to 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>   at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
>   at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
>   at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
> {code}
> We can reproduce 

[jira] [Updated] (FLINK-22463) IllegalArgumentException is thrown in WindowAttachedWindowingStrategy when two phase is enabled for distinct agg

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22463:
---
Fix Version/s: (was: 1.13.0)
   1.13.1

> IllegalArgumentException is thrown in WindowAttachedWindowingStrategy when 
> two phase is enabled for distinct agg
> 
>
> Key: FLINK-22463
> URL: https://issues.apache.org/jira/browse/FLINK-22463
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1
>
>
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy.(WindowAttachedWindowingStrategy.java:51)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> The reason is that the {{windowStart}} may be {{-1}} when two-phase is enabled 
> for distinct agg; see 
> [TwoStageOptimizedWindowAggregateRule.java#L143|https://github.com/apache/flink/blob/a3363b91b144edfbae5ab114984ded622d3f8fbc/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java#L143]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21247) flink iceberg table map cannot convert to datastream

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21247:
---
Fix Version/s: (was: 1.13.0)
   1.13.1
   1.14.0

> flink iceberg table map cannot convert to datastream
> ---
>
> Key: FLINK-21247
> URL: https://issues.apache.org/jira/browse/FLINK-21247
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.1
> Environment: iceberg master
> flink 1.12
>  
>  
>Reporter: donglei
>Assignee: Shuo Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1
>
> Attachments: image-2021-02-03-15-38-42-340.png, 
> image-2021-02-03-15-40-27-055.png, image-2021-02-03-15-41-34-426.png, 
> image-2021-02-03-15-43-19-919.png, image-2021-02-03-15-52-12-493.png, 
> image-2021-02-03-15-53-18-244.png
>
>
> Flink Iceberg Table with map
> !image-2021-02-03-15-38-42-340.png!
>  
> We want to read the table like this:
>  
> String querySql = "SELECT 
> ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,client_ip,upload_apn,event_code,event_result,package_size,consume_time,event_value,event_time,upload_time,boundle_id,uin,platform,os_version,channel,brand,model
>  from bfzt3 ";
> Table table = tEnv.sqlQuery(querySql);
> DataStream sinkStream = tEnv.toAppendStream(table, 
> Types.POJO(AttaInfo.class, map));
> sinkStream.map(x->1).returns(Types.INT).keyBy(new 
> NullByteKeySelector()).reduce((x,y) -> {
>  return x+y;
> }).print();
>  
>  
> When reading, we hit an exception:
>  
> 2021-02-03 15:37:57
> java.lang.ClassCastException: 
> org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be 
> cast to org.apache.flink.table.data.binary.BinaryMapData
> at 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
> at 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
> at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
> at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
> at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
> at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>  
> We find that the iceberg map is ReusableMapData, which implements MapData:
> !image-2021-02-03-15-40-27-055.png!
>  
> This is where the exception comes from:
> !image-2021-02-03-15-41-34-426.png!
> MapData has two default implementations, GenericMapData and BinaryMapData;
> the iceberg implementation is ReusableMapData.
>  
> So I think the code should be changed like this:
> !image-2021-02-03-15-43-19-919.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19449) LEAD/LAG cannot work correctly in streaming mode

2021-04-29 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-19449:
---
Fix Version/s: (was: 1.13.0)
   1.13.1
   1.14.0

> LEAD/LAG cannot work correctly in streaming mode
> 
>
> Key: FLINK-19449
> URL: https://issues.apache.org/jira/browse/FLINK-19449
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.2, 1.11.2
>Reporter: Benchao Li
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available, stale-major
> Fix For: 1.14.0, 1.13.1
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22449) Casting an invalid constant string to int throws exception from SinkNotNullEnforcer

2021-04-25 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331705#comment-17331705
 ] 

Kurt Young commented on FLINK-22449:


Changing the result type nullability should be aligned with the behavior: if a 
certain CAST operation does return NULL when handling illegal data, we should 
change the result type to nullable. 

> Casting an invalid constant string to int throws exception from 
> SinkNotNullEnforcer
> ---
>
> Key: FLINK-22449
> URL: https://issues.apache.org/jira/browse/FLINK-22449
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Priority: Major
>
> Add the following test case to {{CalcITCase}} to reproduce this bug:
> {code:scala}
> @Test
> def myTest(): Unit = {
>   checkResult("SELECT CAST('haha' AS INT)", Seq(row(null)))
> }
> {code}
> The exception stack is
> {code}
> Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT 
> NULL, however, a null value is being written into it. You can set job 
> configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this 
> exception and drop such records silently.
>   at 
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
>   at 
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at BatchExecCalc$33.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> {code}
> This is because the result type of CAST is inferred as NOT NULL (see 
> {{SqlCastFunction#inferReturnType}} and 
> {{StandardConvertletTable#convertCast}}; the nullability is the same as the 
> input argument's), whereas parsing an invalid string to int will produce null 
> values.
> One way I could think of is to change the result type of CAST to always 
> nullable (at least for some specific types of casting, for example casting 
> from string to int), but as CAST is a very low-level function this might have 
> a big impact (for example, if a rule adds casting, the resulting row type 
> might not be equal to the original row type due to mismatch in nullability).
> So it seems that at the current stage we should set all columns in a select 
> sink to be nullable. However this indicates that one cannot truly rely on the 
> nullability of any result type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22449) Casting an invalid constant string to int throws exception from SinkNotNullEnforcer

2021-04-25 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331458#comment-17331458
 ] 

Kurt Young commented on FLINK-22449:


cc [~godfreyhe] [~wenlong.lwl]

Normally, assuming the result type is nullable is safer than assuming NOT NULL. 

> Casting an invalid constant string to int throws exception from 
> SinkNotNullEnforcer
> ---
>
> Key: FLINK-22449
> URL: https://issues.apache.org/jira/browse/FLINK-22449
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Priority: Major
>
> Add the following test case to {{CalcITCase}} to reproduce this bug:
> {code:scala}
> @Test
> def myTest(): Unit = {
>   checkResult("SELECT CAST('haha' AS INT)", Seq(row(null)))
> }
> {code}
> The exception stack is
> {code}
> Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT 
> NULL, however, a null value is being written into it. You can set job 
> configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this 
> exception and drop such records silently.
>   at 
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
>   at 
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at BatchExecCalc$33.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> {code}
> This is because the result type of CAST is inferred as NOT NULL (see 
> {{SqlCastFunction#inferReturnType}} and 
> {{StandardConvertletTable#convertCast}}; the nullability is the same as the 
> input argument's), whereas parsing an invalid string to int will produce null 
> values.
> One way I could think of is to change the result type of CAST to always 
> nullable (at least for some specific types of casting, for example casting 
> from string to int), but as CAST is a very low-level function this might have 
> a big impact (for example, if a rule adds casting, the resulting row type 
> might not be equal to the original row type due to mismatch in nullability).
> So it seems that at the current stage we should set all columns in a select 
> sink to be nullable. However this indicates that one cannot truly rely on the 
> nullability of any result type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22182) Incorrect round when dividing decimals

2021-04-25 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331451#comment-17331451
 ] 

Kurt Young commented on FLINK-22182:


(38, 4) / (38, 4) -> (38, 6) 

It seems we didn't give enough scale for such a computation?
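For reference, a sketch of the SQL-Server-style derivation that Calcite-based 
planners commonly use for decimal division (an assumption about the exact rule, 
not a quote of our code):
{code:java}
// returns {precision, scale} for the result of d1(p1, s1) / d2(p2, s2)
static int[] divisionType(int p1, int s1, int p2, int s2) {
    final int maxPrecision = 38;
    int scale = Math.max(6, s1 + p2 + 1);  // desired scale
    int precision = p1 - s1 + s2 + scale;  // desired precision
    if (precision > maxPrecision) {
        // cap the precision and shrink the scale, but never below 6
        scale = Math.max(6, scale - (precision - maxPrecision));
        precision = maxPrecision;
    }
    return new int[] {precision, scale};
}
// divisionType(38, 4, 38, 4) -> {38, 6}: only 6 fractional digits survive,
// which matches the rounded -105.757650 above.
{code}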

> Incorrect round when dividing decimals
> --
>
> Key: FLINK-22182
> URL: https://issues.apache.org/jira/browse/FLINK-22182
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: Maciej Bryński
>Priority: Major
>
> Hi,
> I have the following problem.
> This query:
> {code:java}
> select ((CAST(28604 as decimal(38,4)))/(cast(451 as decimal(38,4))) - 
> 1121)/10 ;
> {code}
> Gives the following answer:
> {code:java}
> -105.757650
> {code}
> The correct answer should be:
> {code:java}
> -105,75764966740576496674057649667
> {code}
> Why is the result rounded at the 5th decimal place?
> Saving both values back to a Decimal(38,4) column will give different results: 
> -105.7577 vs -105.7576 (where the second one is correct).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13734) Support DDL in SQL CLI

2021-04-22 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-13734.
--
Resolution: Implemented

I believe this is already supported. 

> Support DDL in SQL CLI
> --
>
> Key: FLINK-13734
> URL: https://issues.apache.org/jira/browse/FLINK-13734
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Priority: Major
>  Labels: stale-major
>
> We have supported DDL in TableEnvironment. We should also support executing 
> DDL in the SQL client to make the feature easier to use. However, this might 
> require modifying the current architecture of the SQL Client. A more detailed 
> design should be attached and discussed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17117) There are an useless cast operation for sql on blink when generate code

2021-04-19 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-17117:
---
Component/s: (was: Table SQL / Planner)
 Table SQL / Runtime

> There are an useless  cast operation for sql on blink when generate code
> 
>
> Key: FLINK-17117
> URL: https://issues.apache.org/jira/browse/FLINK-17117
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: hehuiyuan
>Assignee: hehuiyuan
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2020-04-13-19-44-19-174.png
>
>
> !image-2020-04-13-19-44-19-174.png|width=641,height=305!
>  
> This is the method `generateOneInputStreamOperator` that OperatorCodeGenerator 
> uses when generating SourceConversion:
> {code:java}
> @Override
> public void processElement($STREAM_RECORD $ELEMENT) throws Exception {
>   $inputTypeTerm $inputTerm = ($inputTypeTerm) 
> ${converter(s"$ELEMENT.getValue()")};
>   ${ctx.reusePerRecordCode()}
>   ${ctx.reuseLocalVariableCode()}
>   ${if (lazyInputUnboxingCode) "" else ctx.reuseInputUnboxingCode()}
>   $processCode
> }
> {code}
>  
> {code:java}
>  $inputTypeTerm $inputTerm = ($inputTypeTerm) 
> ${converter(s"$ELEMENT.getValue()")};
> {code}
> ScanUtil calls generateOneInputStreamOperator
> {code:java}
> val generatedOperator = 
> OperatorCodeGenerator.generateOneInputStreamOperator[Any, BaseRow](
>   ctx,
>   convertName,
>   processCode,
>   outputRowType,
>   converter = inputTermConverter)
> //inputTermConverter
> val (inputTermConverter, inputRowType) = {
>   val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
>   internalInType match {
> case rt: RowType => (convertFunc, rt)
> case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
> RowType.of(internalInType))
>   }
> }
> {code}
> CodeGenUtils.scala  :  genToInternal
> {code:java}
> def genToInternal(ctx: CodeGeneratorContext, t: DataType): String => String = 
> {
>   val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
>   if (isConverterIdentity(t)) {
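> // Identity-converter branch: the generated expression still wraps the
> // term in a cast to the boxed internal type; likely the redundant cast
> // this issue refers to.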
> term => s"($iTerm) $term"
>   } else {
> val eTerm = boxedTypeTermForExternalType(t)
> val converter = ctx.addReusableObject(
>   DataFormatConverters.getConverterForDataType(t),
>   "converter")
> term => s"($iTerm) $converter.toInternal(($eTerm) $term)"
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-20855) Calculating numBuckets exceeds the maximum value of int and got a negative number

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-20855.
--
Resolution: Fixed

Thanks [~hejiefang] for the contribution.

Merged: fd8e34c03b663aff96a625ed751b66244da8793e

> Calculating numBuckets exceeds the maximum value of int and got a negative 
> number
> -
>
> Key: FLINK-20855
> URL: https://issues.apache.org/jira/browse/FLINK-20855
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1, 1.12.0
>Reporter: JieFang.He
>Assignee: JieFang.He
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>
> When I run TPC-DS at the 500G scale, I get an exception
> {code:java}
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.table.runtime.hashtable.LongHashPartition.setNewBuckets(LongHashPartition.java:223)
> at 
> org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:176)
> at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.buildTableFromSpilledPartition(LongHybridHashTable.java:432)
> at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.prepareNextPartition(LongHybridHashTable.java:354)
> at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.nextMatching(LongHybridHashTable.java:145)
> at LongHashJoinOperator$40166.endInput2$(Unknown Source)
> at LongHashJoinOperator$40166.endInput(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endHeadOperatorInput(OperatorChain.java:278)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.checkFinished(StreamTwoInputProcessor.java:211)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:183)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:349)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:564)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The reason is: when calculating numBuckets in LongHashPartition, the result 
> exceeds the maximum value of int and becomes a negative number
> {code:java}
> LongHashPartition(
>   LongHybridHashTable longTable,
>   int partitionNum,
>   BinaryRowDataSerializer buildSideSerializer,
>   int bucketNumSegs,
>   int recursionLevel,
>   List<MemorySegment> buffers,
>   int lastSegmentLimit) {
>this(longTable, buildSideSerializer, listToArray(buffers));
>this.partitionNum = partitionNum;
>this.recursionLevel = recursionLevel;
>int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize 
> / 16);
>MemorySegment[] buckets = new MemorySegment[bucketNumSegs];
>for (int i = 0; i < bucketNumSegs; i++) {
>   buckets[i] = longTable.nextSegment();
>}
>setNewBuckets(buckets, numBuckets);
>this.finalBufferLimit = lastSegmentLimit;
> }
> {code}
> A way to avoid the exception is to adjust the calculation order: change
> {code:java}
> int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 
> 16);
> {code}
> to
> {code:java}
> int numBuckets = MathUtils.roundDownToPowerOf2(segmentSize / 16 * 
> bucketNumSegs);
> {code}
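To see why the order matters, a minimal standalone sketch (the segment size
and segment count are illustrative; Flink's default memory segment size is
32 KB):

{code:java}
public class NumBucketsOverflowDemo {
    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // 32 KB memory segments
        int bucketNumSegs = 100_000; // a large join can need this many segments

        // Original order: the multiplication overflows int first, so the
        // division is applied to a wrapped-around negative value.
        int broken = bucketNumSegs * segmentSize / 16;
        // Fixed order: dividing first keeps the intermediate value small.
        int fixed = segmentSize / 16 * bucketNumSegs;

        System.out.println(broken); // -63635456
        System.out.println(fixed);  // 204800000
    }
}
{code}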



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22100) BatchFileSystemITCaseBase.testPartialDynamicPartition fail because of no output for 900 seconds

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22100.
--
Resolution: Duplicate

> BatchFileSystemITCaseBase.testPartialDynamicPartition fail because of no 
> output for 900 seconds
> ---
>
> Key: FLINK-22100
> URL: https://issues.apache.org/jira/browse/FLINK-22100
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15988=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20=16696
> {code:java}
> "main" #1 prio=5 os_prio=0 tid=0x7f844400b800 nid=0x7b96 waiting on 
> condition [0x7f844be37000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:229)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:111)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>   at 
> org.apache.flink.table.planner.connectors.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:351)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-20254) HiveTableSourceITCase.testStreamPartitionReadByCreateTime times out

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-20254.
--
Resolution: Duplicate

> HiveTableSourceITCase.testStreamPartitionReadByCreateTime times out
> ---
>
> Key: FLINK-20254
> URL: https://issues.apache.org/jira/browse/FLINK-20254
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.13.0
>Reporter: Robert Metzger
>Assignee: Leonard Xu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9808=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf]
> {code:java}
> 2020-11-19T10:34:23.5591765Z [ERROR] Tests run: 18, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 192.243 s <<< FAILURE! - in 
> org.apache.flink.connectors.hive.HiveTableSourceITCase
> 2020-11-19T10:34:23.5593193Z [ERROR] 
> testStreamPartitionReadByCreateTime(org.apache.flink.connectors.hive.HiveTableSourceITCase)
>   Time elapsed: 120.075 s  <<< ERROR!
> 2020-11-19T10:34:23.5593929Z org.junit.runners.model.TestTimedOutException: 
> test timed out after 120000 milliseconds
> 2020-11-19T10:34:23.5594321Z  at java.lang.Thread.sleep(Native Method)
> 2020-11-19T10:34:23.5594777Z  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:231)
> 2020-11-19T10:34:23.5595378Z  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:119)
> 2020-11-19T10:34:23.5596001Z  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
> 2020-11-19T10:34:23.5596610Z  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
> 2020-11-19T10:34:23.5597218Z  at 
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
> 2020-11-19T10:34:23.5597811Z  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
> 2020-11-19T10:34:23.5598555Z  at 
> org.apache.flink.connectors.hive.HiveTableSourceITCase.fetchRows(HiveTableSourceITCase.java:653)
> 2020-11-19T10:34:23.5599407Z  at 
> org.apache.flink.connectors.hive.HiveTableSourceITCase.testStreamPartitionReadByCreateTime(HiveTableSourceITCase.java:594)
> 2020-11-19T10:34:23.5599982Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-11-19T10:34:23.5600393Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-11-19T10:34:23.5600865Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-19T10:34:23.5601300Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-19T10:34:23.5601713Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-11-19T10:34:23.5602211Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-11-19T10:34:23.5602688Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-11-19T10:34:23.5603181Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-11-19T10:34:23.5603753Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-11-19T10:34:23.5604308Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-11-19T10:34:23.5604780Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-11-19T10:34:23.5605114Z  at java.lang.Thread.run(Thread.java:748)
> 2020-11-19T10:34:23.5605299Z 
> 2020-11-19T10:34:24.4180149Z [INFO] Running 
> org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase
> {code}
>  
> I've spent some time debugging this case in a local env, but unfortunately I 
> didn't find the root cause. I think this is the same case as FLINK-22129 and 
> FLINK-22100, but after debugging, these failed tests in 
> *HiveTableSourceITCase* should not contain bugs, and the root cause may be 
> related to FLINK-21996.
> Some debug results:
>  * 1. Running HiveTableSourceITCase.testStreamPartitionReadByCreateTime 
> repeatedly in the IDE times out, mostly at about the 36th round
>  * 2. The thread usually hangs on 
> {code:java}
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:119){code}
> 

[jira] [Closed] (FLINK-22129) OrcFileSystemITCase hangs on azure

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22129.
--
Resolution: Duplicate

> OrcFileSystemITCase hangs on azure
> --
>
> Key: FLINK-22129
> URL: https://issues.apache.org/jira/browse/FLINK-22129
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16071=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20=14239
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f8cbc00b800 nid=0x7ebe waiting on 
> condition [0x7f8cc5e45000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:229)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:111)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>   at 
> org.apache.flink.table.planner.connectors.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:351)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
>   at 
> org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.check(BatchFileSystemITCaseBase.scala:46)
>   at 
> org.apache.flink.table.planner.runtime.FileSystemITCaseBase$class.check(FileSystemITCaseBase.scala:57)
>   at 
> org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.check(BatchFileSystemITCaseBase.scala:33)
>   at 
> org.apache.flink.orc.OrcFileSystemITCase.testOrcFilterPushDown(OrcFileSystemITCase.java:131)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22308) Fix CliTableauResultView printing results after cancel in STREAMING mode

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22308.
--
  Assignee: Shengkai Fang
Resolution: Fixed

fixed: e4a2738e8b9badf37eab4874109a3e49ad47a341

> Fix CliTableauResultView printing results after cancel in STREAMING mode
> -
>
> Key: FLINK-22308
> URL: https://issues.apache.org/jira/browse/FLINK-22308
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22016) PushFilterIntoTableSourceScanRule fails to deal with NULLs

2021-04-16 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22016.
--
Resolution: Fixed

fixed: 1562ed02796798ad450b3d4a778974ec086e0419

> PushFilterIntoTableSourceScanRule fails to deal with NULLs
> --
>
> Key: FLINK-22016
> URL: https://issues.apache.org/jira/browse/FLINK-22016
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Add the following test case to {{PushFilterIntoTableSourceScanRuleTest}} to 
> reproduce this bug:
> {code:java}
> @Test
> public void myTest() {
> String ddl =
> "CREATE TABLE MTable ("
> + "  a STRING,"
> + "  b STRING"
> + ") WITH ("
> + "  'connector' = 'values',"
> + "  'bounded' = 'true'"
> + ")";
> util().tableEnv().executeSql(ddl);
> util().verifyRelPlan("WITH MView AS (SELECT CASE\n"
> + "  WHEN a = b THEN a\n"
> + "  ELSE CAST(NULL AS STRING)\n"
> + "  END AS a\n"
> + "  FROM MTable)\n"
> + "SELECT a FROM MView WHERE a IS NOT NULL");
> }
> {code}
> The exception stack is
> {code}
> org.apache.flink.table.api.ValidationException: Data type 'STRING NOT NULL' 
> does not support null values.
>   at 
> org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:272)
>   at 
> org.apache.flink.table.expressions.ValueLiteralExpression.<init>(ValueLiteralExpression.java:79)
>   at 
> org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:463)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:361)
>   at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$8.apply(RexNodeExtractor.scala:471)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$8.apply(RexNodeExtractor.scala:471)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:470)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:361)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$8.apply(RexNodeExtractor.scala:471)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$8.apply(RexNodeExtractor.scala:471)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:470)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:361)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at 
> org.apache.flink.table.planner.plan.utils.RexNodeExtractor$$anonfun$extractConjunctiveConditions$1.apply(RexNodeExtractor.scala:138)
>   at 
> 

[jira] [Closed] (FLINK-20761) Cannot read hive table/partition whose location path contains comma

2021-04-15 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-20761.
--
Resolution: Fixed

1.13.0: 45e2fb584579952d6abc9a4507cbc3b09b071c12

1.12.3: 902a0f54b487ee39a8adb0e04c826bcc4a9dcab7

> Cannot read hive table/partition whose location path contains comma
> ---
>
> Key: FLINK-20761
> URL: https://issues.apache.org/jira/browse/FLINK-20761
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.3
>
>
> We probably need to call hadoop {{StringUtils::escapeString}} to escape the 
> input path string.
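For illustration, a minimal sketch of what that escaping does (the path is
made up; org.apache.hadoop.util.StringUtils escapes commas with a backslash
by default):

{code:java}
import org.apache.hadoop.util.StringUtils;

public class EscapeLocationDemo {
    public static void main(String[] args) {
        // A partition location containing a comma would otherwise be split
        // in two when paths are passed around as a comma-separated list.
        String location = "/warehouse/db/tbl/pt=a,b";
        String escaped = StringUtils.escapeString(location);
        System.out.println(escaped); // prints /warehouse/db/tbl/pt=a\,b
    }
}
{code}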



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22293) Throw IllegalArgumentException when getNumFiles with partitions on different hdfs nameservices

2021-04-15 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22293.
--
Resolution: Duplicate

> Throw IllegalArgumentException when getNumFiles with partitions on different 
> hdfs nameservices
> --
>
> Key: FLINK-22293
> URL: https://issues.apache.org/jira/browse/FLINK-22293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.2
>Reporter: Junfan Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13437) Add Hive SQL E2E test

2021-04-15 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-13437:
---
Fix Version/s: (was: 1.13.0)

> Add Hive SQL E2E test
> -
>
> Key: FLINK-13437
> URL: https://issues.apache.org/jira/browse/FLINK-13437
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should add an E2E test for the Hive integration: List all tables and read 
> some metadata, read from an existing table, register a new table in Hive, use 
> a registered function, write to an existing table, write to a new table.
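For illustration, a sketch of the statement sequence such a test could run
(all table and function names are made up):

{code:sql}
SHOW TABLES;
DESCRIBE existing_table;
SELECT * FROM existing_table;
CREATE TABLE new_table (id INT, name STRING);
SELECT my_registered_udf(name) FROM existing_table;
INSERT INTO existing_table SELECT * FROM new_table;
INSERT INTO new_table SELECT * FROM existing_table;
{code}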



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22169) Beautify the CliTableauResultView when printing

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22169.
--
Resolution: Fixed

fixed: 326a564ebc0e397654ccd4e5a83a77ef811c6a76

> Beautify the CliTableauResultView when printing
> 
>
> Key: FLINK-22169
> URL: https://issues.apache.org/jira/browse/FLINK-22169
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
> Attachments: print.png
>
>
> In batch mode, the printed output is not the same as before.
>  !print.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22280) Add XML output format for Flink Table

2021-04-14 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321901#comment-17321901
 ] 

Kurt Young commented on FLINK-22280:


I also think this would be helpful. [~flacombe]

> Add XML output format for Flink Table
> -
>
> Key: FLINK-22280
> URL: https://issues.apache.org/jira/browse/FLINK-22280
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.12.1
>Reporter: François Lacombe
>Priority: Major
>
> Dear maintainers
> I'm looking for the ability to output XML files from the Flink Table API, just 
> like the already supported csv and json formats.
> To me, a new format may be required to perform the appropriate serialization. 
> Am I missing any existing feature (or duplicate issue) that could allow it 
> without a dedicated format?
> Depending on your feedback and if it makes sense, I could get involved in 
> writing the appropriate format based on the same logic as 
> `JsonRowDataSerializationSchema`.
> Best regards



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21660) CREATE TABLE LIKE cannot reference tables in HiveCatalog

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-21660.
--
Resolution: Fixed

fixed: 5b9e7882207357120717966d8bf7efd53c53ede5

> CREATE TABLE LIKE cannot reference tables in HiveCatalog
> 
>
> Key: FLINK-21660
> URL: https://issues.apache.org/jira/browse/FLINK-21660
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.1
>Reporter: liqiang.liu
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Create a table with CREATE TABLE ... LIKE referencing a HiveCatalog table:
> CREATE TABLE temp_order
> WITH (
>   'properties.group.id' = 'test1',
>   'properties.bootstrap.servers' = 'kafka.address'
> )
> LIKE hive.ods.flink_order
> It throws an error; the log is:
>  
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Unable to create a source for reading table 
> 'default_catalog.default_database.test_taos_customer_agent_cinema'.
> Table options are:
> 'connector'='upsert-kafka'
> 'is_generic'='true'
> 'key.format'='json'
> 'properties.bootstrap.servers'='kafka.address'
> 'properties.group.id'='test1'
> 'topic'='topic1'
> 'value.format'='json'
> ...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported 
> options found for connector 'upsert-kafka'.
> Unsupported options:
> is_generic
>  
> The 'is_generic' option is added by the Hive catalog, but the `kafka` connector 
> does not support this option. A possible workaround is sketched below.
>  
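A workaround sketch until this is fixed (assuming the standard LIKE-clause
merging options; the connector option values are copied from the error message
above): exclude every catalog-supplied option, including 'is_generic', and
restate the connector options explicitly.

{code:sql}
CREATE TABLE temp_order
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic1',
  'key.format' = 'json',
  'value.format' = 'json',
  'properties.group.id' = 'test1',
  'properties.bootstrap.servers' = 'kafka.address'
)
LIKE hive.ods.flink_order (EXCLUDING OPTIONS);
{code}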



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22278) Refactor sql client's DynamicResult

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22278.
--
Resolution: Duplicate

created twice by accident

> Refactor sql client's DynamicResult
> ---
>
> Key: FLINK-22278
> URL: https://issues.apache.org/jira/browse/FLINK-22278
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Reporter: Kurt Young
>Priority: Major
>
> We can simplify the design around sql client's Executor and DynamicResult, by 
> reducing the responsibility of Executor when retrieving SELECT result.
> Page related logic should be handled by different CliResultViews instead of 
> Executor and different DynamicResults.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22278) Refactor sql client's DynamicResult

2021-04-14 Thread Kurt Young (Jira)
Kurt Young created FLINK-22278:
--

 Summary: Refactor sql client's DynamicResult
 Key: FLINK-22278
 URL: https://issues.apache.org/jira/browse/FLINK-22278
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: Kurt Young


We can simplify the design around sql client's Executor and DynamicResult, by 
reducing the responsibility of Executor when retrieving SELECT result.

Page related logic should be handled by different CliResultViews instead of 
Executor and different DynamicResults.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22277) Refactor sql client's DynamicResult

2021-04-14 Thread Kurt Young (Jira)
Kurt Young created FLINK-22277:
--

 Summary: Refactor sql client's DynamicResult 
 Key: FLINK-22277
 URL: https://issues.apache.org/jira/browse/FLINK-22277
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: Kurt Young


Right now, the sql client Executor has different result handling logic for 
each result display mode. 

Each kind of result is handled by a child class of `DynamicResult`. 

This leads to introducing page-related APIs on the Executor, such as 
`snapshotResult` and `retrieveResultPage`, which I think is inappropriate and 
makes things complicated. 

It would be beneficial to reduce the Executor's responsibility for retrieving 
results to simply streaming back the SELECT result, and to move the logic for 
the different display modes into CliResultView, as sketched below. 
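A rough sketch of the proposed split (all interface, class, and method names
below are hypothetical, not the actual Flink API):

{code:java}
import java.util.Iterator;

// The Executor only streams rows back; no paging methods such as
// snapshotResult or retrieveResultPage.
interface Executor {
    Iterator<String[]> retrieveResult(String selectStatement);
}

// Paging and display-mode handling move into the view layer: e.g. a
// tableau view materializes pages while a changelog view just streams.
abstract class CliResultView {
    protected final Iterator<String[]> rows;

    protected CliResultView(Iterator<String[]> rows) {
        this.rows = rows;
    }

    abstract void display();
}
{code}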



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22271) FlinkSQL Read Hive(parquet file) field does not exist

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22271:
---
Issue Type: New Feature  (was: Bug)

> FlinkSQL Read Hive(parquet file) field does not exist 
> --
>
> Key: FLINK-22271
> URL: https://issues.apache.org/jira/browse/FLINK-22271
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.12.2
>Reporter: moran
>Priority: Major
>
> Create a parquet-format table student and insert data where no field is 
> empty; FlinkSQL can query the table. If you then add a column, the query fails.
> 1.Step:
> CREATE TABLE tmp.student ( 
> name STRING, 
> age INT
> )
> STORED AS PARQUET;
> insert into table tmp.student  values ("java", 12);
> FlinkSQL can read the table at this point.
> 2.Step:
> alter table tmp.student add columns(update_time timestamp);
> The query fails after adding the field update_time.
> error:
> java.lang.IllegalArgumentException: update_time does not exist
> at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.clipParquetSchema(ParquetVectorizedInputFormat.java:193)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:120)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:108)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:63)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22271) FlinkSQL Read Hive(parquet file) field does not exist

2021-04-14 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17320879#comment-17320879
 ] 

Kurt Young commented on FLINK-22271:


This is by design for now, but I think it would also be useful to make the 
schema handling more flexible in the future. cc [~jark]

> FlinkSQL Read Hive(parquet file) field does not exist 
> --
>
> Key: FLINK-22271
> URL: https://issues.apache.org/jira/browse/FLINK-22271
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.2
>Reporter: moran
>Priority: Major
>
> Create a parquet-format table student and insert data where no field is 
> empty; FlinkSQL can query the table. If you then add a column, the query fails.
> 1.Step:
> CREATE TABLE tmp.student ( 
> name STRING, 
> age INT
> )
> STORED AS PARQUET;
> insert into table tmp.student  values ("java", 12);
> FlinkSQL can read the table at this point.
> 2.Step:
> alter table tmp.student add columns(update_time timestamp);
> The query fails after adding the field update_time.
> error:
> java.lang.IllegalArgumentException: update_time does not exist
> at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.clipParquetSchema(ParquetVectorizedInputFormat.java:193)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:120)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:108)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:63)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22271) FlinkSQL Read Hive(parquet file) field does not exist

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22271:
---
Component/s: (was: Connectors / Hive)
 Table SQL / API

> FlinkSQL Read Hive(parquet file) field does not exist 
> --
>
> Key: FLINK-22271
> URL: https://issues.apache.org/jira/browse/FLINK-22271
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2
>Reporter: moran
>Priority: Major
>
> Create a parquet-format table student and insert data where no field is 
> empty; FlinkSQL can query the table. If you then add a column, the query fails.
> 1.Step:
> CREATE TABLE tmp.student ( 
> name STRING, 
> age INT
> )
> STORED AS PARQUET;
> insert into table tmp.student  values ("java", 12);
> FlinkSQL can read the table at this point.
> 2.Step:
> alter table tmp.student add columns(update_time timestamp);
> The query fails after adding the field update_time.
> error:
> java.lang.IllegalArgumentException: update_time does not exist
> at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.clipParquetSchema(ParquetVectorizedInputFormat.java:193)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:120)
>   at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:108)
>   at 
> org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter.createReader(HiveBulkFormatAdapter.java:63)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21592) RemoveSingleAggregateRule fails due to nullability mismatch

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-21592.
--
Resolution: Fixed

fixed: b22bc62ae59d3ccaef95507897c7725970e4e5c3

> RemoveSingleAggregateRule fails due to nullability mismatch
> ---
>
> Key: FLINK-21592
> URL: https://issues.apache.org/jira/browse/FLINK-21592
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The test case to reproduce the issue:
> {code}
> @Test
> public void test() throws Exception {
> tableEnv.executeSql("create table foo(x int,y int)");
> tableEnv.executeSql("create table bar(i int,s string)");
> System.out.println(tableEnv.explainSql("select (select count(x)-1 
> from foo where foo.y=bar.i) from bar"));
> }
> {code}
> Error stack trace is:
> {noformat}
> java.lang.AssertionError: Cannot add expression of different type to set:
> set type is RecordType(BIGINT NOT NULL $f0) NOT NULL
> expression type is RecordType(BIGINT $f0) NOT NULL
> set is 
> rel#94:LogicalAggregate.NONE.any.[](input=HepRelVertex#93,group={},agg#0=SINGLE_VALUE($0))
> expression is LogicalProject($f0=[CAST(-($0, 1)):BIGINT])
>   LogicalAggregate(group=[{}], agg#0=[COUNT($0)])
> LogicalProject(x=[$0])
>   LogicalFilter(condition=[=($1, $cor0.i)])
> LogicalTableScan(table=[[test-catalog, default, foo]])
>   at 
> org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:381)
>   at 
> org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:58)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator$RemoveSingleAggregateRule.onMatch(RelDecorrelator.java:1881)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.removeCorrelationViaRule(RelDecorrelator.java:346)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:192)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:169)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21592) RemoveSingleAggregateRule fails due to nullability mismatch

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21592:
---
Affects Version/s: 1.12.2

> RemoveSingleAggregateRule fails due to nullability mismatch
> ---
>
> Key: FLINK-21592
> URL: https://issues.apache.org/jira/browse/FLINK-21592
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The test case to reproduce the issue:
> {code}
> @Test
> public void test() throws Exception {
> tableEnv.executeSql("create table foo(x int,y int)");
> tableEnv.executeSql("create table bar(i int,s string)");
> System.out.println(tableEnv.explainSql("select (select count(x)-1 
> from foo where foo.y=bar.i) from bar"));
> }
> {code}
> Error stack trace is:
> {noformat}
> java.lang.AssertionError: Cannot add expression of different type to set:
> set type is RecordType(BIGINT NOT NULL $f0) NOT NULL
> expression type is RecordType(BIGINT $f0) NOT NULL
> set is 
> rel#94:LogicalAggregate.NONE.any.[](input=HepRelVertex#93,group={},agg#0=SINGLE_VALUE($0))
> expression is LogicalProject($f0=[CAST(-($0, 1)):BIGINT])
>   LogicalAggregate(group=[{}], agg#0=[COUNT($0)])
> LogicalProject(x=[$0])
>   LogicalFilter(condition=[=($1, $cor0.i)])
> LogicalTableScan(table=[[test-catalog, default, foo]])
>   at 
> org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:381)
>   at 
> org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:58)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator$RemoveSingleAggregateRule.onMatch(RelDecorrelator.java:1881)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.removeCorrelationViaRule(RelDecorrelator.java:346)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:192)
>   at 
> org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:169)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22207) Hive Catalog retrieve Flink Properties error

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22207.
--
Fix Version/s: (was: 1.12.1)
   1.13.0
 Assignee: wangzeyu
   Resolution: Fixed

fixed: c29ee91abdb092f8dca5274a6628b71e88b57485

> Hive Catalog retrieve Flink Properties error
> 
>
> Key: FLINK-22207
> URL: https://issues.apache.org/jira/browse/FLINK-22207
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.1
>Reporter: wangzeyu
>Assignee: wangzeyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> If we use the Hive catalog and set a Flink property, e.g. "url", and then add 
> another property whose key starts with the string "flink.", like "flink.url", 
> we get the error "java.lang.IllegalStateException: Duplicate key", and 
> what's worse, because of this error we can't drop or alter this table anymore.
> I found that the method 
> "org.apache.flink.table.catalog.hive.HiveCatalog.retrieveFlinkProperties" 
> replaces all occurrences of "flink." in a property key. So the HiveCatalog 
> properties "flink.url" and "flink.flink.url" both map to "url". 
> I think the method 
> "org.apache.flink.table.catalog.hive.HiveCatalog.retrieveFlinkProperties" 
> should use replaceFirst instead of replace when handling the property key, as 
> sketched below.
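A minimal sketch of the collision (the property keys are illustrative; the
real stripping happens inside HiveCatalog#retrieveFlinkProperties):

{code:java}
public class PrefixStrippingDemo {
    private static final String FLINK_PREFIX = "flink.";

    public static void main(String[] args) {
        String k1 = "flink.url";       // user property "url", stored with prefix
        String k2 = "flink.flink.url"; // user property "flink.url", stored with prefix

        // Current behavior: replace() strips every occurrence of the prefix,
        // so both keys collapse to "url" and later collide as duplicate keys.
        System.out.println(k1.replace(FLINK_PREFIX, ""));      // url
        System.out.println(k2.replace(FLINK_PREFIX, ""));      // url

        // Proposed fix: replaceFirst() only strips the leading prefix.
        // (Note: replaceFirst takes a regex, so the '.' should be quoted,
        // e.g. via java.util.regex.Pattern.quote, in real code.)
        System.out.println(k1.replaceFirst(FLINK_PREFIX, "")); // url
        System.out.println(k2.replaceFirst(FLINK_PREFIX, "")); // flink.url
    }
}
{code}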



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22263) Using the TIMESTAMPADD function with a partition value has a problem when pushing the partition into the TableSource

2021-04-13 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22263:
---
Component/s: Table SQL / Planner

> Using the TIMESTAMPADD function with a partition value has a problem when 
> pushing the partition into the TableSource
> 
>
> Key: FLINK-22263
> URL: https://issues.apache.org/jira/browse/FLINK-22263
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2
>Reporter: hehuiyuan
>Priority: Major
>
> SQL (table api):
> {code:java}
> CREATE CATALOG myhive
> WITH (
> 'type' = 'hive',
> 'default-database' = 'hhy'
> );
> INSERT INTO  default_catalog.default_database.table_sink select * from  
> myhive.hhy.tmp_flink_test where dt=CAST(TIMESTAMPADD(DAY, -1, CURRENT_DATE) 
> as varchar);
> {code}
>  
> Error log:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Data type 'INTERVAL SECOND(3) NOT NULL' with conversion class 
> 'java.time.Duration' does not support a value literal of class 
> 'java.math.BigDecimal'.Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: Data type 'INTERVAL SECOND(3) 
> NOT NULL' with conversion class 'java.time.Duration' does not support a value 
> literal of class 'java.math.BigDecimal'. at 
> org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:286)
>  at 
> org.apache.flink.table.expressions.ValueLiteralExpression.<init>(ValueLiteralExpression.java:79)
>  at 
> org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:432)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> 

[jira] [Updated] (FLINK-22263) Using the TIMESTAMPADD function with a partition value has a problem when pushing the partition into the TableSource

2021-04-13 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22263:
---
Affects Version/s: 1.12.2

> Using the TIMESTAMPADD function with a partition value has a problem when 
> pushing the partition into the TableSource
> 
>
> Key: FLINK-22263
> URL: https://issues.apache.org/jira/browse/FLINK-22263
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.2
>Reporter: hehuiyuan
>Priority: Major
>
> SQL (table api):
> {code:java}
> CREATE CATALOG myhive
> WITH (
> 'type' = 'hive',
> 'default-database' = 'hhy'
> );
> INSERT INTO  default_catalog.default_database.table_sink select * from  
> myhive.hhy.tmp_flink_test where dt=CAST(TIMESTAMPADD(DAY, -1, CURRENT_DATE) 
> as varchar);
> {code}
>  
> Error log:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Data type 'INTERVAL SECOND(3) NOT NULL' with conversion class 
> 'java.time.Duration' does not support a value literal of class 
> 'java.math.BigDecimal'.Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: Data type 'INTERVAL SECOND(3) 
> NOT NULL' with conversion class 'java.time.Duration' does not support a value 
> literal of class 'java.math.BigDecimal'. at 
> org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:286)
>  at 
> org.apache.flink.table.expressions.ValueLiteralExpression.<init>(ValueLiteralExpression.java:79)
>  at 
> org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:432)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 

[jira] [Updated] (FLINK-22065) Improve the error message when input invalid command in the sql client

2021-04-13 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22065:
---
Summary: Improve the error message when input invalid command in the sql 
client  (was: Beautify the parse error exception when input invalid command in 
the sql client)

> Improve the error message when input invalid command in the sql client
> --
>
> Key: FLINK-22065
> URL: https://issues.apache.org/jira/browse/FLINK-22065
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> !https://static.dingtalk.com/media/lALPD26eOprT2ztwzQWg_1440_112.png_720x720g.jpg?renderWidth=1440=112=1=0=im!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21748) LocalExecutorITCase.testBatchQueryExecutionMultipleTimes[Planner: old] fails

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-21748.
--
Resolution: Fixed

fixed: b78ce3b9713c7b2792629d509a3951a96829012d

> LocalExecutorITCase.testBatchQueryExecutionMultipleTimes[Planner: old] fails
> 
>
> Key: FLINK-21748
> URL: https://issues.apache.org/jira/browse/FLINK-21748
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14520=logs=b2f046ab-ae17-5406-acdc-240be7e870e4=93e5ae06-d194-513d-ba8d-150ef6da1d7c=8800
> {code}
> [ERROR] testBatchQueryExecutionMultipleTimes[Planner: 
> old](org.apache.flink.table.client.gateway.local.LocalExecutorITCase)  Time 
> elapsed: 0.438 s  <<< ERROR!
> org.apache.flink.table.client.gateway.SqlExecutionException: Error while 
> retrieving result.
>   at 
> org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult.processRecord(MaterializedCollectBatchResult.java:48)
>   at 
> org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:76)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16584) Whether to support the long type field in table planner when the source is kafka and event time field's type is long

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-16584.
--
Fix Version/s: 1.13.0
   Resolution: Fixed

> Whether to support  the long type field in table planner when the source is 
> kafka and event time field's type is long
> -
>
> Key: FLINK-16584
> URL: https://issues.apache.org/jira/browse/FLINK-16584
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: hehuiyuan
>Priority: Minor
> Fix For: 1.13.0
>
>
> For the rowtime function, the field type may be long or timestamp.
> But the event time field type can only be timestamp when using the kafka connector.
> Some validations (for example, when creating a kafka table source) do not 
> allow long.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16584) Whether to support the long type field in table planner when the source is kafka and event time field's type is long

2021-04-12 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319426#comment-17319426
 ] 

Kurt Young commented on FLINK-16584:


With 1.13, you can convert a Long to TIMESTAMP_LTZ with the builtin function 
TO_TIMESTAMP_LTZ. This type has builtin support for conversion to and from Long. 
I think that's enough to meet this requirement, so I'll close this issue.
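
For reference, a minimal sketch of that approach (the table and column names here are hypothetical, not from this issue):

{code:sql}
-- Hypothetical kafka-backed table with a BIGINT epoch-millis column.
CREATE TABLE events (
  id STRING,
  ts_millis BIGINT,
  -- precision 3 tells TO_TIMESTAMP_LTZ to read the value as milliseconds
  rowtime AS TO_TIMESTAMP_LTZ(ts_millis, 3),
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  -- remaining connector options omitted
);
{code}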

> Whether to support  the long type field in table planner when the source is 
> kafka and event time field's type is long
> -
>
> Key: FLINK-16584
> URL: https://issues.apache.org/jira/browse/FLINK-16584
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: hehuiyuan
>Priority: Minor
>
> For the rowtime function, the field type may be long or timestamp.
> But the event time field type can only be timestamp when using the kafka connector.
> Some validations (for example, when creating a kafka table source) do not 
> allow long.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15404) How to insert hive table in streaming mode and blink planner

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-15404.
--
Resolution: Fixed

> How to insert hive table in streaming mode and blink planner
> 
>
> Key: FLINK-15404
> URL: https://issues.apache.org/jira/browse/FLINK-15404
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: hehuiyuan
>Priority: Major
>
> I have a hive catalog :
>  
> {code:java}
>     catalog name : myhive 
>     database : default
> {code}
>  
> and flink has a default catalog:
>  
> {code:java}
>     catalog name : default_catalog
>     database : default_database
> {code}
>  
> For example:
> I have a source table 'source_table' from kafka which is registered to
> default_catalog,
> and I want to insert into the hive table 'hive_table' from the myhive catalog.
> SQL:
> insert into hive_table select * from source_table;
>  
> And if the data of the hive table has changed, flink does not find it. 
> SQL:
> tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from 
> default_catalog.default_database.source_table");



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15404) How to insert hive table in streaming mode and blink planner

2021-04-12 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319424#comment-17319424
 ] 

Kurt Young commented on FLINK-15404:


Closing this because streaming write to hive is already supported.
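
For reference, the fully qualified write from the report would now look along these lines (a sketch only, reusing the catalog, database, and table names quoted below):

{code:sql}
-- Assumes the hive catalog 'myhive' is registered and the kafka-backed
-- 'source_table' lives in the default catalog.
INSERT INTO myhive.`default`.hive_table
SELECT * FROM default_catalog.default_database.source_table;
{code}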

> How to insert hive table in streaming mode and blink planner
> 
>
> Key: FLINK-15404
> URL: https://issues.apache.org/jira/browse/FLINK-15404
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: hehuiyuan
>Priority: Major
>
> I have a hive catalog :
>  
> {code:java}
>     catalog name : myhive 
>     database : default
> {code}
>  
> and flink has a default catalog:
>  
> {code:java}
>     catalog name : default_catalog
>     database : default_database
> {code}
>  
> For example:
> I have a source table 'source_table' from kafka which is registered to
> default_catalog,
> and I want to insert into the hive table 'hive_table' from the myhive catalog.
> SQL:
> insert into hive_table select * from source_table;
>  
> And if the data of the hive table has changed, flink does not find it. 
> SQL:
> tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from 
> default_catalog.default_database.source_table");



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22075) Incorrect null outputs in left join

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22075:
---
Fix Version/s: (was: 1.13.0)

> Incorrect null outputs in left join
> ---
>
> Key: FLINK-22075
> URL: https://issues.apache.org/jira/browse/FLINK-22075
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/pkgs.nix#L25-L46
> ```
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
> ```
>Reporter: Jamie Brandon
>Assignee: lincoln lee
>Priority: Critical
>
> I'm left joining a table with itself 
> [here](https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/flink/src/main/java/Demo.java#L55-L66).
>  The output should have no nulls, or at least emit nulls and then retract 
> them. Instead I see:
> ```
> jamie@machine:~/streaming-consistency/flink$ wc -l tmp/outer_join_with_time
> 10 tmp/outer_join_with_time
> jamie@machine:~/streaming-consistency/flink$ grep -c insert 
> tmp/outer_join_with_time
> 10
> jamie@machine:~/streaming-consistency/flink$ grep -c 'null' 
> tmp/outer_join_with_time
> 16943
> ```
> ~17% of the outputs are incorrect and never retracted.
> [Full 
> output](https://gist.githubusercontent.com/jamii/983fee41609b1425fe7fa59d3249b249/raw/069b9dcd4faf9f6113114381bc7028c6642ca787/gistfile1.txt)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21162) FLINK SQL IF function semantic incorrect

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-21162.
--
Resolution: Fixed

> FLINK SQL IF function semantic incorrect
> 
>
> Key: FLINK-21162
> URL: https://issues.apache.org/jira/browse/FLINK-21162
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.12.1
>Reporter: 谢波
>Assignee: WeiNan Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> I have a job using the IF() condition.
> When I use IF(col = '', 'a', 'b'): when col = '' it returns a, which is OK.
> When I use IF(col IS NULL, 'a', 'b'): when col is NULL it returns a, which is OK.
> When I use IF(col = '' OR col IS NULL, 'a', 'b'): when col = '' it returns a, 
> which is OK.
> When I use IF(col = '' OR col IS NULL, 'a', 'b'): when col is NULL it returns 
> b, which is surprising.
> The semantics seem incorrect.
>  
> my table ddl:
> CREATE TABLE print(a STRING, b STRING) WITH ( 'connector' = 'print' );
> my dml :
> INSERT INTO print
> SELECT return_flag,
>  IF(return_flag = '' OR return_flag IS NULL, 'N', 'Y') returnFlag
> FROM header;
> my test data:
> 3> +I(,N)
> 5> +I(X,Y)
> 1> +I(null,Y)
> 2> +I(,N)
> 5> +I(null,Y)
> 3> +I(,N)
>  
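
For readers hitting the same behavior on affected versions, one workaround sketch (not from the report) is to normalize NULL before the comparison, so the predicate no longer mixes NULL into the OR:

{code:sql}
-- COALESCE folds NULL into '' first, so a single equality check suffices.
SELECT return_flag,
       IF(COALESCE(return_flag, '') = '', 'N', 'Y') AS returnFlag
FROM header;
{code}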



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22015) SQL filter containing OR and IS NULL will produce an incorrect result.

2021-04-12 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22015.
--
Resolution: Fixed

fixed: a3be1cc5ed55d8d93a56fa22d3f3421ae6b65722

> SQL filter containing OR and IS NULL will produce an incorrect result.
> --
>
> Key: FLINK-22015
> URL: https://issues.apache.org/jira/browse/FLINK-22015
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Add the following test case to {{CalcITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   checkResult(
> """
>   |WITH myView AS (SELECT a, CASE
>   |  WHEN a = 1 THEN '1'
>   |  ELSE CAST(NULL AS STRING)
>   |  END AS s
>   |FROM SmallTable3)
>   |SELECT a FROM myView WHERE s = '2' OR s IS NULL
>   |""".stripMargin,
> Seq(row(2), row(3)))
> }
> {code}
> However, if we remove the {{s = '2'}} condition, the result will be correct.
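
For context on why rows 2 and 3 are expected: for every row with a <> 1, {{s}} is NULL, so {{s = '2'}} evaluates to UNKNOWN while {{s IS NULL}} evaluates to TRUE, and the disjunction must keep the row. A minimal standalone illustration (assuming only standard three-valued logic, no Flink specifics):

{code:sql}
-- The single NULL row must survive the filter:
-- s = '2' is UNKNOWN, s IS NULL is TRUE, UNKNOWN OR TRUE is TRUE.
SELECT *
FROM (VALUES (CAST(NULL AS STRING))) AS t(s)
WHERE s = '2' OR s IS NULL;
{code}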



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22201) Incorrect output for simple sql query

2021-04-11 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319023#comment-17319023
 ] 

Kurt Young commented on FLINK-22201:


Yes, for streaming mode, we expect the final result to settle at 0 once all 
the inputs are finished and stopped. For batch mode, we expect one single 
output of 0 after the sources finish. They are *eventually consistent*. 

> Incorrect output for simple sql query
> -
>
> Key: FLINK-22201
> URL: https://issues.apache.org/jira/browse/FLINK-22201
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2
> Environment: {code:bash}
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
> {code}
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running this simple query:
> {code:sql}
> CREATE VIEW credits AS
> SELECT
> to_account AS account, 
> sum(amount) AS credits
> FROM
> transactions
> GROUP BY
> to_account;
> CREATE VIEW debits AS
> SELECT
> from_account AS account, 
> sum(amount) AS debits
> FROM
> transactions
> GROUP BY
> from_account;
> CREATE VIEW balance AS
> SELECT
> credits.account AS account, 
> credits - debits AS balance
> FROM
> credits,
> debits
> WHERE
> credits.account = debits.account;
> CREATE VIEW total AS
> SELECT
> sum(balance)
> FROM
> balance;
> {code}
> The `total` view is a sanity check - its value should always be 0 because 
> money is only moved from one account to another, never created or destroyed.
> In streaming mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/a0f3b9d7ba178a7e184e6cb60e597a302dc3dd86/flink-table])
>  only about ~0.04% of the output values are 0. The absolute error in the 
> outputs increases roughly linearly wrt to the number of input transactions. 
> But after the inputs are finished it does return to 0.
> In batch mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/d3288e27649174c7463829c726be514610bbd056/flink])
>  it produces 0 for a while but then has large jumps to incorrect outputs and 
> never returns to 0. In this run, the first ~44% of the outputs are correct 
> but the final answer is -48811 which amounts to miscounting ~5% of the inputs.
> I also ran a variant of that query which joins on event time. In streaming 
> mode it produces similar results to the original. In batch mode only 2 out of 
> 1718375 outputs were correct and the final error was similar to the original 
> query.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22201) Incorrect output for simple sql query

2021-04-11 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319020#comment-17319020
 ] 

Kurt Young commented on FLINK-22201:


[~jamii] You didn't really enable batch execution mode, because you hard-coded 
the table environment to streaming mode in 

[https://github.com/jamii/streaming-consistency/blob/d3288e27649174c7463829c726be514610bbd056/flink/src/main/java/Demo.java#L22]

> Incorrect output for simple sql query
> -
>
> Key: FLINK-22201
> URL: https://issues.apache.org/jira/browse/FLINK-22201
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2
> Environment: {code:bash}
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
> {code}
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running this simple query:
> {code:sql}
> CREATE VIEW credits AS
> SELECT
> to_account AS account, 
> sum(amount) AS credits
> FROM
> transactions
> GROUP BY
> to_account;
> CREATE VIEW debits AS
> SELECT
> from_account AS account, 
> sum(amount) AS debits
> FROM
> transactions
> GROUP BY
> from_account;
> CREATE VIEW balance AS
> SELECT
> credits.account AS account, 
> credits - debits AS balance
> FROM
> credits,
> debits
> WHERE
> credits.account = debits.account;
> CREATE VIEW total AS
> SELECT
> sum(balance)
> FROM
> balance;
> {code}
> The `total` view is a sanity check - its value should always be 0 because 
> money is only moved from one account to another, never created or destroyed.
> In streaming mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/a0f3b9d7ba178a7e184e6cb60e597a302dc3dd86/flink-table])
>  only about ~0.04% of the output values are 0. The absolute error in the 
> outputs increases roughly linearly wrt to the number of input transactions. 
> But after the inputs are finished it does return to 0.
> In batch mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/d3288e27649174c7463829c726be514610bbd056/flink])
>  it produces 0 for a while but then has large jumps to incorrect outputs and 
> never returns to 0. In this run, the first ~44% of the outputs are correct 
> but the final answer is -48811 which amounts to miscounting ~5% of the inputs.
> I also ran a variant of that query which joins on event time. In streaming 
> mode it produces similar results to the original. In batch mode only 2 out of 
> 1718375 outputs were correct and the final error was similar to the original 
> query.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22100) BatchFileSystemITCaseBase.testPartialDynamicPartition fail because of no output for 900 seconds

2021-04-11 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17318998#comment-17318998
 ] 

Kurt Young commented on FLINK-22100:


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16319=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf=23367]

and

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16319=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf=23413]

> BatchFileSystemITCaseBase.testPartialDynamicPartition fail because of no 
> output for 900 seconds
> ---
>
> Key: FLINK-22100
> URL: https://issues.apache.org/jira/browse/FLINK-22100
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15988=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20=16696
> {code:java}
> "main" #1 prio=5 os_prio=0 tid=0x7f844400b800 nid=0x7b96 waiting on 
> condition [0x7f844be37000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:229)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:111)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>   at 
> org.apache.flink.table.planner.connectors.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:351)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
>   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22193) HiveTableSinkITCase.testNonPartStreamingMrWrite fail

2021-04-11 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22193.
--
Resolution: Duplicate

> HiveTableSinkITCase.testNonPartStreamingMrWrite fail
> 
>
> Key: FLINK-22193
> URL: https://issues.apache.org/jira/browse/FLINK-22193
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16319=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf=23413



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22192) HiveTableSinkITCase.testDefaultSerPartStreamingWrite fail

2021-04-11 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22192.
--
Resolution: Duplicate

> HiveTableSinkITCase.testDefaultSerPartStreamingWrite fail
> -
>
> Key: FLINK-22192
> URL: https://issues.apache.org/jira/browse/FLINK-22192
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16319=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf=23367



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22189) Flink SQL Client Hudi batch write crashes JobManager

2021-04-11 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-22189:
---
Priority: Major  (was: Blocker)

> Flink SQL Client Hudi batch write crashes JobManager
> 
>
> Key: FLINK-22189
> URL: https://issues.apache.org/jira/browse/FLINK-22189
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.11.3
>Reporter: Phil Chen
>Priority: Major
> Attachments: 
> flink-pchen-standalonesession-0-Phils-MBP.fios-router.home.log
>
>
> Flink SQL> create table t2(
> > uuid varchar(20),
> > name varchar(10),
> > age int,
> > ts timestamp(3),
> > `partition` varchar(20)
> > )
> > PARTITIONED BY (`partition`)
> > with (
> > 'connector' = 'hudi',
> > 'path' = 'file:///tmp/hudi/t2'
> > );
> [INFO] Table has been created.
> Flink SQL> insert into t2 values
> > ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
> > ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
> > ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
> > ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
> > ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
> > ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
> > ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
> > ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] Table update statement has been successfully submitted to the cluster:
> Job ID: f5828dfb80550b82f9a2f15afe2439a0
>  
> Check flink dashboard at [http://localhost:8081|http://localhost:8081/]
> JM crashed.
>  
> Hadoop version: 3.2.2
> Apache Hudi version:
> lib/hudi-flink-bundle_2.11-0.8.0.jar
> See attached JobManager log.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22201) Incorrect output for simple sql query

2021-04-11 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17318974#comment-17318974
 ] 

Kurt Young commented on FLINK-22201:


[~jamii] Thanks for the report. Could you provide some example data that can 
help us find the bug? The query is so simple that I can't recall any 
potential bug around it. 

> Incorrect output for simple sql query
> -
>
> Key: FLINK-22201
> URL: https://issues.apache.org/jira/browse/FLINK-22201
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2
> Environment: {code:bash}
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
> {code}
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running this simple query:
> {code:sql}
> CREATE VIEW credits AS
> SELECT
> to_account AS account, 
> sum(amount) AS credits
> FROM
> transactions
> GROUP BY
> to_account;
> CREATE VIEW debits AS
> SELECT
> from_account AS account, 
> sum(amount) AS debits
> FROM
> transactions
> GROUP BY
> from_account;
> CREATE VIEW balance AS
> SELECT
> credits.account AS account, 
> credits - debits AS balance
> FROM
> credits,
> debits
> WHERE
> credits.account = debits.account;
> CREATE VIEW total AS
> SELECT
> sum(balance)
> FROM
> balance;
> {code}
> The `total` view is a sanity check - its value should always be 0 because 
> money is only moved from one account to another, never created or destroyed.
> In streaming mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/a0f3b9d7ba178a7e184e6cb60e597a302dc3dd86/flink-table])
>  only about ~0.04% of the output values are 0. The absolute error in the 
> outputs increases roughly linearly wrt to the number of input transactions. 
> But after the inputs are finished it does return to 0.
> In batch mode (code 
> [here|https://github.com/jamii/streaming-consistency/tree/d3288e27649174c7463829c726be514610bbd056/flink])
>  it produces 0 for a while but then has large jumps to incorrect outputs and 
> never returns to 0. In this run, the first ~44% of the outputs are correct 
> but the final answer is -48811 which amounts to miscounting ~5% of the inputs.
> I also ran a variant of that query which joins on event time. In streaming 
> mode it produces similar results to the original. In batch mode only 2 out of 
> 1718375 outputs were correct and the final error was similar to the original 
> query.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22178) Support ignore-first-line option in new csv format

2021-04-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-22178:
--

Assignee: jinfeng

> Support ignore-first-line option in new csv format
> --
>
> Key: FLINK-22178
> URL: https://issues.apache.org/jira/browse/FLINK-22178
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.0
>Reporter: Kurt Young
>Assignee: jinfeng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

