[jira] [Updated] (FLINK-29090) Fix the code gen for ColumnarMapData and ColumnarArrayData

2022-08-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-29090:
---
Description: 
!image-2022-08-24-10-15-11-824.png|width=589,height=284!

Currently, the code generation for {{MapData}} assumes that the instance is a 
{{GenericMapData}}, but the newly introduced {{ColumnarMapData}} and 
{{ColumnarArrayData}} cannot be cast to {{GenericMapData}}.

{{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in FLINK-24614:
[https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3]

How to reproduce:
{code:sql}
create table parquet_source (
  f_map map<string, string>
) with (
  'connector' = 'filesystem',
  'format' = 'parquet'
);

select f_map['k1'] from parquet_source;

{code}
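
For illustration, here is a minimal Java sketch (an assumed safe access pattern, not the actual generated code) of reading a map entry through the generic {{MapData}} interface only, so it works for {{GenericMapData}}, {{ColumnarMapData}}, or any other implementation without a cast:
{code:java}
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.StringData;

public final class MapDataAccess {

    /** Returns the value for the given string key, or null if the key is absent. */
    public static StringData getString(MapData map, StringData key) {
        // Iterate the parallel key/value arrays instead of casting to GenericMapData.
        ArrayData keys = map.keyArray();
        ArrayData values = map.valueArray();
        for (int i = 0; i < map.size(); i++) {
            if (!keys.isNullAt(i) && key.equals(keys.getString(i))) {
                return values.isNullAt(i) ? null : values.getString(i);
            }
        }
        return null;
    }
}
{code}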

  was:
!image-2022-08-24-10-15-11-824.png|width=589,height=284!

Currently, the code generation for {{MapData}} assumes that it is the 
{{{}GenericMapData{}}}, but the new introduced {{ColumnarMapData}} and 
{{ColumnarArrayData}} can not be casted to {{{}GenericMapData{}}}.

{{ColumnarMapData}} and {{ColumnarArrayData}} are introduced in
FLINK-24614 
[https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3]
introduces,

How to reproduce:
{code:sql}
create table parquet_source (
  f_map map
) with (
  'connector' = 'filesystem',
  'format' = 'parquet'
);

select f_map['k1'] from table parquet_source;

{code}


> Fix the code gen for ColumnarMapData and ColumnarArrayData
> --
>
> Key: FLINK-29090
> URL: https://issues.apache.org/jira/browse/FLINK-29090
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.16.0
>
> Attachments: image-2022-08-24-10-15-11-824.png
>
>
> !image-2022-08-24-10-15-11-824.png|width=589,height=284!
> Currently, the code generation for {{MapData}} assumes that the instance is a 
> {{GenericMapData}}, but the newly introduced {{ColumnarMapData}} and 
> {{ColumnarArrayData}} cannot be cast to {{GenericMapData}}.
> {{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in FLINK-24614:
> [https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3]
> How to reproduce:
> {code:sql}
> create table parquet_source (
>   f_map map<string, string>
> ) with (
>   'connector' = 'filesystem',
>   'format' = 'parquet'
> );
> select f_map['k1'] from parquet_source;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29090) Fix the code gen for ColumnarMapData and ColumnarArrayData

2022-08-23 Thread Danny Chen (Jira)
Danny Chen created FLINK-29090:
--

 Summary: Fix the code gen for ColumnarMapData and ColumnarArrayData
 Key: FLINK-29090
 URL: https://issues.apache.org/jira/browse/FLINK-29090
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.16.0
Reporter: Danny Chen
 Fix For: 1.16.0
 Attachments: image-2022-08-24-10-15-11-824.png

!image-2022-08-24-10-15-11-824.png|width=589,height=284!

Currently, the code generation for {{MapData}} assumes that the instance is a 
{{GenericMapData}}, but the newly introduced {{ColumnarMapData}} and 
{{ColumnarArrayData}} cannot be cast to {{GenericMapData}}.

{{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in FLINK-24614:
[https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3]

How to reproduce:
{code:sql}
create table parquet_source (
  f_map map<string, string>
) with (
  'connector' = 'filesystem',
  'format' = 'parquet'
);

select f_map['k1'] from parquet_source;

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-22370) ParquetColumnarRowSplitReader#reachedEnd() returns false after it returns true

2021-04-20 Thread Danny Chen (Jira)
Danny Chen created FLINK-22370:
--

 Summary: ParquetColumnarRowSplitReader#reachedEnd() returns false 
after it returns true
 Key: FLINK-22370
 URL: https://issues.apache.org/jira/browse/FLINK-22370
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.2, 1.13.1
Reporter: Danny Chen
 Fix For: 1.13.1


{{ParquetColumnarRowSplitReader#reachedEnd()}} should always return true after 
it has returned true for the first time.
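
A minimal sketch of the expected contract (the field and the {{fetchNextBatch()}} hook below are hypothetical names, not the real reader internals): once end of input has been reported, every later call must report it as well.
{code:java}
import java.io.IOException;

// Hypothetical sketch, not the actual ParquetColumnarRowSplitReader: the point is
// only that reachedEnd() must stay true once it has returned true.
abstract class StickyEndReader {

    private boolean endReached;

    /** Returns false while rows remain; once it returns true it keeps returning true. */
    public boolean reachedEnd() throws IOException {
        if (endReached) {
            return true;
        }
        // fetchNextBatch() is an illustrative hook: returns false when the split is exhausted.
        endReached = !fetchNextBatch();
        return endReached;
    }

    protected abstract boolean fetchNextBatch() throws IOException;
}
{code}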



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work

2021-03-15 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17301550#comment-17301550
 ] 

Danny Chen commented on FLINK-21520:


Thanks. I guess that in your query, CPU filtering is not the resource-consuming 
bottleneck.

> ParquetInputFormat#setfilterPredicate() does not work
> -
>
> Key: FLINK-21520
> URL: https://issues.apache.org/jira/browse/FLINK-21520
> Project: Flink
>  Issue Type: Bug
>Reporter: Etienne Chauchot
>Priority: Major
>
> Simplified code:
> {code:java}
>  FilterPredicate filterPredicate = eq(intColumn("intField"), 10);
> parquetInputFormat.setFilterPredicate(filterPredicate);
> env.createInput(parquetInputFormat);
> {code}
>
> produces no records whereas 
> {code:java}
> env.createInput(parquetInputFormat)
> .filter((FilterFunction) value ->  value.get("intField") == 10);
> {code}
> produces records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work

2021-03-14 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17301357#comment-17301357
 ] 

Danny Chen commented on FLINK-21520:


That's also good news. How did you reach the conclusion that "performances are 
almost the same"?

> ParquetInputFormat#setfilterPredicate() does not work
> -
>
> Key: FLINK-21520
> URL: https://issues.apache.org/jira/browse/FLINK-21520
> Project: Flink
>  Issue Type: Bug
>Reporter: Etienne Chauchot
>Priority: Major
>
> Simplified code:
> {code:java}
>  FilterPredicate filterPredicate = eq(intColumn("intField"), 10);
> parquetInputFormat.setFilterPredicate(filterPredicate);
> env.createInput(parquetInputFormat);
> {code}
>
> produces no records whereas 
> {code:java}
> env.createInput(parquetInputFormat)
> .filter((FilterFunction) value ->  value.get("intField") == 10);
> {code}
> produces records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work

2021-03-11 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300105#comment-17300105
 ] 

Danny Chen commented on FLINK-21520:


I'm working on Flink on Apache HUDI now, and we use the Flink Parquet format. In 
order to support time travel queries, we need filter expression push-down for the 
Parquet format, which is ignored in the current code base.

I'm glad that you need this feature too; can you contribute the code? I can help 
with the code review ~
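
As a rough sketch of what the push-down looks like (this assumes Parquet's own {{filter2}} API and reuses the {{parquetInputFormat}} and {{env}} variables from the simplified code quoted below; it is not the missing Flink wiring itself):
{code:java}
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

import org.apache.parquet.filter2.predicate.FilterPredicate;

// With real push-down, Parquet can use row-group statistics and dictionary pages to
// skip data that cannot match, instead of deserializing every record and filtering
// it afterwards on the CPU.
FilterPredicate pushDown = eq(intColumn("intField"), 10);
parquetInputFormat.setFilterPredicate(pushDown); // the call that is currently ignored
env.createInput(parquetInputFormat);
{code}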

> ParquetInputFormat#setfilterPredicate() does not work
> -
>
> Key: FLINK-21520
> URL: https://issues.apache.org/jira/browse/FLINK-21520
> Project: Flink
>  Issue Type: Bug
>Reporter: Etienne Chauchot
>Priority: Major
>
> Simplified code:
> {code:java}
>  FilterPredicate filterPredicate = eq(intColumn("intField"), 10);
> parquetInputFormat.setFilterPredicate(filterPredicate);
> env.createInput(parquetInputFormat);
> {code}
>
> produces no records whereas 
> {code:java}
> env.createInput(parquetInputFormat)
> .filter((FilterFunction) value ->  value.get("intField") == 10);
> {code}
> produces records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException

2021-01-28 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17274208#comment-17274208
 ] 

Danny Chen commented on FLINK-20942:


Thanks [~twalthr], I have filed a fix via 
https://github.com/apache/flink/pull/14801

> Digest of FLOAT literals throws UnsupportedOperationException
> -
>
> Key: FLINK-20942
> URL: https://issues.apache.org/jira/browse/FLINK-20942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.2
>
>
> The recent refactoring of Calcite's digests might have caused a regression 
> for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws 
> a UnsupportedOperationException for the following query:
> {code}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val source = env.fromElements(
>   (1.0f, 11.0f, 12.0f),
>   (2.0f, 21.0f, 22.0f),
>   (3.0f, 31.0f, 32.0f),
>   (4.0f, 41.0f, 42.0f),
>   (5.0f, 51.0f, 52.0f)
> )
> val settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
> val query =
>   """
> |select * from myTable where id in (1.0, 2.0, 3.0)
> |""".stripMargin
> tEnv.executeSql(query).print()
> }
> {code}
> Stack trace:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.type.SqlTypeName: FLOAT
>   at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
>   at 
> org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
>   at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
>   at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
>   at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
>   at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
>   at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
>   at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)
>   at 
> org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066)
>   at 
> org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786)
>   at 
> org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843)
>   at java.util.ArrayList.forEach(ArrayList.java:1257)
>   at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843)
>   at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313)
>   at 
> org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257)
>   at 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> 

[jira] [Comment Edited] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException

2021-01-27 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272647#comment-17272647
 ] 

Danny Chen edited comment on FLINK-20942 at 1/27/21, 8:05 AM:
--

I have filed a fix in CALCITE-4479, but I have no idea how to fix it quickly on the 
Flink side. Should we copy {{RexLiteral}} then? The class is huge ..


was (Author: danny0405):
I have fired a fix in CALCITE-4479 but i have no idea how to fix quickly in 
Flink side, should we copy the {{RexLiteral }} then ? The class is huge ..

> Digest of FLOAT literals throws UnsupportedOperationException
> -
>
> Key: FLINK-20942
> URL: https://issues.apache.org/jira/browse/FLINK-20942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Critical
> Fix For: 1.13.0, 1.12.2
>
>
> The recent refactoring of Calcite's digests might have caused a regression 
> for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws 
> a UnsupportedOperationException for the following query:
> {code}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val source = env.fromElements(
>   (1.0f, 11.0f, 12.0f),
>   (2.0f, 21.0f, 22.0f),
>   (3.0f, 31.0f, 32.0f),
>   (4.0f, 41.0f, 42.0f),
>   (5.0f, 51.0f, 52.0f)
> )
> val settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
> val query =
>   """
> |select * from myTable where id in (1.0, 2.0, 3.0)
> |""".stripMargin
> tEnv.executeSql(query).print()
> }
> {code}
> Stack trace:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.type.SqlTypeName: FLOAT
>   at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
>   at 
> org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
>   at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
>   at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
>   at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
>   at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
>   at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
>   at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)
>   at 
> org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066)
>   at 
> org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786)
>   at 
> org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843)
>   at java.util.ArrayList.forEach(ArrayList.java:1257)
>   at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843)
>   at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313)
>   at 
> org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257)
>   at 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at 

[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException

2021-01-27 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272647#comment-17272647
 ] 

Danny Chen commented on FLINK-20942:


I have filed a fix in CALCITE-4479, but I have no idea how to fix it quickly on the 
Flink side. Should we copy {{RexLiteral}} then? The class is huge ..

> Digest of FLOAT literals throws UnsupportedOperationException
> -
>
> Key: FLINK-20942
> URL: https://issues.apache.org/jira/browse/FLINK-20942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Critical
> Fix For: 1.13.0, 1.12.2
>
>
> The recent refactoring of Calcite's digests might have caused a regression 
> for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws 
> a UnsupportedOperationException for the following query:
> {code}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val source = env.fromElements(
>   (1.0f, 11.0f, 12.0f),
>   (2.0f, 21.0f, 22.0f),
>   (3.0f, 31.0f, 32.0f),
>   (4.0f, 41.0f, 42.0f),
>   (5.0f, 51.0f, 52.0f)
> )
> val settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
> val query =
>   """
> |select * from myTable where id in (1.0, 2.0, 3.0)
> |""".stripMargin
> tEnv.executeSql(query).print()
> }
> {code}
> Stack trace:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.type.SqlTypeName: FLOAT
>   at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
>   at 
> org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
>   at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
>   at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
>   at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
>   at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
>   at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
>   at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)
>   at 
> org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066)
>   at 
> org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786)
>   at 
> org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843)
>   at java.util.ArrayList.forEach(ArrayList.java:1257)
>   at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843)
>   at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313)
>   at 
> org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257)
>   at 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> 

[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException

2021-01-22 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270032#comment-17270032
 ] 

Danny Chen commented on FLINK-20942:


Hmm, the root cause is that the newly introduced `Sarg` tries to make literals 
recursively. For this example, it tries to create `RexLiteral`s with `FLOAT` as the 
type name, which is not possible in a normal SQL context, so this looks like a bug. 
Let me see how to fix it quickly on the Flink side.

> Digest of FLOAT literals throws UnsupportedOperationException
> -
>
> Key: FLINK-20942
> URL: https://issues.apache.org/jira/browse/FLINK-20942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Critical
> Fix For: 1.13.0, 1.12.2
>
>
> The recent refactoring of Calcite's digests might have caused a regression 
> for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws 
> a UnsupportedOperationException for the following query:
> {code}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val source = env.fromElements(
>   (1.0f, 11.0f, 12.0f),
>   (2.0f, 21.0f, 22.0f),
>   (3.0f, 31.0f, 32.0f),
>   (4.0f, 41.0f, 42.0f),
>   (5.0f, 51.0f, 52.0f)
> )
> val settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
> val query =
>   """
> |select * from myTable where id in (1.0, 2.0, 3.0)
> |""".stripMargin
> tEnv.executeSql(query).print()
> }
> {code}
> Stack trace:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.type.SqlTypeName: FLOAT
>   at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
>   at 
> org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
>   at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
>   at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
>   at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
>   at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
>   at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
>   at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)
>   at 
> org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066)
>   at 
> org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786)
>   at 
> org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843)
>   at java.util.ArrayList.forEach(ArrayList.java:1257)
>   at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843)
>   at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313)
>   at 
> org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257)
>   at 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> 

[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException

2021-01-22 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270006#comment-17270006
 ] 

Danny Chen commented on FLINK-20942:


Thanks [~twalthr] for reporting this issue, I will take a look and fix it soon 
~

> Digest of FLOAT literals throws UnsupportedOperationException
> -
>
> Key: FLINK-20942
> URL: https://issues.apache.org/jira/browse/FLINK-20942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Critical
> Fix For: 1.13.0, 1.12.2
>
>
> The recent refactoring of Calcite's digests might have caused a regression 
> for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws 
> a UnsupportedOperationException for the following query:
> {code}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val source = env.fromElements(
>   (1.0f, 11.0f, 12.0f),
>   (2.0f, 21.0f, 22.0f),
>   (3.0f, 31.0f, 32.0f),
>   (4.0f, 41.0f, 42.0f),
>   (5.0f, 51.0f, 52.0f)
> )
> val settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
> val query =
>   """
> |select * from myTable where id in (1.0, 2.0, 3.0)
> |""".stripMargin
> tEnv.executeSql(query).print()
> }
> {code}
> Stack trace:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.type.SqlTypeName: FLOAT
>   at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
>   at 
> org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
>   at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
>   at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
>   at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
>   at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
>   at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
>   at 
> org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
>   at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
>   at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
>   at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
>   at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
>   at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223)
>   at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)
>   at 
> org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066)
>   at 
> org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786)
>   at 
> org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843)
>   at java.util.ArrayList.forEach(ArrayList.java:1257)
>   at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843)
>   at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313)
>   at 
> org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282)
>   at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257)
>   at 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   

[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2021-01-06 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17260156#comment-17260156
 ] 

Danny Chen commented on FLINK-18027:


Fixed in https://issues.apache.org/jira/browse/CALCITE-4456; the bug will be 
resolved once we upgrade to Calcite release 1.27.0 or a higher version.

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw exceptions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) 
> ... 5 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2020-12-30 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256799#comment-17256799
 ] 

Danny Chen commented on FLINK-18027:


Thanks for the check [~twalthr]. Let me check the expression `ROW(f0 + 12, 
'Hello world')` on the Calcite side. I will give the root cause and a solution 
soon ~

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw exceptions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) 
> ... 5 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20785) MapRelDataType for legacy planner has wrong digest

2020-12-28 Thread Danny Chen (Jira)
Danny Chen created FLINK-20785:
--

 Summary: MapRelDataType for legacy planner has wrong digest
 Key: FLINK-20785
 URL: https://issues.apache.org/jira/browse/FLINK-20785
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Legacy Planner
Affects Versions: 1.12.0
Reporter: Danny Chen


In the legacy planner, we create the {{MapRelDataType}} ignoring the key and 
value nullability. If we implement the digest correctly, it would conflict with 
that logic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20704) Some rel data type does not implement the digest correctly

2020-12-21 Thread Danny Chen (Jira)
Danny Chen created FLINK-20704:
--

 Summary: Some rel data type does not implement the digest correctly
 Key: FLINK-20704
 URL: https://issues.apache.org/jira/browse/FLINK-20704
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Affects Versions: 1.11.3, 1.12.0
Reporter: Danny Chen
 Fix For: 1.12.1


Some of the rel data types for the legacy planner:

- {{GenericRelDataType}}
- {{ArrayRelDataType}}
- {{MapRelDataType}}
- {{MultisetRelDataType}}

do not implement the digest correctly, especially {{GenericRelDataType}}. The 
{{RelDataTypeFactory}} caches type instances based on their digest, so a wrong 
digest implementation messes up type instance creation; e.g. without this patch, 
all {{GenericRelDataType}} instances have the same digest `ANY`.
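
For illustration, a minimal sketch (the {{typeInfo}} field is a hypothetical stand-in for whatever state distinguishes the generic type, and this is not the actual patch) of a digest that is unique per type instead of a bare `ANY`:
{code:java}
// Calcite's RelDataTypeImpl builds the digest from generateTypeString(), and
// RelDataTypeFactory canonicalizes type instances by that digest, so the string
// must distinguish different generic types.
@Override
protected void generateTypeString(StringBuilder sb, boolean withDetail) {
    // "typeInfo" is an illustrative field holding the wrapped type information.
    sb.append("ANY(").append(typeInfo).append(')');
}
{code}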



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-20368) Supports custom operator name for Flink SQL

2020-11-26 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239503#comment-17239503
 ] 

Danny Chen edited comment on FLINK-20368 at 11/27/20, 4:22 AM:
---

Hi [~ksp0422] ~

The DDL does not use the DataStream API. For the Blink planner, most of the 
operator code is code-generated and chained through {{Transformation}}s; the 
operator naming strategy is built in and users never see it.

> Will it break like normal SELECT queries when upgrading

If we keep the operator names unchanged, it is expected to be compatible.


was (Author: danny0405):
Hi [~ksp0422] ~

The DDL does not use DataStream API, for Blink planner, not of the operator 
codes are code-gened and chained through {{Transformation}}s, the operator name 
strategy is kind of builtin and user never see it.

> Will it break like normal SELECT queries when upgrading

If we keep operator name unchanged, it expected to be compatible.

> Supports custom operator name for Flink SQL
> ---
>
> Key: FLINK-20368
> URL: https://issues.apache.org/jira/browse/FLINK-20368
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.0
>Reporter: Danny Chen
>Priority: Major
>
> A request from Kevin Kwon on the USER mailing list:
> For SQLs, I know that the operator ID assignment is not possible now since 
> the query optimizer may not be backward compatible in each release
> But are DDLs also affected by this?
> for example,
> CREATE TABLE mytable (
>   id BIGINT,
>   data STRING
> ) with (
>   connector = 'kafka'
>   ...
>   id = 'mytable'
>   name = 'mytable'
> )
> and we can save all related checkpoint data



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20368) Supports custom operator name for Flink SQL

2020-11-26 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239503#comment-17239503
 ] 

Danny Chen commented on FLINK-20368:


Hi [~ksp0422] ~

The DDL does not use the DataStream API. For the Blink planner, most of the 
operator code is code-generated and chained through {{Transformation}}s; the 
operator naming strategy is built in and users never see it.

> Will it break like normal SELECT queries when upgrading

If we keep the operator names unchanged, it is expected to be compatible.

> Supports custom operator name for Flink SQL
> ---
>
> Key: FLINK-20368
> URL: https://issues.apache.org/jira/browse/FLINK-20368
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.0
>Reporter: Danny Chen
>Priority: Major
>
> A request from Kevin Kwon on the USER mailing list:
> For SQLs, I know that the operator ID assignment is not possible now since 
> the query optimizer may not be backward compatible in each release
> But are DDLs also affected by this?
> for example,
> CREATE TABLE mytable (
>   id BIGINT,
>   data STRING
> ) with (
>   connector = 'kafka'
>   ...
>   id = 'mytable'
>   name = 'mytable'
> )
> and we can save all related checkpoint data



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20368) Supports custom operator name for Flink SQL

2020-11-26 Thread Danny Chen (Jira)
Danny Chen created FLINK-20368:
--

 Summary: Supports custom operator name for Flink SQL
 Key: FLINK-20368
 URL: https://issues.apache.org/jira/browse/FLINK-20368
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Danny Chen


A request from Kevin Kwon on the USER mailing list:

For SQLs, I know that the operator ID assignment is not possible now since the 
query optimizer may not be backward compatible in each release

But are DDLs also affected by this?

for example,

CREATE TABLE mytable (
  id BIGINT,
  data STRING
) with (
  connector = 'kafka'
  ...
  id = 'mytable'
  name = 'mytable'
)

and we can save all related checkpoint data




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20307) Strength the document about temporal table join syntax

2020-11-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237921#comment-17237921
 ] 

Danny Chen commented on FLINK-20307:


Yes, I have added more documentation there so there is less confusion.

> Strength the document about temporal table join syntax
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as 
> expected. Therefore, a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax

2020-11-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20307:
---
Priority: Minor  (was: Critical)

> Strength the document about temporal table join syntax
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Minor
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as 
> expected. Therefore, a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax

2020-11-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20307:
---
Component/s: Documentation

> Strength the document about temporal table join syntax
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as 
> expected. Therefore, a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax

2020-11-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20307:
---
Summary: Strength the document about temporal table join syntax  (was: 
Cannot alias versioned table in a join)

> Strength the document about temporal table join syntax
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as 
> expected. Therefore, a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-20307) Cannot alias versioned table in a join

2020-11-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237854#comment-17237854
 ] 

Danny Chen edited comment on FLINK-20307 at 11/24/20, 4:25 AM:
---

Based on the SQL standard 2011, section 7.6 <table reference>, the table primary syntax is:

{code:xml}
<table primary> ::=
  <table or query name> [ <query system time period specification> ]
    [ [ AS ] <correlation name>
      [ <left paren> <derived column list> <right paren> ] ]
{code}

So the query should be rewritten as follows:

{code:sql}
SELECT p.name, p.qty * r.rate AS price, p.`tstamp`
FROM Products p
JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- the alias goes here; the AS keyword is optional
ON p.currency = r.currency
{code}



was (Author: danny0405):
Based on the SQL standard 2011, section 7.6 <table reference>, the table primary syntax is:

{code:xml}
<table primary> ::=
  <table or query name> [ <query system time period specification> ]
    [ [ AS ] <correlation name>
      [ <left paren> <derived column list> <right paren> ] ]
{code}

So the query should be re-written as following:

{code:sql}
SELECT p.name, p.qty * r.rate AS price, p.`tstamp`
FROM Products p
JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- AS should be here ~
ON p.currency = r.currency
{code}


> Cannot alias versioned table in a join
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as 
> expected. Therefore, a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20307) Cannot alias versioned table in a join

2020-11-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237854#comment-17237854
 ] 

Danny Chen commented on FLINK-20307:


Based on the SQL standard 2011, section 7.6 <table reference>, the table primary syntax is:

{code:xml}
<table primary> ::=
  <table or query name> [ <query system time period specification> ]
    [ [ AS ] <correlation name>
      [ <left paren> <derived column list> <right paren> ] ]
{code}

So the query should be rewritten as follows:

{code:sql}
SELECT p.name, p.qty * r.rate AS price, p.`tstamp`
FROM Products p
JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- AS should be here ~
ON p.currency = r.currency
{code}


> Cannot alias versioned table in a join
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as
> expected. Therefore a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20307) Cannot alias versioned table in a join

2020-11-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237794#comment-17237794
 ] 

Danny Chen commented on FLINK-20307:


Hi, [~dwysakowicz] ~ I would like to take this issue ~

> Cannot alias versioned table in a join
> --
>
> Key: FLINK-20307
> URL: https://issues.apache.org/jira/browse/FLINK-20307
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Priority: Critical
> Fix For: 1.12.0
>
>
> A query like:
> {code}
> SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN 
> versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency;
> {code}
> fails with:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, 
> column 108.
> Was expecting one of:
>  
> "EXCEPT" ...
> "FETCH" ...
> "GROUP" ...
> "HAVING" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ON" ...
> "ORDER" ...
> "MINUS" ...
> "TABLESAMPLE" ...
> "UNION" ...
> "USING" ...
> "WHERE" ...
> "WINDOW" ...
> "(" ...
> "NATURAL" ...
> "JOIN" ...
> "INNER" ...
> "LEFT" ...
> "RIGHT" ...
> "FULL" ...
> "CROSS" ...
> "," ...
> "OUTER" ...
> {code}
> When I do not alias the {{versioned_rates}} table, everything works as
> expected. Therefore a query like the following just runs:
> {code}
> SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM 
> Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON 
> p.currency = versioned_rates.currency;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin

2020-11-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19175:
---
Fix Version/s: 1.12.0

> Tests in JoinITCase do not test BroadcastHashJoin
> -
>
> Key: FLINK-19175
> URL: https://issues.apache.org/jira/browse/FLINK-19175
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they
> actually do not. None of the tables used in the tests have proper statistics;
> therefore, none of the tables meet the threshold for the broadcast join. At
> the same time, the {{ShuffleHashJoin}} is not disabled, so the tests
> silently fall back to {{ShuffleHashJoin}}.
> In summary, none (or at least not all) of the tests are executed for
> BroadcastHashJoin, but they are executed twice for ShuffleHashJoin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-20017) Improve the error message for join two windowed stream with time attributes

2020-11-23 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen closed FLINK-20017.
--
Resolution: Fixed

> Improve the error message for join two windowed stream with time attributes
> ---
>
> Key: FLINK-20017
> URL: https://issues.apache.org/jira/browse/FLINK-20017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Reported from USER mailing list by Satyam ~
> Currently, for the table:
> {code:sql}
> CREATE TABLE T0 (
>   amount BIGINT,
>   ts TIMESTAMP(3),
>   watermark for ts as ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'values',
>   'data-id' = '$mDataId',
>   'bounded' = 'false'
> )
> {code}
> and query:
> {code:sql}
> WITH A AS (
>   SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
> FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
> {code}
> throws stacktrace:
> {code:noformat}
> java.lang.RuntimeException: Error while applying rule 
> StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
> [rel#320:FlinkLogicalJoin.LOGICAL.any.None: 
> 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, 
> $3),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>   at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> 

[jira] [Commented] (FLINK-20017) Improve the error message for join two windowed stream with time attributes

2020-11-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237223#comment-17237223
 ] 

Danny Chen commented on FLINK-20017:


The test case below works OK on current master, so the issue is actually fixed; I 
will just close it.


{code:java}
@Test
def testStreamWindowJoin(): Unit = {
  // data set
  val dataA = List(
    rowOf(2L, LocalDateTime.parse("1970-01-01T00:00:01")),
    rowOf(3L, LocalDateTime.parse("1970-01-01T00:00:04")),
    rowOf(4L, LocalDateTime.parse("1970-01-01T00:00:08")),
    rowOf(5L, LocalDateTime.parse("1970-01-01T00:00:16")),
    rowOf(6L, LocalDateTime.parse("1970-01-01T00:00:32")))
  val dataID = TestValuesTableFactory.registerData(dataA)

  val createTable =
    s"""
       |CREATE TABLE T0 (
       |  amount BIGINT,
       |  ts TIMESTAMP(3),
       |  watermark for ts as ts - INTERVAL '5' SECOND
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataID',
       |  'bounded' = 'false'
       |)
       |""".stripMargin
  tEnv.executeSql(createTable)

  val query =
    s"""
       |WITH A AS (
       |  SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
       |  FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
       |select L.ct, R.ct, L.tm from A as L left join A as R on L.tm = R.tm
       |""".stripMargin
  val result = CollectionUtil.iteratorToList(tEnv.executeSql(query).collect())
  assertEquals(Seq("abc"), result)
}
{code}


> Improve the error message for join two windowed stream with time attributes
> ---
>
> Key: FLINK-20017
> URL: https://issues.apache.org/jira/browse/FLINK-20017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Reported from USER mailing list by Satyam ~
> Currently, for the table:
> {code:sql}
> CREATE TABLE T0 (
>   amount BIGINT,
>   ts TIMESTAMP(3),
>   watermark for ts as ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'values',
>   'data-id' = '$mDataId',
>   'bounded' = 'false'
> )
> {code}
> and query:
> {code:sql}
> WITH A AS (
>   SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
> FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
> {code}
> throws stacktrace:
> {code:noformat}
> java.lang.RuntimeException: Error while applying rule 
> StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
> [rel#320:FlinkLogicalJoin.LOGICAL.any.None: 
> 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, 
> $3),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>   at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> 

[jira] [Updated] (FLINK-20234) Json format supports SE/DE null elements of ARRAY type field

2020-11-22 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20234:
---
Fix Version/s: (was: 1.12.0)

> Json format supports SE/DE null elements of ARRAY type field
> 
>
> Key: FLINK-20234
> URL: https://issues.apache.org/jira/browse/FLINK-20234
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
>
> Reported from the USER mailing list:
> Hi,
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
> Is there any way to not throw, or to possibly filter out NULLs in an Array of
> Strings in Flink?
> We're somewhat stuck on how to solve this problem; we'd like to be defensive
> about this on Flink's side.
> Thanks!
> The JSON:
> {code:java}
> {
>   "schema": {
> "type": "struct",
> "fields": [
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "version" },
>   { "type": "string", "optional": false, "field": "connector" },
>   { "type": "string", "optional": false, "field": "name" },
>   { "type": "int64", "optional": false, "field": "ts_ms" },
>   {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
>   },
>   { "type": "string", "optional": false, "field": "db" },
>   { "type": "string", "optional": false, "field": "schema" },
>   { "type": "string", "optional": false, "field": "table" },
>   { "type": "int64", "optional": true, "field": "txId" },
>   { "type": "int64", "optional": true, "field": "lsn" },
>   { "type": "int64", "optional": true, "field": "xmin" }
> ],
> "optional": false,
> "name": "io.debezium.connector.postgresql.Source",
> "field": "source"
>   },
>   { "type": "string", "optional": false, "field": "op" },
>   { "type": "int64", "optional": true, "field": "ts_ms" },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "id" },
>   { "type": "int64", "optional": false, "field": "total_order" },
>   {
> "type": "int64",
> "optional": false,
> "field": "data_collection_order"
>   }
> ],
> "optional": true,
> "field": "transaction"
>   }
> ],
> "optional": false,
> "name": "db.public.data.Envelope"
>   },
>   "payload": {
> "before": null,
> "after": {
>   "id": 76704,
>   "roles": [null],
> },
> "source": {
>   "version": "1.3.0.Final",
>   "connector": "postgresql",
>   "name": "db",
>   "ts_ms": 1605739197360,
>   "snapshot": "true",
>   "db": "db",
>   "schema": "public",
>   "table": "data",
>   "txId": 1784,
>   "lsn": 1305806608,
>   "xmin": null
> },
> "op": "r",
> "ts_ms": 1605739197373,
> "transaction": null
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20234) Json format supports SE/DE null elements of ARRAY type field

2020-11-19 Thread Danny Chen (Jira)
Danny Chen created FLINK-20234:
--

 Summary: Json format supports SE/DE null elements of ARRAY type 
field
 Key: FLINK-20234
 URL: https://issues.apache.org/jira/browse/FLINK-20234
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.2
Reporter: Danny Chen
 Fix For: 1.12.0


Reported from the USER mailing list:

Hi,

I recently discovered some of our data has NULL values arriving in an 
ARRAY column. This column is being consumed by Flink via the Kafka 
connector with the Debezium format. We seem to be receiving NullPointerExceptions 
when these NULL values in the arrays arrive, which restarts the source operator 
in a loop.

Is there any way to not throw, or to possibly filter out NULLs in an Array of 
Strings in Flink?

We're somewhat stuck on how to solve this problem; we'd like to be defensive 
about this on Flink's side.

Thanks!
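
For illustration, a minimal sketch of the kind of declaration that hits this problem (table, topic and column names here are hypothetical); the `roles` column is an ARRAY whose elements may arrive as NULL:

{code:sql}
CREATE TABLE user_data (
  id INT,
  roles ARRAY<STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'db.public.data',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);
{code}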



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20200) SQL Hints are not supported in "Create View" syntax

2020-11-17 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234193#comment-17234193
 ] 

Danny Chen commented on FLINK-20200:


Thanks for reporting, [~tinny]. This should be a bug; I'm fixing it ~

> SQL Hints are not supported in  "Create View" syntax
> 
>
> Key: FLINK-20200
> URL: https://issues.apache.org/jira/browse/FLINK-20200
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Major
>
> I have already set the config option `table.dynamic-table-options.enabled` to 
> be true, but "SQL Hints" are not supported in the "CREATE VIEW" syntax. I got an error:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>   at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>   at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
>   at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:333)
>   at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470)
>   at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101)
>   at 
> org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176)
>   at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470)
>   at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246)
>   at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:151)
>   at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:173)
>   at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:182)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:784)
>   at 
> org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:169)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:694)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> {code}
> The sql code is as follows:
> {code:java}
> drop table if exists SourceA;
> create table SourceA (
>   id    string,
>   name  string
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'MyTopic',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'Test',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'csv'
> );
> drop table if exists print;
> create table print (
>   id    string,
>   name  string
> ) with (
>   'connector' = 'print'
> );
> drop view if exists test_view;
> create view test_view as
> select
>   *
> from SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */;
> insert into print
> select * from test_view;
> {code}
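
A hedged workaround sketch until the fix lands (reusing the table names from the report above): attach the dynamic table option where the source table is referenced directly, instead of inside the view definition:

{code:sql}
insert into print
select * from SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */;
{code}

This only covers cases where the hint can be moved into the consuming query; the proper fix is to make hints work inside the view definition itself.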



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result

2020-11-17 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233552#comment-17233552
 ] 

Danny Chen commented on FLINK-18988:


Sorry for the delayed reply; I will fix both master and 1.11.

> Continuous query with LATERAL and LIMIT produces wrong result
> -
>
> Key: FLINK-18988
> URL: https://issues.apache.org/jira/browse/FLINK-18988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.1
>Reporter: Fabian Hueske
>Assignee: Danny Chen
>Priority: Critical
>  Labels: pull-request-available
>
> I was trying out the example queries provided in this blog post: 
> [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
> Flink supports the same and found that the queries were translated and 
> executed but produced the wrong result.
> I used the SQL Client and Kafka (running at kafka:9092) to store the table 
> data. I executed the following statements:
> {code:java}
> -- create cities table
> CREATE TABLE cities (
>   name STRING NOT NULL,
>   state STRING NOT NULL,
>   pop INT NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'cities',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'mygroup', 
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> -- fill cities table
> INSERT INTO cities VALUES
>   ('Los_Angeles', 'CA', 3979576),
>   ('Phoenix', 'AZ', 1680992),
>   ('Houston', 'TX', 2320268),
>   ('San_Diego', 'CA', 1423851),
>   ('San_Francisco', 'CA', 881549),
>   ('New_York', 'NY', 8336817),
>   ('Dallas', 'TX', 1343573),
>   ('San_Antonio', 'TX', 1547253),
>   ('San_Jose', 'CA', 1021795),
>   ('Chicago', 'IL', 2695598),
>   ('Austin', 'TX', 978908);
> -- execute query
> SELECT state, name 
> FROM
>   (SELECT DISTINCT state FROM cities) states,
>   LATERAL (
> SELECT name, pop
> FROM cities
> WHERE state = states.state
> ORDER BY pop
> DESC LIMIT 3
>   );
> -- result
> state   name
> CA      Los_Angeles
> NY      New_York
> IL      Chicago
> -- expected result
> state | name
> --+-
> TX    | Dallas
> AZ    | Phoenix
> IL    | Chicago
> TX    | Houston
> CA    | San_Jose
> NY    | New_York
> CA    | San_Diego
> CA    | Los_Angeles
> TX    | San_Antonio
> {code}
> As you can see from the query result, Flink computes the top3 cities over all 
> states, not for every state individually. Hence, I assume that this is a bug 
> in the query optimizer or one of the rewriting rules.
> There are two valid ways to solve this issue:
>  * Fixing the rewriting rules / optimizer (obviously preferred)
>  * Disabling this feature and throwing an exception
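
For comparison, a hedged sketch of a per-state Top-N formulation that yields the expected result, written with the ROW_NUMBER pattern (table and column names reuse the report above; this is an alternative phrasing, not the reporter's query):

{code:sql}
SELECT state, name
FROM (
  SELECT state, name,
         ROW_NUMBER() OVER (PARTITION BY state ORDER BY pop DESC) AS rownum
  FROM cities
)
WHERE rownum <= 3;
{code}

This formulation goes through Flink's Top-N support rather than the LATERAL rewrite, so it can serve as a check of the expected output.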



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema

2020-11-17 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233419#comment-17233419
 ] 

Danny Chen edited comment on FLINK-18500 at 11/17/20, 8:14 AM:
---

Thanks, I'm rebasing the code and fixing it. We can revert the commit first.


was (Author: danny0405):
Thanks, I'm rebasing the code and fixing it.

> Make the legacy planner exception more clear when resolving computed columns 
> types for schema
> -
>
> Key: FLINK-18500
> URL: https://issues.apache.org/jira/browse/FLINK-18500
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> From the user mail:
> Hi, all:
> I use Zeppelin to execute SQL. The Flink version is a Flink 1.11 snapshot, built from
> branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973.
> When the table type is datagen, Flink throws an exception, but the
> exception message is null;
> My DDL is :
> {code:sql}
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
> {code}
> My query sql is :
> {code:sql}
> select * from datagen_dijie2;
> {code}
> the exception is :
> {noformat}
> org.apache.flink.table.api.ValidationException: SQL validation failed. null 
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>  at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
>  at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
>  at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
>  at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
>  at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.UnsupportedOperationException at 
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>  at 
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>  at
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema

2020-11-17 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233419#comment-17233419
 ] 

Danny Chen commented on FLINK-18500:


Thanks, I'm rebasing the code and fixing it.

> Make the legacy planner exception more clear when resolving computed columns 
> types for schema
> -
>
> Key: FLINK-18500
> URL: https://issues.apache.org/jira/browse/FLINK-18500
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> From the user mail:
> Hi, all:
> I use Zeppelin to execute SQL. The Flink version is a Flink 1.11 snapshot, built from
> branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973.
> When the table type is datagen, Flink throws an exception, but the
> exception message is null;
> My DDL is :
> {code:sql}
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
> {code}
> My query sql is :
> {code:sql}
> select * from datagen_dijie2;
> {code}
> the exception is :
> {noformat}
> org.apache.flink.table.api.ValidationException: SQL validation failed. null 
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>  at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
>  at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
>  at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
>  at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
>  at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.UnsupportedOperationException at 
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>  at 
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>  at
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20017) Improve the error message for join two windowed stream with time attributes

2020-11-16 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20017:
---
Summary: Improve the error message for join two windowed stream with time 
attributes  (was: Promote the error message for join two windowed stream with 
time attributes)

> Improve the error message for join two windowed stream with time attributes
> ---
>
> Key: FLINK-20017
> URL: https://issues.apache.org/jira/browse/FLINK-20017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Reported from USER mailing list by Satyam ~
> Currently, for the table:
> {code:sql}
> CREATE TABLE T0 (
>   amount BIGINT,
>   ts TIMESTAMP(3),
>   watermark for ts as ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'values',
>   'data-id' = '$mDataId',
>   'bounded' = 'false'
> )
> {code}
> and query:
> {code:sql}
> WITH A AS (
>   SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
> FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
> {code}
> throws stacktrace:
> {code:noformat}
> java.lang.RuntimeException: Error while applying rule 
> StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
> [rel#320:FlinkLogicalJoin.LOGICAL.any.None: 
> 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, 
> $3),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>   at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> 

[jira] [Updated] (FLINK-20142) Update the document for CREATE TABLE LIKE that source table from different catalog is supported

2020-11-16 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20142:
---
Fix Version/s: 1.12.0

> Update the document for CREATE TABLE LIKE that source table from different 
> catalog is supported
> ---
>
> Key: FLINK-20142
> URL: https://issues.apache.org/jira/browse/FLINK-20142
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> The confusion from the [USER mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CREATE-TABLE-LIKE-clause-from-different-catalog-or-database-td39364.html]:
> Hi,
> Is it disallowed to refer to a table from different databases or catalogs 
> when someone creates a table?
> According to [1], there's no way to refer to tables belonging to different 
> databases or catalogs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
> Best,
> Dongwon



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20142) Update the document for CREATE TABLE LIKE that source table from different catalog is supported

2020-11-13 Thread Danny Chen (Jira)
Danny Chen created FLINK-20142:
--

 Summary: Update the document for CREATE TABLE LIKE that source 
table from different catalog is supported
 Key: FLINK-20142
 URL: https://issues.apache.org/jira/browse/FLINK-20142
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.11.2
Reporter: Danny Chen


The confusion from the USER mailing list:

Hi,

Is it disallowed to refer to a table from different databases or catalogs when 
someone creates a table?

According to [1], there's no way to refer to tables belonging to different 
databases or catalogs.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Best,

Dongwon
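
For reference, a minimal sketch of the cross-catalog usage the documentation should mention (catalog, database and table names here are made up); the derived table inherits the source's schema and adds one column:

{code:sql}
CREATE TABLE my_catalog.my_db.derived_events (
  extra_note STRING
)
LIKE other_catalog.other_db.source_events;
{code}

With the default LIKE options, the connector options are also copied from the source table, which is why no WITH clause is needed in this sketch.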



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20017) Promote the error message for join two windowed stream with time attributes

2020-11-06 Thread Danny Chen (Jira)
Danny Chen created FLINK-20017:
--

 Summary: Promote the error message for join two windowed stream 
with time attributes
 Key: FLINK-20017
 URL: https://issues.apache.org/jira/browse/FLINK-20017
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.2
Reporter: Danny Chen
 Fix For: 1.12.0


Currently, for the table:

{code:sql}
CREATE TABLE T0 (
  amount BIGINT,
  ts TIMESTAMP(3),
  watermark for ts as ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'values',
  'data-id' = '$mDataId',
  'bounded' = 'false'
)
{code}

and query:
{code:sql}
WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
{code}


throws stacktrace:

{code:noformat}
java.lang.RuntimeException: Error while applying rule 
StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
[rel#320:FlinkLogicalJoin.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, 
$3),joinType=left)]

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
at 
org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at 

[jira] [Updated] (FLINK-20017) Promote the error message for join two windowed stream with time attributes

2020-11-06 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-20017:
---
Description: 
Reported from USER mailing list by Satyam ~
Currently, for the table:

{code:sql}
CREATE TABLE T0 (
  amount BIGINT,
  ts TIMESTAMP(3),
  watermark for ts as ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'values',
  'data-id' = '$mDataId',
  'bounded' = 'false'
)
{code}

and query:
{code:sql}
WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
{code}


throws stacktrace:

{code:noformat}
java.lang.RuntimeException: Error while applying rule 
StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
[rel#320:FlinkLogicalJoin.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, 
$3),joinType=left)]

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
at 
org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at 

[jira] [Updated] (FLINK-19949) Unescape CSV format line and field delimiter character

2020-11-03 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19949:
---
Description: 
We should unescape the line delimiter characters because the DDL can be 
read from a file, in which case the newline "\n" in the DDL options is recognized 
as 2 characters.

What the user actually wants is the invisible newline character.

{code:sql}
create table t1(
  ...
) with (
  'format' = 'csv',
  'csv.line-delimiter' = '\n'
  ...
)
{code}

The same applies to the field delimiter.
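
A minimal sketch of the field delimiter case (the 'csv.field-delimiter' option is the CSV format's field delimiter option; the rest of the DDL is elided as above). When the DDL is read from a file, the tab below arrives as the two characters '\' and 't' unless it is unescaped:

{code:sql}
create table t2(
  ...
) with (
  'format' = 'csv',
  'csv.field-delimiter' = '\t'
  ...
)
{code}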


  was:
We should unescape the line delimiter characters because the DDL can be 
read from a file, in which case the newline "\n" in the DDL options is recognized 
as 2 characters.

What the user actually wants is the invisible newline character.

{code:sql}
create table t1(
  ...
) with (
  'format' = 'csv',
  'csv.line-delimiter' = '\n'
  ...
)
{code}



> Unescape CSV format line and field delimiter character
> --
>
> Key: FLINK-19949
> URL: https://issues.apache.org/jira/browse/FLINK-19949
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> We should unescape the line delimiter characters because the DDL can be
> read from a file, in which case the newline "\n" in the DDL options is recognized
> as 2 characters.
> What the user actually wants is the invisible newline character.
> {code:sql}
> create table t1(
>   ...
> ) with (
>   'format' = 'csv',
>   'csv.line-delimiter' = '\n'
>   ...
> )
> {code}
> The same applies to the field delimiter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19949) Unescape CSV format line and field delimiter character

2020-11-03 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19949:
---
Summary: Unescape CSV format line and field delimiter character  (was: 
Unescape CSV format line delimiter character)

> Unescape CSV format line and field delimiter character
> --
>
> Key: FLINK-19949
> URL: https://issues.apache.org/jira/browse/FLINK-19949
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> We should unescape the line delimiter characters because the DDL can be
> read from a file, in which case the newline "\n" in the DDL options is recognized
> as 2 characters.
> What the user actually wants is the invisible newline character.
> {code:sql}
> create table t1(
>   ...
> ) with (
>   'format' = 'csv',
>   'csv.line-delimiter' = '\n'
>   ...
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19949) Unescape CSV format line delimiter character

2020-11-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17225244#comment-17225244
 ] 

Danny Chen commented on FLINK-19949:


Yes, but the unicode escape has poor usability. For a user, it would be 
confusing when the same DDL works in Java code but fails when written in a 
file.

And the unescaping approach does not cause any regression.

> Unescape CSV format line delimiter character
> 
>
> Key: FLINK-19949
> URL: https://issues.apache.org/jira/browse/FLINK-19949
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> We should unescape the line delimiter characters because the DDL can be
> read from a file, in which case the newline "\n" in the DDL options is recognized
> as 2 characters.
> What the user actually wants is the invisible newline character.
> {code:sql}
> create table t1(
>   ...
> ) with (
>   'format' = 'csv',
>   'csv.line-delimiter' = '\n'
>   ...
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19949) Unescape CSV format line delimiter character

2020-11-03 Thread Danny Chen (Jira)
Danny Chen created FLINK-19949:
--

 Summary: Unescape CSV format line delimiter character
 Key: FLINK-19949
 URL: https://issues.apache.org/jira/browse/FLINK-19949
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.2
Reporter: Danny Chen
 Fix For: 1.12.0


We should unescape the line delimiter characters because the DDL can be 
read from a file, in which case the newline "\n" in the DDL options is recognized 
as 2 characters.

What the user actually wants is the invisible newline character.

{code:sql}
create table t1(
  ...
) with (
  'format' = 'csv',
  'csv.line-delimiter' = '\n'
  ...
)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19779) Remove the "record_" field name prefix for Confluent Avro format deserialization

2020-10-26 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220547#comment-17220547
 ] 

Danny Chen commented on FLINK-19779:


[~xiaozilong] Yes, the Avro schema builder does not allow fields with the same 
name, even if they are in different scopes (different layers).

[~maver1ck] I have fixed the nullability and precision problems, please check.

> Remove the "record_" field name prefix for Confluent Avro format 
> deserialization
> 
>
> Key: FLINK-19779
> URL: https://issues.apache.org/jira/browse/FLINK-19779
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0
>Reporter: Danny Chen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Reported by Maciej Bryński:
> The problem is that this is not compatible. I'm unable to read anything from Kafka
> using the Confluent Schema Registry. Example:
> I have data in Kafka with the following value schema:
> {code:java}
> {
>   "type": "record",
>   "name": "myrecord",
>   "fields": [
> {
>   "name": "f1",
>   "type": "string"
> }
>   ]
> }
> {code}
> I'm creating table using this avro-confluent format:
> {code:sql}
> create table `test` (
>   `f1` STRING
> ) WITH (
>   'connector' = 'kafka', 
>   'topic' = 'test', 
>   'properties.bootstrap.servers' = 'localhost:9092', 
>   'properties.group.id' = 'test1234', 
>   'scan.startup.mode' = 'earliest-offset', 
>   'format' = 'avro-confluent',
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081'
> );
> {code}
> When trying to select data I'm getting error:
> {code:noformat}
> SELECT * FROM test;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing 
> required field record_f1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19779) Remove the "record_" field name prefix for Confluent Avro format deserialization

2020-10-23 Thread Danny Chen (Jira)
Danny Chen created FLINK-19779:
--

 Summary: Remove the "record_" field name prefix for Confluent Avro 
format deserialization
 Key: FLINK-19779
 URL: https://issues.apache.org/jira/browse/FLINK-19779
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.0
Reporter: Danny Chen
 Fix For: 1.12.0


Reported by Maciej Bryński:

The problem is that this is not compatible. I'm unable to read anything from Kafka using 
the Confluent Registry. Example:
I have data in Kafka with the following value schema:


{code:java}
{
  "type": "record",
  "name": "myrecord",
  "fields": [
{
  "name": "f1",
  "type": "string"
}
  ]
}
{code}

I'm creating table using this avro-confluent format:


{code:sql}
create table `test` (
  `f1` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test1234',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081'
);
{code}

When trying to select data I'm getting error:


{code:noformat}
SELECT * FROM test;
[ERROR] Could not execute SQL statement. Reason:
org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing 
required field record_f1
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19668) Fix the plan regression by Calcite upgrade to 1.26

2020-10-18 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216424#comment-17216424
 ] 

Danny Chen commented on FLINK-19668:


Another case is 
`org.apache.flink.table.planner.plan.batch.sql.agg.WindowAggregateTest.testReturnTypeInferenceForWindowAgg`
in the Blink planner.

> Fix the plan regression by Calcite upgrade to 1.26
> --
>
> Key: FLINK-19668
> URL: https://issues.apache.org/jira/browse/FLINK-19668
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.3
>Reporter: Danny Chen
>Priority: Major
>
> See org.apache.flink.table.plan.TimeIndicatorConversionTest.testUnion,
> {code:java}
> @Test
>   def testUnion(): Unit = {
> val util = streamTestUtil()
> val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 
> 'long, 'int)
> val result = t.unionAll(t).select('rowtime)
> val expected = unaryNode(
>   "DataStreamCalc",
>   binaryNode(
> "DataStreamUnion",
> streamTableNode(t),
> streamTableNode(t),
> term("all", "true"),
> term("union all", "rowtime, long, int")
>   ),
>   term("select", "rowtime")
> )
> util.verifyTable(result, expected)
>   }
> {code}
> The regression happens because the new plan has the same cost as the old one, so the 
> old plan was not picked as the best.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19668) Fix the plan regression by Calcite upgrade to 1.26

2020-10-15 Thread Danny Chen (Jira)
Danny Chen created FLINK-19668:
--

 Summary: Fix the plan regression by Calcite upgrade to 1.26
 Key: FLINK-19668
 URL: https://issues.apache.org/jira/browse/FLINK-19668
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.11.3
Reporter: Danny Chen


See org.apache.flink.table.plan.TimeIndicatorConversionTest.testUnion,

{code:java}
@Test
  def testUnion(): Unit = {
val util = streamTestUtil()
val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 
'long, 'int)

val result = t.unionAll(t).select('rowtime)

val expected = unaryNode(
  "DataStreamCalc",
  binaryNode(
"DataStreamUnion",
streamTableNode(t),
streamTableNode(t),
term("all", "true"),
term("union all", "rowtime, long, int")
  ),
  term("select", "rowtime")
)

util.verifyTable(result, expected)
  }
{code}

The regression happens because the new plan has the same cost as the old one, so the 
old plan was not picked as the best.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19567) JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference

2020-10-11 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17212104#comment-17212104
 ] 

Danny Chen commented on FLINK-19567:


I encountered this problem when upgrading the planner's Calcite version to 1.26.0, 
but the error should not be caused by the upgrade.

> JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference
> 
>
> Key: FLINK-19567
> URL: https://issues.apache.org/jira/browse/FLINK-19567
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Major
>
> The 
> {{org.apache.flink.table.runtime.stream.table.JoinITCase.testInnerJoinOutputWithPk}}
>  is unstable: it fails randomly in the mvn test run but succeeds in the IDEA local 
> runner.
> Here is the stacktrace:
> {code:xml}
> [ERROR] 
> testInnerJoinOutputWithPk(org.apache.flink.table.runtime.stream.table.JoinITCase)
>   Time elapsed: 0.044 s  <<< ERROR!
> org.apache.flink.table.codegen.CodeGenException: Incompatible types of 
> expression and result type. 
> Expression[GeneratedExpression(result$19166,isNull$19167,,GenericType,false)]
>  type is [GenericType], result type is 
> [GenericType]
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379)
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295)
>   at 
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115)
>   at 
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
>   at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19567) JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference

2020-10-11 Thread Danny Chen (Jira)
Danny Chen created FLINK-19567:
--

 Summary: JoinITCase.testInnerJoinOutputWithPk is unstable for POJO 
type inference
 Key: FLINK-19567
 URL: https://issues.apache.org/jira/browse/FLINK-19567
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Affects Versions: 1.11.2
Reporter: Danny Chen


The 
{{org.apache.flink.table.runtime.stream.table.JoinITCase.testInnerJoinOutputWithPk}}
 is unstable: it fails randomly in the mvn test run but succeeds in the IDEA local 
runner.

Here is the stacktrace:

{code:xml}
[ERROR] 
testInnerJoinOutputWithPk(org.apache.flink.table.runtime.stream.table.JoinITCase)
  Time elapsed: 0.044 s  <<< ERROR!
org.apache.flink.table.codegen.CodeGenException: Incompatible types of 
expression and result type. 
Expression[GeneratedExpression(result$19166,isNull$19167,,GenericType,false)]
 type is [GenericType], result type is 
[GenericType]
at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379)
at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377)
at 
org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295)
at 
org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115)
at 
org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
at 
org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16579) Upgrade Calcite version to 1.26 for Flink SQL

2020-10-08 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16579:
---
Summary: Upgrade Calcite version to 1.26 for Flink SQL  (was: Upgrade 
Calcite version to 1.23 for Flink SQL)

> Upgrade Calcite version to 1.26 for Flink SQL
> -
>
> Key: FLINK-16579
> URL: https://issues.apache.org/jira/browse/FLINK-16579
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>
> A task to upgrade the Calcite version to 1.26.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin

2020-09-09 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193344#comment-17193344
 ] 

Danny Chen commented on FLINK-19175:


Nice catch, [~dwysakowicz]. Can I take this issue?

> Tests in JoinITCase do not test BroadcastHashJoin
> -
>
> Key: FLINK-19175
> URL: https://issues.apache.org/jira/browse/FLINK-19175
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they 
> actually do not. None of the tables used in the tests have proper statistics 
> therefore, none of the tables meet the threshold for the broadcast join. At 
> the same time the {{ShuffleHashJoin}} is not disabled, therefore they 
> silently fallback to {{ShuffleHashJoin}}.
> In summary none (or at least not all of the tests) are executed for 
> BroadcastHashJoin, but are executed twice for ShuffleHashJoin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly

2020-09-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190573#comment-17190573
 ] 

Danny Chen commented on FLINK-19133:


Hi [~dwysakowicz], may I take this issue?

> User provided kafka partitioners are not initialized correctly
> --
>
> Key: FLINK-19133
> URL: https://issues.apache.org/jira/browse/FLINK-19133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.1
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.12.0, 1.11.2
>
>
> Reported in the ML: 
> https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E
> If a user provides a partitioner in combination with SerializationSchema it 
> is not initialized correctly and has no access to the parallel instance index 
> or number of parallel instances.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment

2020-09-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190505#comment-17190505
 ] 

Danny Chen commented on FLINK-19127:


Thanks, indeed, notebooks like Zeppelin may need this.

> Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment 
> for TableEnvironment
> 
>
> Key: FLINK-19127
> URL: https://issues.apache.org/jira/browse/FLINK-19127
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> Connecting to a remote cluster from the unified TableEnvironment is neither 
> tested nor documented. Since StreamExecutionEnvironment is not necessary 
> anymore, users should be able to do the same in TableEnvironment easily. This 
> is in particular useful for interactive sessions that run in an IDE.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment

2020-09-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189887#comment-17189887
 ] 

Danny Chen commented on FLINK-19127:


Are there any real use cases that can be shared here? Who would run interactive 
sessions in an IDE? For testing?

> Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment 
> for TableEnvironment
> 
>
> Key: FLINK-19127
> URL: https://issues.apache.org/jira/browse/FLINK-19127
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> Connecting to a remote cluster from the unified TableEnvironment is neither 
> tested nor documented. Since StreamExecutionEnvironment is not necessary 
> anymore, users should be able to do the same in TableEnvironment easily. This 
> is in particular useful for interactive sessions that run in an IDE.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16528) Support Limit push down for Kafka streaming sources

2020-09-02 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189016#comment-17189016
 ] 

Danny Chen commented on FLINK-16528:


What are the semantics of a fetch limit on an unbounded stream? The two concepts 
conflict with each other.

> Support Limit push down for Kafka streaming sources
> ---
>
> Key: FLINK-16528
> URL: https://issues.apache.org/jira/browse/FLINK-16528
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem, Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> Currently, a limit query {{SELECT * FROM kafka LIMIT 10}} will be translated 
> into a TopN operator and will scan the full data in the source. However, 
> {{LIMIT}} is a very useful feature in the SQL CLI to explore data in the source, 
> and it doesn't make sense that the query never stops. 
> We can support such a case in streaming mode (ignore the text format):
> {code}
> flink > SELECT * FROM kafka LIMIT 10;
>  kafka_key  |user_name| lang |   created_at
> +-+--+-
>  494227746231685121 | burncaniff  | en   | 2014-07-29 14:07:31.000
>  494227746214535169 | gu8tn   | ja   | 2014-07-29 14:07:31.000
>  494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
>  494227746201931777 | josnyS  | ht   | 2014-07-29 14:07:31.000
>  494227746219110401 | Cafe510 | en   | 2014-07-29 14:07:31.000
>  494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
>  494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
>  494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
>  494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
>  494227750388256769 | jmolas  | es   | 2014-07-29 14:07:32.000
> (10 rows)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19108:
---
Description: 
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system-generated alias with the 
prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
in the inner query with this prefix, which is wrong because we no longer add the 
alias to the inner query.

To solve the problem, skip the expansion of identifiers whose scope alias starts 
with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.

This was introduced by FLINK-18750.

  was:
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system-generated alias with the 
prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
in the inner query with this prefix, which is wrong because we no longer add the 
alias to the inner query.

To solve the problem, skip the expansion of identifiers whose scope alias starts 
with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.


> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0, 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned a system-generated alias with the 
> prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
> in the inner query with this prefix, which is wrong because we no longer add the 
> alias to the inner query.
> To solve the problem, skip the expansion of identifiers whose scope alias starts 
> with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.
> This was introduced by FLINK-18750.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19108:
---
Description: 
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system-generated alias with the 
prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
in the inner query with this prefix, which is wrong because we no longer add the 
alias to the inner query.

To solve the problem, skip the expansion of identifiers whose scope alias starts 
with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.

  was:
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

when validation, the inner would have query alias by the system with prefix 
"EXPR$1", when in the `Expander`, we replace the id in the inner query all with 
this prefix which is wrong because we do not add the alias to the inner query 
anymore.

To solve the problem, skip the expanding of id with "EXPR$" just like how 
{{SqlUtil#deriveAliasFromOrdinal}} added it.


> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0, 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned a system-generated alias with the 
> prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
> in the inner query with this prefix, which is wrong because we no longer add the 
> alias to the inner query.
> To solve the problem, skip the expansion of identifiers whose scope alias starts 
> with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)
Danny Chen created FLINK-19108:
--

 Summary: Stop expanding the identifiers with scope aliased by the 
system with 'EXPR$' prefix
 Key: FLINK-19108
 URL: https://issues.apache.org/jira/browse/FLINK-19108
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0, 1.11.2
Reporter: Danny Chen
 Fix For: 1.12.0, 1.11.2


For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system-generated alias with the 
prefix "EXPR$" (here "EXPR$1"); then, in the `Expander`, we expand every identifier 
in the inner query with this prefix, which is wrong because we no longer add the 
alias to the inner query.

To solve the problem, skip the expansion of identifiers whose scope alias starts 
with "EXPR$", matching how {{SqlUtil#deriveAliasFromOrdinal}} generated it.
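
For illustration, a minimal sketch of the skip condition (a hypothetical helper, not the actual Expander code):

{code:java}
// SqlUtil#deriveAliasFromOrdinal generates aliases of the form "EXPR$<ordinal>",
// so identifiers whose scope alias carries that prefix should be left unexpanded.
public final class SystemAliasCheck {

    public static boolean isSystemGeneratedAlias(String scopeAlias) {
        return scopeAlias != null && scopeAlias.startsWith("EXPR$");
    }

    public static void main(String[] args) {
        System.out.println(isSystemGeneratedAlias("EXPR$1")); // true  -> skip the expansion
        System.out.println(isSystemGeneratedAlias("S"));      // false -> expand as usual
    }
}
{code}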



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19099) consumer kafka message repeat

2020-08-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187592#comment-17187592
 ] 

Danny Chen edited comment on FLINK-19099 at 8/31/20, 9:23 AM:
--

Before FLINK-15221, the SQL Kafka connector only supported "at least once" 
semantics, so records may be duplicated when there are failures. You can use the 
DataStream API instead.


was (Author: danny0405):
Before Flink-15221, the SQL Kafka connector only supports "at least once" 
semantic, where the records may duplicate when there are failures. You can use 
data stream instead.

> consumer kafka message repeat
> -
>
> Key: FLINK-19099
> URL: https://issues.apache.org/jira/browse/FLINK-19099
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.11.0
>Reporter: zouwenlong
>Priority: Major
>
> When the taskmanager is killed, my job has consumed some messages but the offsets are 
> not committed.
> Then, after restarting it, my job consumes the Kafka messages again. I use checkpointing 
> with a 5-second interval.
> I think this is a very common problem; how can I solve it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19099) consumer kafka message repeat

2020-08-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187592#comment-17187592
 ] 

Danny Chen commented on FLINK-19099:


Before Flink-15221, the SQL Kafka connector only supported "at least once" 
semantics, so records may be duplicated when there are failures. You can use the 
DataStream API instead.

> consumer kafka message repeat
> -
>
> Key: FLINK-19099
> URL: https://issues.apache.org/jira/browse/FLINK-19099
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.11.0
>Reporter: zouwenlong
>Priority: Major
>
> When the taskmanager is killed, my job has consumed some messages but the offsets are 
> not committed.
> Then, after restarting it, my job consumes the Kafka messages again. I use checkpointing 
> with a 5-second interval.
> I think this is a very common problem; how can I solve it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19101) The SelectivityEstimator throw an NullPointerException when convertValueInterval with string type

2020-08-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187586#comment-17187586
 ] 

Danny Chen commented on FLINK-19101:


You may need to provide the data set of the table so that we can reproduce the 
problem.
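
For reference, a null-safe guard along these lines would avoid the NPE (sketched in Java; the planner code itself is Scala):

{code:java}
// Minimal sketch: only call toString on a bound when it is actually present.
public final class NullSafeBound {

    public static String toStringOrNull(Object bound) {
        return bound == null ? null : bound.toString();
    }

    public static void main(String[] args) {
        System.out.println(toStringOrNull(null)); // null, no NullPointerException
        System.out.println(toStringOrNull("2"));  // "2"
    }
}
{code}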

> The SelectivityEstimator throw an NullPointerException when 
> convertValueInterval with string type
> -
>
> Key: FLINK-19101
> URL: https://issues.apache.org/jira/browse/FLINK-19101
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.2, 1.11.1
>Reporter: fa zheng
>Priority: Critical
> Fix For: 1.12.0
>
>
> For a SQL : 
> {code:sql}
> select t1.a, t2.b from t1  join t2  on t1.b=t2.b and t1.b<'2'
> {code}
> It will throw a java.lang.NullPointerException because SelectivityEstimator 
> converts a character value interval to a string without considering the null 
> case.
> {code:scala}
>   def convertValueInterval(
>   interval: ValueInterval,
>   typeFamily: RelDataTypeFamily): ValueInterval = {
> require(interval != null && typeFamily != null)
> interval match {
>   case ValueInterval.empty | ValueInterval.infinite => interval
>   case _ =>
> val (lower, includeLower) = interval match {
>   case li: WithLower => (li.lower, li.includeLower)
>   case _ => (null, false)
> }
> val (upper, includeUpper) = interval match {
>   case ui: WithUpper => (ui.upper, ui.includeUpper)
>   case _ => (null, false)
> }
> typeFamily match {
>   case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | 
> SqlTypeFamily.DATE |
>SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
> ValueInterval(
>   comparableToDouble(lower),
>   comparableToDouble(upper),
>   includeLower,
>   includeUpper)
>   case SqlTypeFamily.CHARACTER =>
> ValueInterval(
>   lower.toString, // throws NullPointerException when lower is null
>   upper.toString, // throws NullPointerException when upper is null
>   includeLower,
>   includeUpper)
>   case _ => throw new UnsupportedOperationException(s"Unsupported 
> typeFamily: $typeFamily")
> }
> }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19098) Make Rowdata converters public

2020-08-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187585#comment-17187585
 ] 

Danny Chen commented on FLINK-19098:


Abstracting some common parts of the code for reuse is usually a good idea; just 
remember to mark them as {{@Internal}} because they are only used as tooling for 
connectors.
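
For illustration, a sketch of what an extracted, {{@Internal}}-annotated converter utility could look like (the class and interface names here are hypothetical, not an existing Flink API):

{code:java}
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;

/**
 * Hypothetical home for reusable converters between external format objects
 * (e.g. a JsonNode or GenericRecord) and Flink's internal RowData.
 */
@Internal
public final class RowDataConverterUtil {

    /** Converts an external format object into RowData. */
    public interface ToRowDataConverter<T> {
        RowData convert(T external);
    }

    private RowDataConverterUtil() {}
}
{code}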

> Make Rowdata converters public
> --
>
> Key: FLINK-19098
> URL: https://issues.apache.org/jira/browse/FLINK-19098
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> Some of the RowData converters (ser/de between RowData and format objects like 
> GenericRecord/JsonNode) are private or package-private (like the JSON ones), which makes 
> it hard for third-party connector projects to use them to implement their 
> own format factories in the Table API.
> For example, the Pravega connector is now developing a schema-registry-based 
> format factory. The [Pravega schema 
> registry|https://github.com/pravega/schema-registry] is a REST service 
> similar to the Confluent registry, but it can help to serialize/deserialize 
> json/avro/protobuf/custom format data. It would help a lot if these converters 
> were public.
> As noted in FLINK-16048, we have already moved the Avro converters out and 
> made them public. Similarly, it should be safe to make at least the JSON and CSV 
> format converters public.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19048) support line format for table connector

2020-08-26 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185080#comment-17185080
 ] 

Danny Chen commented on FLINK-19048:


It seems to be a duplicate of FLINK-14356.

> support line format for table connector
> ---
>
> Key: FLINK-19048
> URL: https://issues.apache.org/jira/browse/FLINK-19048
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Reporter: badqiu
>Priority: Major
> Attachments: LineFormatFactory.java, 
> LineRowDataDeserializationSchema.java, LineRowDeserializationSchema.java, 
> LineRowFormatFactory.java, LineRowSchemaConverter.java, 
> LineRowSerializationSchema.java
>
>
> A native string data format; no data conversion is done.
> This format is particularly friendly to data without time attributes. With a 
> UDF, writing your own data analysis becomes much more convenient.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result

2020-08-18 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180207#comment-17180207
 ] 

Danny Chen commented on FLINK-18988:


Hi [~fhueske], I would like to take this issue. I guess you used the Blink 
planner, right?

> Continuous query with LATERAL and LIMIT produces wrong result
> -
>
> Key: FLINK-18988
> URL: https://issues.apache.org/jira/browse/FLINK-18988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.1
>Reporter: Fabian Hueske
>Priority: Critical
>
> I was trying out the example queries provided in this blog post: 
> [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
> Flink supports the same and found that the queries were translated and 
> executed but produced the wrong result.
> I used the SQL Client and Kafka (running at kafka:9092) to store the table 
> data. I executed the following statements:
> {code:java}
> -- create cities table
> CREATE TABLE cities (
>   name STRING NOT NULL,
>   state STRING NOT NULL,
>   pop INT NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'cities',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'mygroup', 
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> -- fill cities table
> INSERT INTO cities VALUES
>   ('Los_Angeles', 'CA', 3979576),
>   ('Phoenix', 'AZ', 1680992),
>   ('Houston', 'TX', 2320268),
>   ('San_Diego', 'CA', 1423851),
>   ('San_Francisco', 'CA', 881549),
>   ('New_York', 'NY', 8336817),
>   ('Dallas', 'TX', 1343573),
>   ('San_Antonio', 'TX', 1547253),
>   ('San_Jose', 'CA', 1021795),
>   ('Chicago', 'IL', 2695598),
>   ('Austin', 'TX', 978908);
> -- execute query
> SELECT state, name 
> FROM
>   (SELECT DISTINCT state FROM cities) states,
>   LATERAL (
> SELECT name, pop
> FROM cities
> WHERE state = states.state
> ORDER BY pop
> DESC LIMIT 3
>   );
> -- result
> state  name
>CA   Los_Angeles
>NY  New_York
>IL   Chicago
> -- expected result
> state | name
> --+-
> TX    | Dallas
> AZ    | Phoenix
> IL    | Chicago
> TX    | Houston
> CA    | San_Jose
> NY    | New_York
> CA    | San_Diego
> CA    | Los_Angeles
> TX    | San_Antonio
> {code}
> As you can see from the query result, Flink computes the top3 cities over all 
> states, not for every state individually. Hence, I assume that this is a bug 
> in the query optimizer or one of the rewriting rules.
> There are two valid ways to solve this issue:
>  * Fixing the rewriting rules / optimizer (obviously preferred)
>  * Disabling this feature and throwing an exception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18840) Support StatementSet with DataStream API

2020-08-10 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174121#comment-17174121
 ] 

Danny Chen commented on FLINK-18840:


I'm a little confused by this request: in which case would a user convert a 
whole statement set into a {{DataStream}}? Are there cases that SQL cannot 
cover but only {{DataStream}} could?

As for the statement set itself, it is mainly designed for SQL. We could add some 
APIs to convert a {{DataStream}} into a table so that the following statements 
can reference that table (see the sketch below); that would be a kind of 
unification of the APIs.
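
A minimal sketch of that direction with the existing APIs (the {{sink}} table is assumed to be defined elsewhere, e.g. via a CREATE TABLE DDL):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StatementSetWithDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a DataStream as a view so that SQL statements in the set can reference it.
        DataStream<String> words = env.fromElements("a", "b", "c");
        tEnv.createTemporaryView("words", words);

        // 'sink' is assumed to exist already (defined through a CREATE TABLE statement).
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink SELECT * FROM words");
        set.execute();
    }
}
{code}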

> Support StatementSet with DataStream API
> 
>
> Key: FLINK-18840
> URL: https://issues.apache.org/jira/browse/FLINK-18840
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> Currently, users of the {{StreamTableEnvironment}} cannot translate a 
> {{StatementSet}} to the DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call

2020-07-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168834#comment-17168834
 ] 

Danny Chen commented on FLINK-18750:


The root cause is that the `LATERAL` keyword of the view's expanded query was 
lost. CALCITE-4077 (released in 1.24.0) actually fixed this problem, but we can 
fix it in Flink first.

> SqlValidatorException thrown when select from a view which contains a UDTF 
> call
> ---
>
> Key: FLINK-18750
> URL: https://issues.apache.org/jira/browse/FLINK-18750
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: Wei Zhong
>Priority: Major
>
> When executing such code:
>  
> {code:java}
> package com.example;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TableFunction;
> public class TestUTDF {
>public static class UDTF extends TableFunction {
>   public void eval(String input) {
>  collect(input);
>   }
>}
>public static void main(String[] args) {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>  env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
>   tEnv.createTemporarySystemFunction("udtf", new UDTF());
>   tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", 
> "c").as("f0"));
>   String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL 
> TABLE(udtf(f0)) as T(f1)";
>   System.out.println(tEnv.explainSql(udtfCall));
>   String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall;
>   tEnv.executeSql(createViewCall);
>   System.out.println(tEnv.from("tmp_view").explain());
>}
> }
> {code}
> Such a SqlValidatorException would be thrown:
>  
>  
> {code:java}
> == Abstract Syntax Tree  Abstract Syntax Tree ==LogicalProject(f0=[$0], 
> f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{0}])   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])   :  
> +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' 
> }]])   +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], 
> rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
> == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER])+- Calc(select=[f0])   +- Values(type=[RecordType(CHAR(1) 
> f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
> == Physical Execution Plan ==Stage 1 : Data Source content : Source: 
> Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
>  Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD
>  Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER]) ship_strategy : FORWARDException in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>  at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) 
> at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>  at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533)
>  at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at 
> org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:349)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
>  at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>  at 
> 

[jira] [Issue Comment Deleted] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call

2020-07-31 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-18750:
---
Comment: was deleted

(was: The problem is that the example function `UDTF` does not specify its return 
type, either through {{#getResultType}} or through a {{DataTypeHint}}. Thus, during 
validation, the code goes through {{SqlFunction#deriveType}}, and when validating the 
return type of the collection table, the referenced column "f0" gets a wrong scope 
(CatalogScope instead of JoinScope) and cannot be resolved correctly.

The scope has been fixed in CALCITE-4077, which is available since Calcite release 
1.24.0.)

> SqlValidatorException thrown when select from a view which contains a UDTF 
> call
> ---
>
> Key: FLINK-18750
> URL: https://issues.apache.org/jira/browse/FLINK-18750
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: Wei Zhong
>Priority: Major
>
> When executing such code:
>  
> {code:java}
> package com.example;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TableFunction;
> public class TestUTDF {
>public static class UDTF extends TableFunction {
>   public void eval(String input) {
>  collect(input);
>   }
>}
>public static void main(String[] args) {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>  env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
>   tEnv.createTemporarySystemFunction("udtf", new UDTF());
>   tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", 
> "c").as("f0"));
>   String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL 
> TABLE(udtf(f0)) as T(f1)";
>   System.out.println(tEnv.explainSql(udtfCall));
>   String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall;
>   tEnv.executeSql(createViewCall);
>   System.out.println(tEnv.from("tmp_view").explain());
>}
> }
> {code}
> Such a SqlValidatorException would be thrown:
>  
>  
> {code:java}
> == Abstract Syntax Tree  Abstract Syntax Tree ==LogicalProject(f0=[$0], 
> f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{0}])   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])   :  
> +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' 
> }]])   +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], 
> rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
> == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER])+- Calc(select=[f0])   +- Values(type=[RecordType(CHAR(1) 
> f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
> == Physical Execution Plan ==Stage 1 : Data Source content : Source: 
> Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
>  Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD
>  Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER]) ship_strategy : FORWARDException in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>  at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) 
> at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>  at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533)
>  at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at 
> org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at 
> 

[jira] [Commented] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call

2020-07-31 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168682#comment-17168682
 ] 

Danny Chen commented on FLINK-18750:


The problem is that the example function `UDTF` does not specify its return type, 
either through {{#getResultType}} or through a {{DataTypeHint}}. Thus, during 
validation, the code goes through {{SqlFunction#deriveType}}, and when validating the 
return type of the collection table, the referenced column "f0" gets a wrong scope 
(CatalogScope instead of JoinScope) and cannot be resolved correctly.

The scope has been fixed in CALCITE-4077, which is available since Calcite release 
1.24.0.
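
For reference, a sketch of how the return type could have been declared explicitly on the UDTF from the report (using the {{@FunctionHint}}/{{@DataTypeHint}} annotations; this works around the symptom, it is not the Calcite scope fix itself):

{code:java}
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Variant of the UDTF from the report that declares its result type explicitly,
// so validation does not need to fall back to SqlFunction#deriveType.
@FunctionHint(output = @DataTypeHint("ROW<f1 STRING>"))
public class UDTF extends TableFunction<Row> {
    public void eval(String input) {
        collect(Row.of(input));
    }
}
{code}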

> SqlValidatorException thrown when select from a view which contains a UDTF 
> call
> ---
>
> Key: FLINK-18750
> URL: https://issues.apache.org/jira/browse/FLINK-18750
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: Wei Zhong
>Priority: Major
>
> When executing such code:
>  
> {code:java}
> package com.example;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TableFunction;
> public class TestUTDF {
>public static class UDTF extends TableFunction {
>   public void eval(String input) {
>  collect(input);
>   }
>}
>public static void main(String[] args) {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>  env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
>   tEnv.createTemporarySystemFunction("udtf", new UDTF());
>   tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", 
> "c").as("f0"));
>   String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL 
> TABLE(udtf(f0)) as T(f1)";
>   System.out.println(tEnv.explainSql(udtfCall));
>   String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall;
>   tEnv.executeSql(createViewCall);
>   System.out.println(tEnv.from("tmp_view").explain());
>}
> }
> {code}
> Such a SqlValidatorException would be thrown:
>  
>  
> {code:java}
> == Abstract Syntax Tree  Abstract Syntax Tree ==LogicalProject(f0=[$0], 
> f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{0}])   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])   :  
> +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' 
> }]])   +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], 
> rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
> == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER])+- Calc(select=[f0])   +- Values(type=[RecordType(CHAR(1) 
> f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
> == Physical Execution Plan ==Stage 1 : Data Source content : Source: 
> Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
>  Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD
>  Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER]) ship_strategy : FORWARDException in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>  at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) 
> at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>  at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533)
>  at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at 
> org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at 
> 

[jira] [Created] (FLINK-18777) Supports schema registry catalog

2020-07-30 Thread Danny Chen (Jira)
Danny Chen created FLINK-18777:
--

 Summary: Supports schema registry catalog
 Key: FLINK-18777
 URL: https://issues.apache.org/jira/browse/FLINK-18777
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.0
Reporter: Danny Chen
 Fix For: 1.12.0


Design doc: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-125%3A+Confluent+Schema+Registry+Catalog



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-23 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164105#comment-17164105
 ] 

Danny Chen commented on FLINK-16048:


Personally I do not have a strong preference either; it seems most of us support 
avro-confluent, so let's use it.

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-21 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162464#comment-17162464
 ] 

Danny Chen commented on FLINK-16048:


The schema string can be inferred from the DDL table schema, just like our 
existing Avro format does. One thing users need to be careful about is that for a 
sink table, the fields' nullability must strictly match the Avro schema, but Flink 
and Avro have different default nullability strategies (Flink defaults to nullable, 
Avro to not null); see the sketch below.

The Cloudera schema registry has a different code path for schema registry, so I 
think there is no possibility it shares the same format id with the Confluent 
schema registry. But I'm still voting for avro-sr if supporting the Cloudera Avro 
schema registry is not on our roadmap; "confluent" is too verbose, and Seth Wiesman 
has the same feeling.
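
For illustration, a sketch of the nullability mismatch using the plain Avro API (the schemas here are illustrative, not produced by Flink's converter):

{code:java}
import java.util.Arrays;
import org.apache.avro.Schema;

public final class NullabilityExample {
    public static void main(String[] args) {
        // Avro type matching a Flink `STRING NOT NULL` column: a plain string.
        Schema notNullString = Schema.create(Schema.Type.STRING);

        // Avro type matching a Flink `STRING` column (nullable by default): a union with null.
        Schema nullableString = Schema.createUnion(
                Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));

        System.out.println(notNullString);  // "string"
        System.out.println(nullableString); // ["null","string"]
    }
}
{code}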

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-21 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16048:
---
Description: 
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

_The factory identifier (or format id)_

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

_The format attributes_

|| Options || required || Remark ||
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |


  was:
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

_The factory identifier (or format id)_

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

_The format attributes_

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |



> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This 

[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-20 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999
 ] 

Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:35 AM:
--

[~ykt836] Yes, we can infer the read schema from the DDL schema, so the schema 
string is not necessary.

[~anshul.bansal] We are planning to implement a schema registry catalog, with 
which you would not need to create a table explicitly.

[~dwysakowicz], this may be a problem, but do we plan to support the Cloudera 
schema registry?


was (Author: danny0405):
[~ykt836] Yes, we can infer the read schema from the DDL schema, the schema 
string is not necessary.

[~anshul.bansal] We are planning to implement a schema registry catalog with 
what you do not need to create a table explicitly.

[~dwysakowicz], this may be a problem, but do we plan to support cloudra schema 
registry ?

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-20 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999
 ] 

Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:34 AM:
--

[~ykt836] Yes, we can infer the read schema from the DDL schema, so the schema 
string is not necessary.

[~anshul.bansal] We are planning to implement a schema registry catalog, with 
which you would not need to create a table explicitly.

[~dwysakowicz], this may be a problem, but do we plan to support the Cloudera 
schema registry?


was (Author: danny0405):
[~ykt836] This PR contributes 2 formats of schema registry avro, one way to 
remove the schema string is to pass in a schema id or the subject name, do the 
inference when the format was instantiated.

We can fetch the avro schema string of the compile time but that was in the 
scope of schema inference, i think it is not good practice to let the format do 
a schema inference, the format se/de schema should be solid. The inference work 
should be done by the framework, either the DDL or the Catalog.

[~dwysakowicz] Personally i agree the schema-string  should not be user 
provided option, but we actually can not recover a schema string from a 
TableSchema, any good ideas ?

One way i can think of is let user specify the subject name PLUS the schema 
version, what do you think ?

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-20 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999
 ] 

Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:13 AM:
--

[~ykt836] This PR contributes two schema-registry Avro formats. One way to 
remove the schema string is to pass in a schema id or the subject name and do the 
inference when the format is instantiated.

We could fetch the Avro schema string at compile time, but that falls into the 
scope of schema inference, and I think it is not good practice to let the format do 
schema inference; the format's se/de schema should be solid. The inference work 
should be done by the framework, either the DDL or the Catalog.

[~dwysakowicz] Personally I agree that the schema string should not be a 
user-provided option, but we actually cannot recover a schema string from a 
TableSchema. Any good ideas?

One way I can think of is to let the user specify the subject name plus the schema 
version; what do you think?
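
For illustration, a hypothetical DDL for the "subject plus schema version" idea could 
look like the sketch below. The {{schema-registry.version}} key and the remaining 
options are purely illustrative assumptions, not an agreed or existing API.

{code:sql}
-- Hypothetical sketch: 'schema-registry.version' is an illustrative key for the
-- "subject plus version" idea above; none of these options is final.
CREATE TABLE kafka_avro_source (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',
  'format' = 'avro-sr',
  'schema-registry.url' = 'http://localhost:8081',
  'schema-registry.subject' = 'users-value',
  'schema-registry.version' = '3'  -- pins the writer schema version in the registry
);
{code}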


was (Author: danny0405):
[~ykt836] This PR contributes 2 formats of schema registry avro, one way to 
remove the schema string is to pass in a schema id or the subject name, do the 
inference when the format was instantiated.

We can fetch the avro schema string of the compile time but that was in the 
scope of schema inference, i think it is not good practice to let the format do 
a schema inference, the format se/de schema should be solid. The inference work 
should be done by the framework, either the DDL or the Catalog.

[~dwysakowicz] Personally i agree the schema-string  should not be user 
provided option, but we actually can not recover a schema string from a 
TableSchema, any good ideas ?



> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-20 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999
 ] 

Danny Chen commented on FLINK-16048:


[~ykt836] This PR contributes two schema-registry Avro formats. One way to 
remove the schema string is to pass in a schema id or the subject name and do the 
inference when the format is instantiated.

We could fetch the Avro schema string at compile time, but that falls into the 
scope of schema inference, and I think it is not good practice to let the format do 
schema inference; the format's se/de schema should be solid. The inference work 
should be done by the framework, either the DDL or the Catalog.

[~dwysakowicz] Personally I agree that the schema string should not be a 
user-provided option, but we actually cannot recover a schema string from a 
TableSchema. Any good ideas?



> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-17 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16048:
---
Description: 
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

_The factory identifier (or format id)_

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

_The format attributes_

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |


  was:
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

_The factory identifier_

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

_The format attributes_

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |



> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | 

[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-17 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159702#comment-17159702
 ] 

Danny Chen commented on FLINK-16048:


I'm hoping to reach a consensus about the format id and its attributes before 
opening the PR. Thanks in advance for your suggestions!

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-16 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16048:
---
Description: 
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

The factory identifier

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

The format attributes

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |


  was:
I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]


> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> The factory identifier
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> The format attributes
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-16 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16048:
---
Description: 
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

_The factory identifier_

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

_The format attributes_

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |


  was:
*The background*

I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

*The format details*

The factory identifier

There are 2 candidates now ~
- {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
- {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2]

Personally i would prefer {{avro-sr}} because it is more concise and the 
confluent is a company name which i think is not that suitable for a format 
name.

The format attributes

|| Options || required || Remark ||
| schema-string | true | avro schema string used for (de)serialization |
| schema-registry.url | true | URL to connect to schema registry service |
| schema-registry.subject | false | Subject name to write to the Schema 
Registry service, required for sink |



> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Danny Chen
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name 

[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-07-16 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159687#comment-17159687
 ] 

Danny Chen commented on FLINK-16048:


[~anshul.bansal] [~lingyaKK] I'm working on this now, as part of FLIP-125 [1]. We 
might need a discussion about the format attributes in this issue, but 
I believe it will be finished very soon.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-125%3A+Confluent+Schema+Registry+Catalog

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18440) ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

2020-07-10 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155338#comment-17155338
 ] 

Danny Chen commented on FLINK-18440:


I have opened a fix in https://github.com/apache/flink/pull/12868.

> ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or 
> ROW_NUMBER functions
> 
>
> Key: FLINK-18440
> URL: https://issues.apache.org/jira/browse/FLINK-18440
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.0
>Reporter: LakeShen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.1
>
> Attachments: image-2020-06-28-18-55-43-692.png
>
>
> When I run flink sql ,the flink sql like this:
> create view test as select  name, eat ,sum(age) as cnt from test_source group 
> by  name,eat;
> create view results as select *, ROW_NUMBER() OVER (PARTITION BY name 
> ORDER BY cnt DESC) as row_num from test;
> create table sink (
> name varchar,
> eat varchar,
> cnt bigint
> )
> with(
> 'connector' = 'print'
> );
> insert into sink select name,eat , cnt from results where  row_num <= 3 ;
> The same sql code I could run success in flink 1.10, now I change the flink 
> version into flink 1.11, it throw the exception.
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 124 to line 1, column 127: 
> ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>   at 
> org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
>   at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>   at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>   at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18440) ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

2020-07-10 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155333#comment-17155333
 ] 

Danny Chen commented on FLINK-18440:


After some code research, I found that the SqlWindow bounds state is mutated 
during sql-to-rel conversion in Calcite release 1.22; this has been fixed in 
https://issues.apache.org/jira/browse/CALCITE-3877.

See this logic in SqlValidatorImpl#resolveWindow of Calcite 1.22:

{code:java}
if (populateBounds) {
  window.populateBounds();
}
{code}
The flag passed from SqlToRelConverter is always true.

I can see two ways to fix this problem:
1. Resolve it when we upgrade to Calcite 1.23 or 1.24.
2. In SqlToOperationConverter#convertCreateView, keep the validated SQL string 
before it is converted.

I will open a fix for option 2 soon.

> ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or 
> ROW_NUMBER functions
> 
>
> Key: FLINK-18440
> URL: https://issues.apache.org/jira/browse/FLINK-18440
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.0
>Reporter: LakeShen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.1
>
> Attachments: image-2020-06-28-18-55-43-692.png
>
>
> When I run flink sql ,the flink sql like this:
> create view test as select  name, eat ,sum(age) as cnt from test_source group 
> by  name,eat;
> create view results as select *, ROW_NUMBER() OVER (PARTITION BY name 
> ORDER BY cnt DESC) as row_num from test;
> create table sink (
> name varchar,
> eat varchar,
> cnt bigint
> )
> with(
> 'connector' = 'print'
> );
> insert into sink select name,eat , cnt from results where  row_num <= 3 ;
> The same sql code I could run success in flink 1.10, now I change the flink 
> version into flink 1.11, it throw the exception.
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 124 to line 1, column 127: 
> ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>   at 
> org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
>   at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>   at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>   at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema

2020-07-06 Thread Danny Chen (Jira)
Danny Chen created FLINK-18500:
--

 Summary: Make the legacy planner exception more clear when 
resolving computed columns types for schema
 Key: FLINK-18500
 URL: https://issues.apache.org/jira/browse/FLINK-18500
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Danny Chen


From the user mail:
Hi all,
I use Zeppelin to execute SQL. The Flink version is a 1.11 snapshot, built from 
branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973.
When the table type is datagen, Flink throws an exception, but the exception 
message is null.

My DDL is:

{code:sql}
CREATE TABLE datagen_dijie2 (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.f_sequence.kind'='sequence',
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',
 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',
 'fields.f_random_str.length'='10'
);

{code}

My query SQL is:
{code:sql}
select * from datagen_dijie2;
{code}
The exception is:


{noformat}
org.apache.flink.table.api.ValidationException: SQL validation failed. null at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
 at 
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
 at 
org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
 at 
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
 at 
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
 at 
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
 at 
org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
 at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
 at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
 at 
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.UnsupportedOperationException at 
org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
 at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
 at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
 at 
org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
 at
{noformat}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18487) New table source factory omits unrecognized properties silently

2020-07-05 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17151719#comment-17151719
 ] 

Danny Chen commented on FLINK-18487:


+1 to have the validation logic in the factory. The FactoryUtil can support 
validation patterns that are common enough.

> New table source factory omits unrecognized properties silently
> ---
>
> Key: FLINK-18487
> URL: https://issues.apache.org/jira/browse/FLINK-18487
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Benchao Li
>Priority: Major
>
> For the following DDL, we just omits the unrecognized property 
> 'records-per-second'.
> {code:sql}
> CREATE TABLE MyDataGen (
>   int_field int,
>   double_field double,
>   string_field varchar
> ) WITH (
>   'connector' = 'datagen',
>   'records-per-second' = '1'  -- should be rows-per-second
> )
> {code}
> IMO, we should throw Exceptions to tell users that they used a wrong 
> property. 
>  CC [~jark] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18315) Insert into partitioned table can fail with values

2020-06-15 Thread Danny Chen (Jira)
Danny Chen created FLINK-18315:
--

 Summary: Insert into partitioned table can fail with values
 Key: FLINK-18315
 URL: https://issues.apache.org/jira/browse/FLINK-18315
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Danny Chen
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18009) Select an invalid column crashes the SQL CLI

2020-06-05 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126478#comment-17126478
 ] 

Danny Chen commented on FLINK-18009:


[~godfreyhe] Please check if this issue is fixed after FLINK-17893 is fixed.

> Select an invalid column crashes the SQL CLI
> 
>
> Key: FLINK-18009
> URL: https://issues.apache.org/jira/browse/FLINK-18009
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Rui Li
>Assignee: Nicholas Jiang
>Priority: Blocker
> Fix For: 1.11.0
>
>
> To reproduce: just select a non-existing column from table in SQL CLI



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16497) Improve default flush strategy for JDBC sink to make it work out-of-box

2020-06-04 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126373#comment-17126373
 ] 

Danny Chen commented on FLINK-16497:


I think that, for a popular streaming engine, ensuring good throughput and performance 
should be a first-class concern. Most client tools have a default flush 
strategy (either a buffer size or an interval) [1][2]. We should follow that as well.

I would suggest a default flush size of 100 rows and a flush interval of 1s; this performs 
well in production, and in local tests 1s is also an acceptable latency.

[1] https://kafka.apache.org/22/documentation.html#producerconfigs
[2] https://github.com/searchbox-io/Jest
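
For illustration, the suggested defaults, spelled out explicitly with the existing JDBC 
sink options quoted in this issue, would look like the sketch below. The two flush keys 
are the ones discussed here; the remaining connector properties are illustrative 
assumptions and may differ per setup.

{code:sql}
-- Sketch only: the two flush options are those discussed in this issue, set to
-- the suggested defaults; the other connector properties are illustrative.
CREATE TABLE jdbc_sink (
  id BIGINT,
  name VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/mydb',
  'connector.table' = 'sink_table',
  'connector.write.flush.max-rows' = '100',  -- suggested default buffer size
  'connector.write.flush.interval' = '1s'    -- suggested default flush interval
);
{code}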

> Improve default flush strategy for JDBC sink to make it work out-of-box
> ---
>
> Key: FLINK-16497
> URL: https://issues.apache.org/jira/browse/FLINK-16497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Reporter: Jark Wu
>Priority: Critical
> Fix For: 1.11.0
>
>
> Currently, JDBC sink provides 2 flush options:
> {code}
> 'connector.write.flush.max-rows' = '5000', -- default is 5000
> 'connector.write.flush.interval' = '2s', -- no default value
> {code}
> That means if flush interval is not set, the buffered output rows may not be 
> flushed to database for a long time. That is a surprising behavior because no 
> results are outputed by default. 
> So I propose to have a default flush '1s' interval for JDBC sink or default 1 
> row for flush size. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15126) migrate "show functions" from sql cli to sql parser

2020-06-04 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen closed FLINK-15126.
--
Resolution: Fixed

> migrate "show functions" from sql cli to sql parser
> ---
>
> Key: FLINK-15126
> URL: https://issues.apache.org/jira/browse/FLINK-15126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client, Table SQL / Planner
>Reporter: Bowen Li
>Assignee: Zhenqiu Huang
>Priority: Major
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13437) Add Hive SQL E2E test

2020-06-04 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-13437:
---
Fix Version/s: (was: 1.10.2)
   (was: 1.11.0)
   1.12.0

> Add Hive SQL E2E test
> -
>
> Key: FLINK-13437
> URL: https://issues.apache.org/jira/browse/FLINK-13437
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Terry Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should add an E2E test for the Hive integration: List all tables and read 
> some metadata, read from an existing table, register a new table in Hive, use 
> a registered function, write to an existing table, write to a new table.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17686) Add document to dataGen, print, blackhole connectors

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17686:
---
Priority: Critical  (was: Major)

> Add document to dataGen, print, blackhole connectors
> 
>
> Key: FLINK-17686
> URL: https://issues.apache.org/jira/browse/FLINK-17686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.11.0
>Reporter: Jingsong Lee
>Assignee: Shengkai Fang
>Priority: Critical
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17831) Add documentation for the new Kafka connector

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17831:
---
Priority: Critical  (was: Major)

> Add documentation for the new Kafka connector
> -
>
> Key: FLINK-17831
> URL: https://issues.apache.org/jira/browse/FLINK-17831
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Jark Wu
>Assignee: Danny Chen
>Priority: Critical
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17776) Add documentation for DDL in hive dialect

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17776:
---
Priority: Critical  (was: Major)

> Add documentation for DDL in hive dialect
> -
>
> Key: FLINK-17776
> URL: https://issues.apache.org/jira/browse/FLINK-17776
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Jingsong Lee
>Assignee: Rui Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16975) Add docs for FileSystem connector

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-16975:
---
Priority: Critical  (was: Major)

> Add docs for FileSystem connector
> -
>
> Key: FLINK-16975
> URL: https://issues.apache.org/jira/browse/FLINK-16975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Assignee: Jingsong Lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17599) Update documents due to FLIP-84

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17599:
---
Priority: Critical  (was: Major)

> Update documents due to FLIP-84
> ---
>
> Key: FLINK-17599
> URL: https://issues.apache.org/jira/browse/FLINK-17599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Kurt Young
>Assignee: godfrey he
>Priority: Critical
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17832) Add documentation for the new Elasticsearch connector

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17832:
---
Priority: Critical  (was: Major)

> Add documentation for the new Elasticsearch connector
> -
>
> Key: FLINK-17832
> URL: https://issues.apache.org/jira/browse/FLINK-17832
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Jark Wu
>Assignee: Shengkai Fang
>Priority: Critical
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18065) Add documentation for new scalar/table functions

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-18065:
---
Priority: Critical  (was: Major)

> Add documentation for new scalar/table functions
> 
>
> Key: FLINK-18065
> URL: https://issues.apache.org/jira/browse/FLINK-18065
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Write documentation for scalar/table functions of FLIP-65.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17829) Add documentation for the new JDBC connector

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17829:
---
Priority: Critical  (was: Major)

> Add documentation for the new JDBC connector
> 
>
> Key: FLINK-17829
> URL: https://issues.apache.org/jira/browse/FLINK-17829
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Critical
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18066) Add documentation for how to develop a new table source/sink

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-18066:
---
Priority: Critical  (was: Major)

> Add documentation for how to develop a new table source/sink
> 
>
> Key: FLINK-18066
> URL: https://issues.apache.org/jira/browse/FLINK-18066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Covers how to write a custom source/sink and format using FLIP-95 interfaces.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17406) Add documentation about dynamic table options

2020-06-02 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-17406:
---
Priority: Critical  (was: Major)

> Add documentation about dynamic table options
> -
>
> Key: FLINK-17406
> URL: https://issues.apache.org/jira/browse/FLINK-17406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Kurt Young
>Assignee: Danny Chen
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

