[jira] [Updated] (FLINK-29090) Fix the code gen for ColumnarMapData and ColumnarArrayData
[ https://issues.apache.org/jira/browse/FLINK-29090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-29090: --- Description: !image-2022-08-24-10-15-11-824.png|width=589,height=284! Currently, the code generation for {{MapData}} assumes that it is the {{{}GenericMapData{}}}, but the newly introduced {{ColumnarMapData}} and {{ColumnarArrayData}} cannot be cast to {{{}GenericMapData{}}}. {{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in FLINK-24614 [https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3] How to reproduce: {code:sql} create table parquet_source ( f_map map ) with ( 'connector' = 'filesystem', 'format' = 'parquet' ); select f_map['k1'] from table parquet_source; {code} was: !image-2022-08-24-10-15-11-824.png|width=589,height=284! Currently, the code generation for {{MapData}} assumes that it is the {{{}GenericMapData{}}}, but the new introduced {{ColumnarMapData}} and {{ColumnarArrayData}} can not be casted to {{{}GenericMapData{}}}. {{ColumnarMapData}} and {{ColumnarArrayData}} are introduced in FLINK-24614 [https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3] introduces, How to reproduce: {code:sql} create table parquet_source ( f_map map ) with ( 'connector' = 'filesystem', 'format' = 'parquet' ); select f_map['k1'] from table parquet_source; {code} > Fix the code gen for ColumnarMapData and ColumnarArrayData > -- > > Key: FLINK-29090 > URL: https://issues.apache.org/jira/browse/FLINK-29090 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.0 >Reporter: Danny Chen >Priority: Major > Fix For: 1.16.0 > > Attachments: image-2022-08-24-10-15-11-824.png > > > !image-2022-08-24-10-15-11-824.png|width=589,height=284! 
> Currently, the code generation for {{MapData}} assumes that it is the > {{{}GenericMapData{}}}, but the newly introduced {{ColumnarMapData}} and > {{ColumnarArrayData}} cannot be cast to {{{}GenericMapData{}}}. > {{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in > FLINK-24614 > [https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3] > How to reproduce: > {code:sql} > create table parquet_source ( > f_map map > ) with ( > 'connector' = 'filesystem', > 'format' = 'parquet' > ); > select f_map['k1'] from table parquet_source; > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
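The failure mode described in this issue can be illustrated with a minimal, self-contained Java sketch. The interfaces and classes below are simplified stand-ins invented for this illustration, not Flink's actual {{MapData}} hierarchy: generated code that hard-casts to one concrete implementation throws a ClassCastException when a columnar implementation arrives, while code that goes through the shared interface works for both.

```java
public class MapDataCastDemo {
    // Simplified stand-in for a MapData-style interface.
    interface MapData {
        int size();
        Object get(Object key);
    }

    // Stand-in for a generic, object-backed implementation.
    static class GenericMapData implements MapData {
        private final java.util.Map<Object, Object> map;
        GenericMapData(java.util.Map<Object, Object> map) { this.map = map; }
        public int size() { return map.size(); }
        public Object get(Object key) { return map.get(key); }
    }

    // Stand-in for a columnar implementation backed by parallel arrays.
    static class ColumnarMapData implements MapData {
        private final Object[] keys;
        private final Object[] values;
        ColumnarMapData(Object[] keys, Object[] values) {
            this.keys = keys;
            this.values = values;
        }
        public int size() { return keys.length; }
        public Object get(Object key) {
            for (int i = 0; i < keys.length; i++) {
                if (keys[i].equals(key)) {
                    return values[i];
                }
            }
            return null;
        }
    }

    // Buggy pattern: generated code assumes one concrete class.
    static Object lookupWithCast(MapData data, Object key) {
        return ((GenericMapData) data).get(key); // ClassCastException for columnar data
    }

    // Fixed pattern: generated code uses only the interface.
    static Object lookupViaInterface(MapData data, Object key) {
        return data.get(key);
    }

    public static void main(String[] args) {
        MapData columnar = new ColumnarMapData(new Object[] {"k1"}, new Object[] {"v1"});
        System.out.println(lookupViaInterface(columnar, "k1")); // v1
        try {
            lookupWithCast(columnar, "k1");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

The sketch only shows why interface-based access is the safe shape for generated code; the actual fix in Flink's code generator may differ in detail.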
[jira] [Created] (FLINK-29090) Fix the code gen for ColumnarMapData and ColumnarArrayData
Danny Chen created FLINK-29090: -- Summary: Fix the code gen for ColumnarMapData and ColumnarArrayData Key: FLINK-29090 URL: https://issues.apache.org/jira/browse/FLINK-29090 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.16.0 Reporter: Danny Chen Fix For: 1.16.0 Attachments: image-2022-08-24-10-15-11-824.png !image-2022-08-24-10-15-11-824.png|width=589,height=284! Currently, the code generation for {{MapData}} assumes that it is the {{{}GenericMapData{}}}, but the newly introduced {{ColumnarMapData}} and {{ColumnarArrayData}} cannot be cast to {{{}GenericMapData{}}}. {{ColumnarMapData}} and {{ColumnarArrayData}} were introduced in FLINK-24614 [https://github.com/apache/flink/commit/5c731a37e1a8f71f9c9e813f6c741a1e203fa1a3] How to reproduce: {code:sql} create table parquet_source ( f_map map ) with ( 'connector' = 'filesystem', 'format' = 'parquet' ); select f_map['k1'] from table parquet_source; {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-22370) ParquetColumnarRowSplitReader#reachedEnd() returns false after it returns true
Danny Chen created FLINK-22370: -- Summary: ParquetColumnarRowSplitReader#reachedEnd() returns false after it returns true Key: FLINK-22370 URL: https://issues.apache.org/jira/browse/FLINK-22370 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.12.2, 1.13.1 Reporter: Danny Chen Fix For: 1.13.1 {{ParquetColumnarRowSplitReader#reachedEnd()}} should always return true after it has returned true for the first time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
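The contract described above, that an end-of-input method must stay true once it has flipped, can be sketched with a small, self-contained Java example (the class below is a hypothetical reader invented for this sketch, not Flink's actual ParquetColumnarRowSplitReader):

```java
// A hypothetical split reader illustrating a "sticky" reachedEnd():
// once the end of the split is observed, every later call keeps
// returning true instead of flipping back to false.
public class StickyEndReader {
    private final int totalRows;
    private int position = 0;
    private boolean endReached = false;

    public StickyEndReader(int totalRows) {
        this.totalRows = totalRows;
    }

    public boolean reachedEnd() {
        if (endReached) {
            return true; // sticky: never reverts to false
        }
        if (position >= totalRows) {
            endReached = true;
        }
        return endReached;
    }

    public int nextRecord() {
        return position++;
    }

    public static void main(String[] args) {
        StickyEndReader reader = new StickyEndReader(2);
        while (!reader.reachedEnd()) {
            reader.nextRecord();
        }
        System.out.println(reader.reachedEnd()); // true
        System.out.println(reader.reachedEnd()); // still true
    }
}
```

Latching the result in a boolean field is one simple way to satisfy the contract regardless of how the underlying position counters behave after the last batch.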
[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work
[ https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17301550#comment-17301550 ] Danny Chen commented on FLINK-21520: Thanks, I guess that in your query, CPU filtering is not the resource-consuming bottleneck. > ParquetInputFormat#setfilterPredicate() does not work > - > > Key: FLINK-21520 > URL: https://issues.apache.org/jira/browse/FLINK-21520 > Project: Flink > Issue Type: Bug >Reporter: Etienne Chauchot >Priority: Major > > Simplified code: > {code:java} > FilterPredicate filterPredicate = eq(intColumn("intField"), 10); > parquetInputFormat.setFilterPredicate(filterPredicate); > env.createInput(parquetInputFormat); > {code} > > produces no records whereas > {code:java} > env.createInput(parquetInputFormat) > .filter((FilterFunction) value -> value.get("intField") == 10); > {code} > produces records. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work
[ https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17301357#comment-17301357 ] Danny Chen commented on FLINK-21520: That's also good news. How did you reach the conclusion that "Performances are almost the same"? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21520) ParquetInputFormat#setfilterPredicate() does not work
[ https://issues.apache.org/jira/browse/FLINK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300105#comment-17300105 ] Danny Chen commented on FLINK-21520: I'm working on Flink support for Apache Hudi now, and we use the Flink Parquet format. To support time-travel queries, we need filter-expression push-down for the Parquet format, which the current code base ignores. I'm glad that you need this feature too; can you contribute the code? I can help with the code review. -- This message was sent by Atlassian Jira (v8.3.4#803005)
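The value of the filter push-down discussed in this thread comes from pruning whole row groups using their column statistics before any values are read. A self-contained Java sketch of that idea (the RowGroup class and method names here are invented for illustration, not Parquet's or Flink's API):

```java
import java.util.ArrayList;
import java.util.List;

public class PushdownSketch {
    // Hypothetical row group carrying min/max statistics for one int column.
    static class RowGroup {
        final int min;
        final int max;
        final int[] values;
        RowGroup(int min, int max, int[] values) {
            this.min = min;
            this.max = max;
            this.values = values;
        }
    }

    // Evaluate "column = target" with push-down: a row group whose
    // statistics prove the predicate cannot match is skipped entirely,
    // so its values are never decoded.
    static List<Integer> scanWithPushdown(List<RowGroup> groups, int target) {
        List<Integer> out = new ArrayList<>();
        for (RowGroup g : groups) {
            if (target < g.min || target > g.max) {
                continue; // pruned using statistics only
            }
            for (int v : g.values) {
                if (v == target) {
                    out.add(v);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<RowGroup> groups = List.of(
                new RowGroup(1, 5, new int[] {1, 3, 5}),
                new RowGroup(10, 20, new int[] {10, 15, 20}));
        System.out.println(scanWithPushdown(groups, 10)); // [10]
    }
}
```

This is why push-down helps most when the predicate is selective enough to skip row groups; a post-read `.filter(...)` call, as in the workaround quoted above, must still decode every record first.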
[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException
[ https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17274208#comment-17274208 ] Danny Chen commented on FLINK-20942: Thanks [~twalthr], I have submitted a fix via https://github.com/apache/flink/pull/14801 > Digest of FLOAT literals throws UnsupportedOperationException > - > > Key: FLINK-20942 > URL: https://issues.apache.org/jira/browse/FLINK-20942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Timo Walther >Priority: Critical > Labels: pull-request-available > Fix For: 1.13.0, 1.12.2 > > > The recent refactoring of Calcite's digests might have caused a regression > for FLOAT literals. {{org.apache.calcite.rex.RexLiteral#appendAsJava}} throws > an UnsupportedOperationException for the following query: > {code} > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val source = env.fromElements( > (1.0f, 11.0f, 12.0f), > (2.0f, 21.0f, 22.0f), > (3.0f, 31.0f, 32.0f), > (4.0f, 41.0f, 42.0f), > (5.0f, 51.0f, 52.0f) > ) > val settings = EnvironmentSettings.newInstance() > .inStreamingMode() > .useBlinkPlanner() > .build() > val tEnv = StreamTableEnvironment.create(env, settings) > tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2")) > val query = > """ > |select * from myTable where id in (1.0, 2.0, 3.0) > |""".stripMargin > tEnv.executeSql(query).print() > } > {code} > Stack trace: > {code} > Exception in thread "main" java.lang.UnsupportedOperationException: class > org.apache.calcite.sql.type.SqlTypeName: FLOAT > at org.apache.calcite.util.Util.needToImplement(Util.java:1075) > at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703) > at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408) > at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276) > at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223) > at 
org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737) > at > org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710) > at > org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397) > at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237) > at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110) > at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157) > at org.apache.calcite.util.Sarg.printTo(Sarg.java:106) > at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709) > at > org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652) > at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502) > at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651) > at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408) > at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276) > at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223) > at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971) > at > org.apache.calcite.rex.RexBuilder.makeSearchArgumentLiteral(RexBuilder.java:1066) > at > org.apache.calcite.rex.RexSimplify$SargCollector.fix(RexSimplify.java:2786) > at > org.apache.calcite.rex.RexSimplify.lambda$simplifyOrs$6(RexSimplify.java:1843) > at java.util.ArrayList.forEach(ArrayList.java:1257) > at org.apache.calcite.rex.RexSimplify.simplifyOrs(RexSimplify.java:1843) > at org.apache.calcite.rex.RexSimplify.simplifyOr(RexSimplify.java:1817) > at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:313) > at > org.apache.calcite.rex.RexSimplify.simplifyUnknownAs(RexSimplify.java:282) > at org.apache.calcite.rex.RexSimplify.simplify(RexSimplify.java:257) > at > org.apache.flink.table.planner.plan.utils.FlinkRexUtil$.simplify(FlinkRexUtil.scala:213) > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.simplify(SimplifyFilterConditionRule.scala:63) > at > 
org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.onMatch(SimplifyFilterConditionRule.scala:46) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at >
[jira] [Comment Edited] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException
[ https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272647#comment-17272647 ] Danny Chen edited comment on FLINK-20942 at 1/27/21, 8:05 AM: -- I have submitted a fix in CALCITE-4479, but I have no idea how to fix this quickly on the Flink side. Should we copy {{RexLiteral}} then? The class is huge... was (Author: danny0405): I have fired a fix in CALCITE-4479 but i have no idea how to fix quickly in Flink side, should we copy the {{RexLiteral }} then ? The class is huge ..
[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException
[ https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272647#comment-17272647 ] Danny Chen commented on FLINK-20942: I have submitted a fix in CALCITE-4479, but I have no idea how to fix this quickly on the Flink side. Should we copy {{RexLiteral}} then? The class is huge...
[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException
[ https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270032#comment-17270032 ] Danny Chen commented on FLINK-20942: Hmm, the root cause is that the newly introduced `Sarg` tries to make literals recursively. For this example, it tries to create `RexLiteral`s with `FLOAT` as the type name, which is not possible in a normal SQL context, so this appears to be a bug. Let me see how to fix it quickly on the Flink side.
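The root-cause mechanism described in that comment, a simplifier that rebuilds each IN-list member as a fresh literal and trips over a type name its literal factory does not support, can be mimicked with a small, self-contained Java sketch. All names below (TypeName, makeLiteralDigest, simplifyInList) are invented for this illustration and are not Calcite's API:

```java
public class SargLiteralSketch {
    enum TypeName { INTEGER, DOUBLE, FLOAT }

    // A literal factory that, like the real digest code in this bug,
    // only handles a subset of type names and throws for the rest.
    static String makeLiteralDigest(Object value, TypeName type) {
        switch (type) {
            case INTEGER:
            case DOUBLE:
                return value + ":" + type;
            default:
                throw new UnsupportedOperationException("class TypeName: " + type);
        }
    }

    // Simplifying "id IN (a, b, ...)" into a Sarg-like digest re-creates
    // one literal per member, so an unsupported member type fails the
    // whole simplification.
    static String simplifyInList(Object[] values, TypeName type) {
        StringBuilder sb = new StringBuilder("Sarg[");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(makeLiteralDigest(values[i], type));
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        System.out.println(simplifyInList(new Object[] {1.0, 2.0}, TypeName.DOUBLE));
        try {
            simplifyInList(new Object[] {1.0f, 2.0f}, TypeName.FLOAT);
        } catch (UnsupportedOperationException e) {
            System.out.println("UnsupportedOperationException: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only the failure shape: the exception is raised while recursively constructing literals during simplification, not while evaluating the query, which matches the stack trace above.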
[jira] [Commented] (FLINK-20942) Digest of FLOAT literals throws UnsupportedOperationException
[ https://issues.apache.org/jira/browse/FLINK-20942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270006#comment-17270006 ] Danny Chen commented on FLINK-20942: Thanks [~twalthr] for reporting this issue; I will take a look and fix it soon.
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17260156#comment-17260156 ] Danny Chen commented on FLINK-18027: Fixed in https://issues.apache.org/jira/browse/CALCITE-4456; the bug will be fixed once we upgrade to Calcite release 1.27.0 or a higher version. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw exceptions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at > 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
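The parse failure above points at the `.` inside the ROW constructor (column 18 is the dot in `my_row.a`). Until the Calcite upgrade lands, one possible workaround (an untested sketch, not taken from the ticket itself) is to alias the nested fields in a subquery first, so that the ROW value constructor only ever sees simple identifiers:

{code:sql}
-- Hypothetical workaround sketch: project the nested fields under simple
-- aliases, so ROW(...) contains no dotted identifiers for the parser to
-- trip over.
insert into my_sink
select ROW(a, b)
from (
  select my_row.a as a, my_row.b as b
  from my_source
) t;
{code}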
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256799#comment-17256799 ] Danny Chen commented on FLINK-18027: Thanks for checking, [~twalthr]. Let me take a look at the expression `ROW(f0 + 12, 'Hello world')` on the Calcite side; I will post the root cause and a solution soon. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > throws the following exception: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at > 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20785) MapRelDataType for legacy planner has wrong digest
Danny Chen created FLINK-20785: -- Summary: MapRelDataType for legacy planner has wrong digest Key: FLINK-20785 URL: https://issues.apache.org/jira/browse/FLINK-20785 Project: Flink Issue Type: Task Components: Table SQL / Legacy Planner Affects Versions: 1.12.0 Reporter: Danny Chen In the legacy planner, we create the {{MapRelDataType}} ignoring the key and value nullability. If we implement the digest correctly, it conflicts with that logic.
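The nullability problem can be illustrated with a minimal, Flink-free sketch (the class and method names below are hypothetical, not the actual planner code): a map type digest that omits key/value nullability collapses logically different types onto one string, while a correct digest keeps them apart.

```java
// Hypothetical stand-in for MapRelDataType's digest computation; the real
// legacy-planner classes are not reproduced here.
final class MapTypeDigest {

    // Broken variant: ignores nullability, so MAP types that differ only in
    // key/value nullability produce the same digest string.
    static String withoutNullability(String keyType, String valueType) {
        return "MAP<" + keyType + ", " + valueType + ">";
    }

    // Correct variant: encodes key and value nullability, so a digest-keyed
    // type cache can tell the two types apart.
    static String withNullability(String keyType, boolean keyNullable,
                                  String valueType, boolean valueNullable) {
        return "MAP<" + keyType + (keyNullable ? "" : " NOT NULL") + ", "
                + valueType + (valueNullable ? "" : " NOT NULL") + ">";
    }
}
```

With the broken variant, `MAP<VARCHAR NOT NULL, INT>` and `MAP<VARCHAR, INT>` become indistinguishable, which is exactly the conflict the issue describes.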
[jira] [Created] (FLINK-20704) Some rel data type does not implement the digest correctly
Danny Chen created FLINK-20704: -- Summary: Some rel data type does not implement the digest correctly Key: FLINK-20704 URL: https://issues.apache.org/jira/browse/FLINK-20704 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner Affects Versions: 1.11.3, 1.12.0 Reporter: Danny Chen Fix For: 1.12.1 Some of the rel data types for the legacy planner ({{GenericRelDataType}}, {{ArrayRelDataType}}, {{MapRelDataType}} and {{MultisetRelDataType}}) do not implement the digest correctly. This matters especially for {{GenericRelDataType}}: the {{RelDataTypeFactory}} caches type instances based on their digest, so a wrong digest implementation messes up type instance creation. E.g. without this patch, all {{GenericRelDataType}} instances have the same digest `ANY`.
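The caching hazard can be sketched without Flink (the cache class below is illustrative, not the real {{RelDataTypeFactory}}): a factory that interns type instances by digest always returns the first instance cached under a given digest, so if every generic type reports the digest `ANY`, they all collapse into one instance.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of a digest-keyed type cache, in the spirit of the
// canonization done by a rel data type factory; not the actual Calcite code.
final class DigestKeyedTypeCache {
    private final Map<String, Object> canonical = new HashMap<>();

    // Returns the instance cached under the digest, inserting on first use.
    Object canonize(String digest, Object type) {
        return canonical.computeIfAbsent(digest, d -> type);
    }
}
```

Two distinct types that both report `ANY` resolve to the same cached instance; once the digest encodes the concrete type, the instances stay separate.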
[jira] [Comment Edited] (FLINK-20368) Supports custom operator name for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-20368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239503#comment-17239503 ] Danny Chen edited comment on FLINK-20368 at 11/27/20, 4:22 AM: --- Hi [~ksp0422], the DDL does not use the DataStream API. For the Blink planner, most of the operator code is code-generated and chained through {{Transformation}}s; the operator naming strategy is built in and users never see it. > Will it break like normal SELECT queries when upgrading If we keep the operator name unchanged, it is expected to be compatible. was (Author: danny0405): Hi [~ksp0422] ~ The DDL does not use DataStream API, for Blink planner, not of the operator codes are code-gened and chained through {{Transformation}}s, the operator name strategy is kind of builtin and user never see it. > Will it break like normal SELECT queries when upgrading If we keep operator name unchanged, it expected to be compatible. > Supports custom operator name for Flink SQL > --- > > Key: FLINK-20368 > URL: https://issues.apache.org/jira/browse/FLINK-20368 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.0 >Reporter: Danny Chen >Priority: Major > > A request from the USER mailing list by Kevin Kwon: > For SQLs, I know that the operator ID assignment is not possible now since > the query optimizer may not be backward compatible in each release > But are DDLs also affected by this? > for example, > CREATE TABLE mytable ( > id BIGINT, > data STRING > ) with ( > connector = 'kafka' > ... > id = 'mytable' > name = 'mytable' > ) > and we can save all related checkpoint data
[jira] [Commented] (FLINK-20368) Supports custom operator name for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-20368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239503#comment-17239503 ] Danny Chen commented on FLINK-20368: Hi [~ksp0422], the DDL does not use the DataStream API. For the Blink planner, most of the operator code is code-generated and chained through {{Transformation}}s; the operator naming strategy is built in and users never see it. > Will it break like normal SELECT queries when upgrading If we keep the operator name unchanged, it is expected to be compatible. > Supports custom operator name for Flink SQL > --- > > Key: FLINK-20368 > URL: https://issues.apache.org/jira/browse/FLINK-20368 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.0 >Reporter: Danny Chen >Priority: Major > > A request from the USER mailing list by Kevin Kwon: > For SQLs, I know that the operator ID assignment is not possible now since > the query optimizer may not be backward compatible in each release > But are DDLs also affected by this? > for example, > CREATE TABLE mytable ( > id BIGINT, > data STRING > ) with ( > connector = 'kafka' > ... > id = 'mytable' > name = 'mytable' > ) > and we can save all related checkpoint data
[jira] [Created] (FLINK-20368) Supports custom operator name for Flink SQL
Danny Chen created FLINK-20368: -- Summary: Supports custom operator name for Flink SQL Key: FLINK-20368 URL: https://issues.apache.org/jira/browse/FLINK-20368 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.0 Reporter: Danny Chen A request from the USER mailing list by Kevin Kwon: For SQLs, I know that the operator ID assignment is not possible now, since the query optimizer may not be backward compatible in each release. But are DDLs also affected by this? For example: CREATE TABLE mytable ( id BIGINT, data STRING ) with ( connector = 'kafka' ... id = 'mytable' name = 'mytable' ) and we can save all related checkpoint data.
[jira] [Commented] (FLINK-20307) Strength the document about temporal table join syntax
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237921#comment-17237921 ] Danny Chen commented on FLINK-20307: Yes, I have added more documentation there so there is less confusion. > Strength the document about temporal table join syntax > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Minor > Labels: pull-request-available > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore a query like the following just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code}
[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20307: --- Priority: Minor (was: Critical) > Strength the document about temporal table join syntax > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Minor > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore query like just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20307: --- Component/s: Documentation > Strength the document about temporal table join syntax > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Critical > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore query like just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20307) Strength the document about temporal table join syntax
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20307: --- Summary: Strength the document about temporal table join syntax (was: Cannot alias versioned table in a join) > Strength the document about temporal table join syntax > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Critical > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore query like just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20307) Cannot alias versioned table in a join
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237854#comment-17237854 ] Danny Chen edited comment on FLINK-20307 at 11/24/20, 4:25 AM: --- Based on the SQL standard 2011, section 7.6 <table reference>, the <table primary> syntax is: {code:xml} <table primary> ::= <table or query name> [ <query system time period specification> ] [ [ AS ] <correlation name> [ <parenthesized derived column list> ] ] {code} So the query should be rewritten as follows: {code:sql} SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- AS should be here, the AS keyword is optional ON p.currency = r.currency {code} was (Author: danny0405): Based on the SQL standard 2011, section 7.6 <table reference>, the <table primary> syntax is: {code:xml} <table primary> ::= <table or query name> [ <query system time period specification> ] [ [ AS ] <correlation name> [ <parenthesized derived column list> ] ] {code} So the query should be rewritten as follows: {code:sql} SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- AS should be here ~ ON p.currency = r.currency {code} > Cannot alias versioned table in a join > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Critical > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... 
> "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore query like just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20307) Cannot alias versioned table in a join
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237854#comment-17237854 ] Danny Chen commented on FLINK-20307: Based on the SQL standard 2011, section 7.6 <table reference>, the <table primary> syntax is: {code:xml} <table primary> ::= <table or query name> [ <query system time period specification> ] [ [ AS ] <correlation name> [ <parenthesized derived column list> ] ] {code} So the query should be rewritten as follows: {code:sql} SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` AS r -- AS should be here ~ ON p.currency = r.currency {code} > Cannot alias versioned table in a join > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Critical > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore a query like the following just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code}
[jira] [Commented] (FLINK-20307) Cannot alias versioned table in a join
[ https://issues.apache.org/jira/browse/FLINK-20307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237794#comment-17237794 ] Danny Chen commented on FLINK-20307: Hi, [~dwysakowicz] ~ I would like to take this issue ~ > Cannot alias versioned table in a join > -- > > Key: FLINK-20307 > URL: https://issues.apache.org/jira/browse/FLINK-20307 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Priority: Critical > Fix For: 1.12.0 > > > A query like: > {code} > SELECT p.name, p.qty * r.rate AS price, p.`tstamp` FROM Products p JOIN > versioned_rates r FOR SYSTEM_TIME AS OF p.`tstamp` ON p.currency = r.currency; > {code} > fails with: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" at line 1, > column 108. > Was expecting one of: > > "EXCEPT" ... > "FETCH" ... > "GROUP" ... > "HAVING" ... > "INTERSECT" ... > "LIMIT" ... > "OFFSET" ... > "ON" ... > "ORDER" ... > "MINUS" ... > "TABLESAMPLE" ... > "UNION" ... > "USING" ... > "WHERE" ... > "WINDOW" ... > "(" ... > "NATURAL" ... > "JOIN" ... > "INNER" ... > "LEFT" ... > "RIGHT" ... > "FULL" ... > "CROSS" ... > "," ... > "OUTER" ... > {code} > When I do not alias the {{versioned_rates}} table everything works as > expected. Therefore query like just runs: > {code} > SELECT p.name, p.qty * versioned_rates.rate AS price, p.`tstamp` FROM > Products p JOIN versioned_rates FOR SYSTEM_TIME AS OF p.`tstamp` ON > p.currency = versioned_rates.currency; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin
[ https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-19175: --- Fix Version/s: 1.12.0 > Tests in JoinITCase do not test BroadcastHashJoin > - > > Key: FLINK-19175 > URL: https://issues.apache.org/jira/browse/FLINK-19175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they > actually do not. None of the tables used in the tests have proper statistics > therefore, none of the tables meet the threshold for the broadcast join. At > the same time the {{ShuffleHashJoin}} is not disabled, therefore they > silently fallback to {{ShuffleHashJoin}}. > In summary none (or at least not all of the tests) are executed for > BroadcastHashJoin, but are executed twice for ShuffleHashJoin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
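One way to make such tests actually exercise the broadcast path is to pin the physical join strategy through planner options instead of relying on table statistics. A hedged sketch (the option names come from the Blink planner's configuration and should be verified against the Flink version under test):

{code:sql}
-- Raise the broadcast threshold far above the test table sizes so the
-- broadcast join qualifies, and disable the operators the tests must not
-- silently fall back to.
SET table.optimizer.join.broadcast-threshold=1073741824;
SET table.exec.disabled-operators=ShuffleHashJoin,SortMergeJoin,NestedLoopJoin;
{code}

With the fallback operators disabled, a plan that cannot use the broadcast join fails loudly instead of silently switching strategies.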
[jira] [Closed] (FLINK-20017) Improve the error message for join two windowed stream with time attributes
[ https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen closed FLINK-20017. -- Resolution: Fixed > Improve the error message for join two windowed stream with time attributes > --- > > Key: FLINK-20017 > URL: https://issues.apache.org/jira/browse/FLINK-20017 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: Danny Chen >Assignee: Danny Chen >Priority: Major > Fix For: 1.12.0 > > > Reported on the USER mailing list by Satyam: > Currently, for the table: > {code:sql} > CREATE TABLE T0 ( > amount BIGINT, > ts TIMESTAMP(3), > watermark for ts as ts - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'values', > 'data-id' = '$mDataId', > 'bounded' = 'false' > ) > {code} > and query: > {code:sql} > WITH A AS ( > SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm > FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > {code} > throws stacktrace: > {code:noformat} > java.lang.RuntimeException: Error while applying rule > StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args > [rel#320:FlinkLogicalJoin.LOGICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, > $3),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664) > at > 
org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at >
[jira] [Commented] (FLINK-20017) Improve the error message for join two windowed stream with time attributes
[ https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237223#comment-17237223 ] Danny Chen commented on FLINK-20017: The test case below now works against the master code, so this is actually fixed; I will just close this issue.
{code:java}
@Test
def testStreamWindowJoin(): Unit = {
  // data set
  val dataA = List(
    rowOf(2L, LocalDateTime.parse("1970-01-01T00:00:01")),
    rowOf(3L, LocalDateTime.parse("1970-01-01T00:00:04")),
    rowOf(4L, LocalDateTime.parse("1970-01-01T00:00:08")),
    rowOf(5L, LocalDateTime.parse("1970-01-01T00:00:16")),
    rowOf(6L, LocalDateTime.parse("1970-01-01T00:00:32")))
  val dataID = TestValuesTableFactory.registerData(dataA)
  val createTable =
    s"""
       |CREATE TABLE T0 (
       |  amount BIGINT,
       |  ts TIMESTAMP(3),
       |  watermark for ts as ts - INTERVAL '5' SECOND
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataID',
       |  'bounded' = 'false'
       |)
       |""".stripMargin
  tEnv.executeSql(createTable)
  val query =
    s"""
       |WITH A AS (
       |  SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm
       |FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE))
       |select L.ct, R.ct, L.tm from A as L left join A as R on L.tm = R.tm
       |""".stripMargin
  val result = CollectionUtil.iteratorToList(tEnv.executeSql(query).collect())
  assertEquals(Seq("abc"), result)
}
{code}
> Improve the error message for join two windowed stream with time attributes > --- > > Key: FLINK-20017 > URL: https://issues.apache.org/jira/browse/FLINK-20017 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: Danny Chen >Assignee: Danny Chen >Priority: Major > Fix For: 1.12.0 > > > Reported on the USER mailing list by Satyam: > Currently, for the table: > {code:sql} > CREATE TABLE T0 ( > amount BIGINT, > ts TIMESTAMP(3), > watermark for ts as ts - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'values', > 'data-id' = '$mDataId', > 'bounded' = 'false' > ) > {code} > and query: > {code:sql} > WITH A AS ( > SELECT COUNT(*) AS 
ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm > FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > {code} > throws stacktrace: > {code:noformat} > java.lang.RuntimeException: Error while applying rule > StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args > [rel#320:FlinkLogicalJoin.LOGICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, > $3),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >
[jira] [Updated] (FLINK-20234) Json format supports SE/DE null elements of ARRAY type field
[ https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20234: --- Fix Version/s: (was: 1.12.0) > Json format supports SE/DE null elements of ARRAY type field > > > Key: FLINK-20234 > URL: https://issues.apache.org/jira/browse/FLINK-20234 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > > Reported from the USER mailing list: > Hi, > I recently discovered some of our data has NULL values arriving in an > ARRAY column. This column is being consumed by Flink via the Kafka > connector Debezium format. We seem to be receiving NullPointerExceptions > when these NULL values in the arrays arrive, which restarts the source > operator in a loop. > Is there any way to not throw or to possibly filter out NULLs in an Array of > Strings in Flink? > We're somewhat stuck on how to solve this problem, we'd like to be defensive > about this on Flink's side. > Thanks! 
> The JSON: > {code:java} > { > "schema": { > "type": "struct", > "fields": [ > { > "type": "struct", > "fields": [ > { "type": "int32", "optional": false, "field": "id" }, > { > "type": "array", > "items": { "type": "string", "optional": true }, > "optional": false, > "field": "roles" > }, > ], > "optional": true, > "name": "db.public.data.Value", > "field": "before" > }, > { > "type": "struct", > "fields": [ > { "type": "int32", "optional": false, "field": "id" }, > { > "type": "array", > "items": { "type": "string", "optional": true }, > "optional": false, > "field": "roles" > }, > ], > "optional": true, > "name": "db.public.data.Value", > "field": "after" > }, > { > "type": "struct", > "fields": [ > { "type": "string", "optional": false, "field": "version" }, > { "type": "string", "optional": false, "field": "connector" }, > { "type": "string", "optional": false, "field": "name" }, > { "type": "int64", "optional": false, "field": "ts_ms" }, > { > "type": "string", > "optional": true, > "name": "io.debezium.data.Enum", > "version": 1, > "parameters": { "allowed": "true,last,false" }, > "default": "false", > "field": "snapshot" > }, > { "type": "string", "optional": false, "field": "db" }, > { "type": "string", "optional": false, "field": "schema" }, > { "type": "string", "optional": false, "field": "table" }, > { "type": "int64", "optional": true, "field": "txId" }, > { "type": "int64", "optional": true, "field": "lsn" }, > { "type": "int64", "optional": true, "field": "xmin" } > ], > "optional": false, > "name": "io.debezium.connector.postgresql.Source", > "field": "source" > }, > { "type": "string", "optional": false, "field": "op" }, > { "type": "int64", "optional": true, "field": "ts_ms" }, > { > "type": "struct", > "fields": [ > { "type": "string", "optional": false, "field": "id" }, > { "type": "int64", "optional": false, "field": "total_order" }, > { > "type": "int64", > "optional": false, > "field": "data_collection_order" > } > ], > "optional": true, > 
"field": "transaction" > } > ], > "optional": false, > "name": "db.public.data.Envelope" > }, > "payload": { > "before": null, > "after": { > "id": 76704, > "roles": [null], > }, > "source": { > "version": "1.3.0.Final", > "connector": "postgresql", > "name": "db", > "ts_ms": 1605739197360, > "snapshot": "true", > "db": "db", > "schema": "public", > "table": "data", > "txId": 1784, > "lsn": 1305806608, > "xmin": null > }, > "op": "r", > "ts_ms": 1605739197373, > "transaction": null > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20234) Json format supports SE/DE null elements of ARRAY type field
Danny Chen created FLINK-20234: -- Summary: Json format supports SE/DE null elements of ARRAY type field Key: FLINK-20234 URL: https://issues.apache.org/jira/browse/FLINK-20234 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.2 Reporter: Danny Chen Fix For: 1.12.0 Reported from the USER mailing list: Hi, I recently discovered some of our data has NULL values arriving in an ARRAY column. This column is being consumed by Flink via the Kafka connector Debezium format. We seem to be receiving NullPointerExceptions when these NULL values in the arrays arrive, which restarts the source operator in a loop. Is there any way to not throw or to possibly filter out NULLs in an Array of Strings in Flink? We're somewhat stuck on how to solve this problem, we'd like to be defensive about this on Flink's side. Thanks! -- This message was sent by Atlassian Jira (v8.3.4#803005)
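Until the format itself tolerates NULL array elements, a defensive user-side workaround is to drop NULL entries before they reach downstream operators. The sketch below is a minimal, Flink-independent illustration of that filtering step; `NullSafeArray` and `filterNulls` are hypothetical helper names, not part of any Flink API.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helper (not a Flink API): drop NULL entries from a
// deserialized STRING array so downstream operators never see them.
public class NullSafeArray {
    static String[] filterNulls(String[] elements) {
        if (elements == null) {
            return new String[0];
        }
        return Arrays.stream(elements)
                .filter(Objects::nonNull)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // e.g. the "roles" array from the report contained [null]
        String[] roles = {"admin", null, "user"};
        System.out.println(Arrays.toString(filterNulls(roles)));
    }
}
```

The same filtering could be applied in a `MapFunction` right after the source, keeping the job running instead of restarting in a loop.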
[jira] [Commented] (FLINK-20200) SQL Hints are not supported in "Create View" syntax
[ https://issues.apache.org/jira/browse/FLINK-20200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234193#comment-17234193 ] Danny Chen commented on FLINK-20200: Thanks for reporting [~tinny]; this is indeed a bug, I'm fixing it ~ > SQL Hints are not supported in "Create View" syntax > > > Key: FLINK-20200 > URL: https://issues.apache.org/jira/browse/FLINK-20200 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Major > > I have already set the config option `table.dynamic-table-options.enabled` to > be true, but "SQL Hints" are not supported in View syntax. I got an error: > {code:java} > Exception in thread "main" java.lang.UnsupportedOperationException: class > org.apache.calcite.sql.SqlSyntax$6: SPECIAL > at org.apache.calcite.util.Util.needToImplement(Util.java:967) > at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116) > at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:333) > at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470) > at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101) > at > org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176) > at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470) > at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246) > at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:151) > at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:173) > at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:182) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:784) > at > org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:169) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:694) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) > {code} > The SQL code is as follows: > {code:java} > drop table if exists SourceA; > create table SourceA ( > id string, > name string > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'MyTopic', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'Test', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'csv' > ); > drop table if exists print; > create table print ( > id string, > name string > ) with ( > 'connector' = 'print' > ); > drop view if exists test_view; > create view test_view as > select > * > from SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */; > insert into print > select * from test_view; > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result
[ https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233552#comment-17233552 ] Danny Chen commented on FLINK-18988: Sorry for the delayed reply; I will fix this on both master and 1.11. > Continuous query with LATERAL and LIMIT produces wrong result > - > > Key: FLINK-18988 > URL: https://issues.apache.org/jira/browse/FLINK-18988 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.1 >Reporter: Fabian Hueske >Assignee: Danny Chen >Priority: Critical > Labels: pull-request-available > > I was trying out the example queries provided in this blog post: > [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if > Flink supports the same and found that the queries were translated and > executed but produced the wrong result. > I used the SQL Client and Kafka (running at kafka:9092) to store the table > data. I executed the following statements: > {code:java} > -- create cities table > CREATE TABLE cities ( > name STRING NOT NULL, > state STRING NOT NULL, > pop INT NOT NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'cities', > 'properties.bootstrap.servers' = 'kafka:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > -- fill cities table > INSERT INTO cities VALUES > ('Los_Angeles', 'CA', 3979576), > ('Phoenix', 'AZ', 1680992), > ('Houston', 'TX', 2320268), > ('San_Diego', 'CA', 1423851), > ('San_Francisco', 'CA', 881549), > ('New_York', 'NY', 8336817), > ('Dallas', 'TX', 1343573), > ('San_Antonio', 'TX', 1547253), > ('San_Jose', 'CA', 1021795), > ('Chicago', 'IL', 2695598), > ('Austin', 'TX', 978908); > -- execute query > SELECT state, name > FROM > (SELECT DISTINCT state FROM cities) states, > LATERAL ( > SELECT name, pop > FROM cities > WHERE state = states.state > ORDER BY pop > DESC LIMIT 3 > ); > -- result > state name >CA Los_Angeles >NY New_York >IL Chicago > -- expected
result > state | name > --+- > TX | Dallas > AZ | Phoenix > IL | Chicago > TX | Houston > CA | San_Jose > NY | New_York > CA | San_Diego > CA | Los_Angeles > TX | San_Antonio > {code} > As you can see from the query result, Flink computes the top3 cities over all > states, not for every state individually. Hence, I assume that this is a bug > in the query optimizer or one of the rewriting rules. > There are two valid ways to solve this issue: > * Fixing the rewriting rules / optimizer (obviously preferred) > * Disabling this feature and throwing an exception -- This message was sent by Atlassian Jira (v8.3.4#803005)
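For reference, the per-state semantics the LATERAL query should have can be sketched in plain Java. This is only an illustration of the intended result (top-N by population per state, not globally), not Flink planner code; the `TopNPerState` class and its names are hypothetical.

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java illustration of the semantics the LATERAL query should have:
// top-3 cities by population computed independently for every state.
public class TopNPerState {
    record City(String name, String state, int pop) {}

    static Map<String, List<String>> top3PerState(List<City> cities) {
        return cities.stream().collect(Collectors.groupingBy(
                City::state,
                Collectors.collectingAndThen(
                        Collectors.toList(),
                        group -> group.stream()
                                // descending by population, keep 3 per group
                                .sorted((a, b) -> Integer.compare(b.pop(), a.pop()))
                                .limit(3)
                                .map(City::name)
                                .collect(Collectors.toList()))));
    }

    public static void main(String[] args) {
        List<City> cities = List.of(
                new City("Los_Angeles", "CA", 3979576),
                new City("San_Diego", "CA", 1423851),
                new City("San_Jose", "CA", 1021795),
                new City("San_Francisco", "CA", 881549),
                new City("Houston", "TX", 2320268),
                new City("San_Antonio", "TX", 1547253),
                new City("Dallas", "TX", 1343573),
                new City("Austin", "TX", 978908));
        System.out.println(top3PerState(cities));
    }
}
```

With the issue's data this yields CA -> [Los_Angeles, San_Diego, San_Jose] and TX -> [Houston, San_Antonio, Dallas], matching the expected result above rather than the global top-3 that Flink produced.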
[jira] [Comment Edited] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema
[ https://issues.apache.org/jira/browse/FLINK-18500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233419#comment-17233419 ] Danny Chen edited comment on FLINK-18500 at 11/17/20, 8:14 AM: --- Thanks, I'm rebasing the code and fixing it. We can revert the commit first. was (Author: danny0405): Thanks, i'm rebasing the code and fixing. > Make the legacy planner exception more clear when resolving computed columns > types for schema > - > > Key: FLINK-18500 > URL: https://issues.apache.org/jira/browse/FLINK-18500 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.0 >Reporter: Danny Chen >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > From the user mail: > Hi, all: > I use Zeppelin to execute SQL; the Flink version is a Flink 1.11 snapshot, built from > branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973. > When the table type is datagen, Flink throws an exception, but the > exception message is null. > My DDL is: > {code:sql} > CREATE TABLE datagen_dijie2 ( > f_sequence INT, > f_random INT, > f_random_str STRING, > ts AS localtimestamp, > WATERMARK FOR ts AS ts > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='5', > 'fields.f_sequence.kind'='sequence', > 'fields.f_sequence.start'='1', > 'fields.f_sequence.end'='1000', > 'fields.f_random.min'='1', > 'fields.f_random.max'='1000', > 'fields.f_random_str.length'='10' > ); > {code} > My query SQL is: > {code:sql} > select * from datagen_dijie2; > {code} > the exception is: > {noformat} > org.apache.flink.table.api.ValidationException: SQL validation failed.
null > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102) > at > org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: > 
java.lang.UnsupportedOperationException at > org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86) > at > org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) > at > org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) > at > org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) > at > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema
[ https://issues.apache.org/jira/browse/FLINK-18500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233419#comment-17233419 ] Danny Chen commented on FLINK-18500: Thanks, I'm rebasing the code and fixing it. > Make the legacy planner exception more clear when resolving computed columns > types for schema > - > > Key: FLINK-18500 > URL: https://issues.apache.org/jira/browse/FLINK-18500 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.0 >Reporter: Danny Chen >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > From the user mail: > Hi, all: > I use Zeppelin to execute SQL; the Flink version is a Flink 1.11 snapshot, built from > branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973. > When the table type is datagen, Flink throws an exception, but the > exception message is null. > My DDL is: > {code:sql} > CREATE TABLE datagen_dijie2 ( > f_sequence INT, > f_random INT, > f_random_str STRING, > ts AS localtimestamp, > WATERMARK FOR ts AS ts > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='5', > 'fields.f_sequence.kind'='sequence', > 'fields.f_sequence.start'='1', > 'fields.f_sequence.end'='1000', > 'fields.f_random.min'='1', > 'fields.f_random.max'='1000', > 'fields.f_random_str.length'='10' > ); > {code} > My query SQL is: > {code:sql} > select * from datagen_dijie2; > {code} > the exception is: > {noformat} > org.apache.flink.table.api.ValidationException: SQL validation failed.
null > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102) > at > org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: > 
java.lang.UnsupportedOperationException at > org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86) > at > org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) > at > org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) > at > org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) > at > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
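The unhelpful message "SQL validation failed. null" arises because the wrapped cause here is an UnsupportedOperationException thrown without a message, so its getMessage() returns null and naive string concatenation prints the literal "null". A minimal plain-Java sketch of this failure mode and a more defensive wrapping (the class and method names are hypothetical, not Flink code):

```java
// Illustrates why the reported message reads "SQL validation failed. null":
// concatenating a null getMessage() produces the literal string "null".
public class NullMessageDemo {
    static String wrap(Throwable cause) {
        // Naive wrapping, matching the confusing message from the report.
        return "SQL validation failed. " + cause.getMessage();
    }

    static String wrapDefensively(Throwable cause) {
        // Fall back to the exception class name when the cause
        // carries no message of its own.
        String detail = cause.getMessage() != null
                ? cause.getMessage()
                : cause.getClass().getName();
        return "SQL validation failed. " + detail;
    }

    public static void main(String[] args) {
        Throwable cause = new UnsupportedOperationException();
        System.out.println(wrap(cause));
        System.out.println(wrapDefensively(cause));
    }
}
```

Making the legacy planner's parseSqlExpression path raise an exception with a real message (or wrapping it as above) is the kind of improvement this issue asks for.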
[jira] [Updated] (FLINK-20017) Improve the error message for join two windowed stream with time attributes
[ https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20017: --- Summary: Improve the error message for join two windowed stream with time attributes (was: Promote the error message for join two windowed stream with time attributes) > Improve the error message for join two windowed stream with time attributes > --- > > Key: FLINK-20017 > URL: https://issues.apache.org/jira/browse/FLINK-20017 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: Danny Chen >Assignee: Danny Chen >Priority: Major > Fix For: 1.12.0 > > > Reported from the USER mailing list by Satyam ~ > Currently, for this table: > {code:sql} > CREATE TABLE T0 ( > amount BIGINT, > ts TIMESTAMP(3), > watermark for ts as ts - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'values', > 'data-id' = '$mDataId', > 'bounded' = 'false' > ) > {code} > and query: > {code:sql} > WITH A AS ( > SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm > FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > {code} > throws stacktrace: > {code:noformat} > java.lang.RuntimeException: Error while applying rule > StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args > [rel#320:FlinkLogicalJoin.LOGICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, > $3),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at >
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664) > at > org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at >
[jira] [Updated] (FLINK-20142) Update the document for CREATE TABLE LIKE that source table from different catalog is supported
[ https://issues.apache.org/jira/browse/FLINK-20142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20142: --- Fix Version/s: 1.12.0 > Update the document for CREATE TABLE LIKE that source table from different > catalog is supported > --- > > Key: FLINK-20142 > URL: https://issues.apache.org/jira/browse/FLINK-20142 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / API >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > Fix For: 1.12.0 > > > The confusion from the [USER mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CREATE-TABLE-LIKE-clause-from-different-catalog-or-database-td39364.html]: > Hi, > Is it disallowed to refer to a table from different databases or catalogs > when someone creates a table? > According to [1], there's no way to refer to tables belonging to different > databases or catalogs. > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table > Best, > Dongwon -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20142) Update the document for CREATE TABLE LIKE that source table from different catalog is supported
Danny Chen created FLINK-20142: -- Summary: Update the document for CREATE TABLE LIKE that source table from different catalog is supported Key: FLINK-20142 URL: https://issues.apache.org/jira/browse/FLINK-20142 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.11.2 Reporter: Danny Chen The confusion from the USER mailing list: Hi, Is it disallowed to refer to a table from different databases or catalogs when someone creates a table? According to [1], there's no way to refer to tables belonging to different databases or catalogs. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table Best, Dongwon -- This message was sent by Atlassian Jira (v8.3.4#803005)
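A one-line example would make the documented capability concrete. The sketch below uses hypothetical catalog, database, and table names; it only illustrates that the source table of a LIKE clause may be fully qualified with a different catalog and database:

```sql
-- Hypothetical names: the LIKE source table lives in a different
-- catalog and database than the table being created.
CREATE TABLE my_catalog.my_db.derived_table (
  extra_col STRING
) WITH (
  'connector' = 'print'
) LIKE other_catalog.other_db.base_table;
```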
[jira] [Created] (FLINK-20017) Promote the error message for join two windowed stream with time attributes
Danny Chen created FLINK-20017: -- Summary: Promote the error message for join two windowed stream with time attributes Key: FLINK-20017 URL: https://issues.apache.org/jira/browse/FLINK-20017 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.2 Reporter: Danny Chen Fix For: 1.12.0 Currently, for this table: {code:sql} CREATE TABLE T0 ( amount BIGINT, ts TIMESTAMP(3), watermark for ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'values', 'data-id' = '$mDataId', 'bounded' = 'false' ) {code} and query: {code:sql} WITH A AS ( SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE)) select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm {code} throws stacktrace: {code:noformat} java.lang.RuntimeException: Error while applying rule StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#320:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, $3),joinType=left)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at
scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664) at org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) at
[jira] [Updated] (FLINK-20017) Promote the error message for join two windowed stream with time attributes
[ https://issues.apache.org/jira/browse/FLINK-20017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-20017: --- Description: Reported from the USER mailing list by Satyam ~ Currently, for this table: {code:sql} CREATE TABLE T0 ( amount BIGINT, ts TIMESTAMP(3), watermark for ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'values', 'data-id' = '$mDataId', 'bounded' = 'false' ) {code} and query: {code:sql} WITH A AS ( SELECT COUNT(*) AS ct, tumble_rowtime(ts, INTERVAL '1' MINUTE) as tm FROM T0 GROUP BY tumble(ts, INTERVAL '1' MINUTE)) select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm {code} throws stacktrace: {code:noformat} java.lang.RuntimeException: Error while applying rule StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#320:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#315,right=RelSubset#315,condition==($1, $3),joinType=left)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664) at org.apache.flink.table.planner.runtime.stream.sql.TableSourceITCase.testXXX(TableSourceITCase.scala:110) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at
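The query itself looks valid, so until the error message is improved, one possible workaround (an untested sketch, not taken from the report) is to materialize the time attribute before the self join by casting it to a plain TIMESTAMP(3), which turns the join into a regular stream join:

{code:sql}
-- Untested sketch: the CAST removes the time attribute from tm, so the
-- self join may no longer be matched by StreamExecIntervalJoinRule.
WITH A AS (
  SELECT COUNT(*) AS ct,
         CAST(tumble_rowtime(ts, INTERVAL '1' MINUTE) AS TIMESTAMP(3)) AS tm
  FROM T0
  GROUP BY tumble(ts, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm
FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm
{code}

Note that the cast trades away the rowtime attribute, so downstream time-based operations on tm would no longer work.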
[jira] [Updated] (FLINK-19949) Unescape CSV format line and field delimiter character
[ https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-19949: --- Description: We should unescape the line delimiter characters because the DDL may be read from a file, in which case the new line "\n" in the DDL options is recognized as 2 characters ('\' and 'n'), while what the user actually wants is the invisible new-line character. {code:sql} create table t1( ... ) with ( 'format' = 'csv', 'csv.line-delimiter' = '\n' ... ) {code} The same applies to the field delimiter. was: We should unescape the line delimiter characters because the DDL may be read from a file, in which case the new line "\n" in the DDL options is recognized as 2 characters ('\' and 'n'), while what the user actually wants is the invisible new-line character. {code:sql} create table t1( ... ) with ( 'format' = 'csv', 'csv.line-delimiter' = '\n' ... ) {code} > Unescape CSV format line and field delimiter character > -- > > Key: FLINK-19949 > URL: https://issues.apache.org/jira/browse/FLINK-19949 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > We should unescape the line delimiter characters because the DDL may be > read from a file, in which case the new line "\n" in the DDL options is > recognized as 2 characters ('\' and 'n'), while what the user actually wants > is the invisible new-line character. > {code:sql} > create table t1( > ... > ) with ( > 'format' = 'csv', > 'csv.line-delimiter' = '\n' > ... > ) > {code} > The same applies to the field delimiter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19949) Unescape CSV format line and field delimiter character
[ https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-19949: --- Summary: Unescape CSV format line and field delimiter character (was: Unescape CSV format line delimiter character) > Unescape CSV format line and field delimiter character > -- > > Key: FLINK-19949 > URL: https://issues.apache.org/jira/browse/FLINK-19949 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > We should unescape the line delimiter characters first because the DDL can be > read from a file. So that the new line "\n" in the DDL options was recognized > as 2 characters. > While what user want is actually the invisible new line character. > {code:sql} > create table t1( > ... > ) with ( > 'format' = 'csv', > 'csv.line-delimiter' = '\n' > ... > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19949) Unescape CSV format line delimiter character
[ https://issues.apache.org/jira/browse/FLINK-19949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17225244#comment-17225244 ] Danny Chen commented on FLINK-19949: Yes, but the unicode escape has poor usability. From a user's perspective, it is confusing when the same DDL works in Java code but fails when read from a file. And the unescaping approach does not cause any regression. > Unescape CSV format line delimiter character > > > Key: FLINK-19949 > URL: https://issues.apache.org/jira/browse/FLINK-19949 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > Fix For: 1.12.0 > > > We should unescape the line delimiter characters because the DDL may be > read from a file, in which case the new line "\n" in the DDL options is > recognized as 2 characters, while what the user actually wants is the > invisible new-line character. > {code:sql} > create table t1( > ... > ) with ( > 'format' = 'csv', > 'csv.line-delimiter' = '\n' > ... > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19949) Unescape CSV format line delimiter character
Danny Chen created FLINK-19949: -- Summary: Unescape CSV format line delimiter character Key: FLINK-19949 URL: https://issues.apache.org/jira/browse/FLINK-19949 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.2 Reporter: Danny Chen Fix For: 1.12.0 We should unescape the line delimiter characters because the DDL may be read from a file, in which case the new line "\n" in the DDL options is recognized as 2 characters ('\' and 'n'), while what the user actually wants is the invisible new-line character. {code:sql} create table t1( ... ) with ( 'format' = 'csv', 'csv.line-delimiter' = '\n' ... ) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
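A minimal sketch of the unescaping this issue asks for, using a hypothetical helper (illustrative Java, not Flink's actual implementation): the two-character sequences \n, \r, and \t read from a DDL file are folded into their single invisible characters.

```java
// Hypothetical helper, not Flink code: unescape delimiter option values that
// were read from a file, where "\n" arrives as the two characters '\' and 'n'.
public class DelimiterUnescape {
    public static String unescape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '\\' && i + 1 < s.length()) {
                char next = s.charAt(i + 1);
                switch (next) {
                    case 'n': out.append('\n'); i++; continue; // new line
                    case 'r': out.append('\r'); i++; continue; // carriage return
                    case 't': out.append('\t'); i++; continue; // tab
                    default: break; // unknown escape: keep the backslash as-is
                }
            }
            out.append(c);
        }
        return out.toString();
    }
}
```

With this, the option value 'csv.line-delimiter' = '\n' read from a file would be converted to the single invisible new-line character before it reaches the format.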
[jira] [Commented] (FLINK-19779) Remove the "record_" field name prefix for Confluent Avro format deserialization
[ https://issues.apache.org/jira/browse/FLINK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220547#comment-17220547 ] Danny Chen commented on FLINK-19779: [~xiaozilong] Yes, the avro schema builder does not allow same name field names, even if they are in different scope (different layer). [~maver1ck] I have fixed the nullability and precision problem, please check. > Remove the "record_" field name prefix for Confluent Avro format > deserialization > > > Key: FLINK-19779 > URL: https://issues.apache.org/jira/browse/FLINK-19779 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0 >Reporter: Danny Chen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.12.0 > > > Reported by Maciej Bryński : > Problem is this is not compatible. I'm unable to read anything from Kafka > using Confluent Registry. Example: > I have data in Kafka with following value schema: > {code:java} > { > "type": "record", > "name": "myrecord", > "fields": [ > { > "name": "f1", > "type": "string" > } > ] > } > {code} > I'm creating table using this avro-confluent format: > {code:sql} > create table `test` ( > `f1` STRING > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'test1234', >'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro-confluent' > 'avro-confluent.schema-registry.url' = 'http://localhost:8081' > ); > {code} > When trying to select data I'm getting error: > {code:noformat} > SELECT * FROM test; > [ERROR] Could not execute SQL statement. Reason: > org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing > required field record_f1 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19779) Remove the "record_" field name prefix for Confluent Avro format deserialization
Danny Chen created FLINK-19779: -- Summary: Remove the "record_" field name prefix for Confluent Avro format deserialization Key: FLINK-19779 URL: https://issues.apache.org/jira/browse/FLINK-19779 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.12.0 Reporter: Danny Chen Fix For: 1.12.0 Reported by Maciej Bryński : Problem is this is not compatible. I'm unable to read anything from Kafka using Confluent Registry. Example: I have data in Kafka with following value schema: {code:java} { "type": "record", "name": "myrecord", "fields": [ { "name": "f1", "type": "string" } ] } {code} I'm creating table using this avro-confluent format: {code:sql} create table `test` ( `f1` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'test1234', 'scan.startup.mode' = 'earliest-offset', 'format' = 'avro-confluent' 'avro-confluent.schema-registry.url' = 'http://localhost:8081' ); {code} When trying to select data I'm getting error: {code:noformat} SELECT * FROM test; [ERROR] Could not execute SQL statement. Reason: org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing required field record_f1 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19668) Fix the plan regression by Calcite upgrade to 1.26
[ https://issues.apache.org/jira/browse/FLINK-19668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216424#comment-17216424 ] Danny Chen commented on FLINK-19668: Another case is `org.apache.flink.table.planner.plan.batch.sql.agg.WindowAggregateTest.testReturnTypeInferenceForWindowAgg` of blink planner. > Fix the plan regression by Calcite upgrade to 1.26 > -- > > Key: FLINK-19668 > URL: https://issues.apache.org/jira/browse/FLINK-19668 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.3 >Reporter: Danny Chen >Priority: Major > > See org.apache.flink.table.plan.TimeIndicatorConversionTest.testUnion, > {code:java} > @Test > def testUnion(): Unit = { > val util = streamTestUtil() > val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, > 'long, 'int) > val result = t.unionAll(t).select('rowtime) > val expected = unaryNode( > "DataStreamCalc", > binaryNode( > "DataStreamUnion", > streamTableNode(t), > streamTableNode(t), > term("all", "true"), > term("union all", "rowtime, long, int") > ), > term("select", "rowtime") > ) > util.verifyTable(result, expected) > } > {code} > The regression is because the new plan has same cost with the old, thus, the > old was not picked as the best. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19668) Fix the plan regression by Calcite upgrade to 1.26
Danny Chen created FLINK-19668: -- Summary: Fix the plan regression by Calcite upgrade to 1.26 Key: FLINK-19668 URL: https://issues.apache.org/jira/browse/FLINK-19668 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.11.3 Reporter: Danny Chen See org.apache.flink.table.plan.TimeIndicatorConversionTest.testUnion, {code:java} @Test def testUnion(): Unit = { val util = streamTestUtil() val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int) val result = t.unionAll(t).select('rowtime) val expected = unaryNode( "DataStreamCalc", binaryNode( "DataStreamUnion", streamTableNode(t), streamTableNode(t), term("all", "true"), term("union all", "rowtime, long, int") ), term("select", "rowtime") ) util.verifyTable(result, expected) } {code} The regression occurs because the new plan has the same cost as the old one, so the old plan was not picked as the best. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19567) JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference
[ https://issues.apache.org/jira/browse/FLINK-19567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17212104#comment-17212104 ] Danny Chen commented on FLINK-19567: I encountered this problem when upgrade the planner Calcite version to 1.26.0, but the error should not be caused by the upgrade. > JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference > > > Key: FLINK-19567 > URL: https://issues.apache.org/jira/browse/FLINK-19567 > Project: Flink > Issue Type: Bug > Components: Table SQL / Legacy Planner >Affects Versions: 1.11.2 >Reporter: Danny Chen >Priority: Major > > The > {{org.apache.flink.table.runtime.stream.table.JoinITCase.testInnerJoinOutputWithPk}} > is unstable, it fails randomly in the mvn test but success in the IDEA local > runner. > Here is the stacktrace: > {code:xml} > [ERROR] > testInnerJoinOutputWithPk(org.apache.flink.table.runtime.stream.table.JoinITCase) > Time elapsed: 0.044 s <<< ERROR! > org.apache.flink.table.codegen.CodeGenException: Incompatible types of > expression and result type. 
> Expression[GeneratedExpression(result$19166,isNull$19167,,GenericType,false)] > type is [GenericType], result type is > [GenericType] > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377) > at > org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295) > at > org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115) > at > org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19567) JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference
Danny Chen created FLINK-19567: -- Summary: JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference Key: FLINK-19567 URL: https://issues.apache.org/jira/browse/FLINK-19567 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner Affects Versions: 1.11.2 Reporter: Danny Chen The {{org.apache.flink.table.runtime.stream.table.JoinITCase.testInnerJoinOutputWithPk}} is unstable, it fails randomly in the mvn test but success in the IDEA local runner. Here is the stacktrace: {code:xml} [ERROR] testInnerJoinOutputWithPk(org.apache.flink.table.runtime.stream.table.JoinITCase) Time elapsed: 0.044 s <<< ERROR! org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type. Expression[GeneratedExpression(result$19166,isNull$19167,,GenericType,false)] type is [GenericType], result type is [GenericType] at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377) at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295) at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115) at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74) at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16579) Upgrade Calcite version to 1.26 for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16579: --- Summary: Upgrade Calcite version to 1.26 for Flink SQL (was: Upgrade Calcite version to 1.23 for Flink SQL) > Upgrade Calcite version to 1.26 for Flink SQL > - > > Key: FLINK-16579 > URL: https://issues.apache.org/jira/browse/FLINK-16579 > Project: Flink > Issue Type: Task > Components: Table SQL / Planner >Reporter: Jark Wu >Priority: Major > > A task to upgrade Calcite version to 1.23. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin
[ https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193344#comment-17193344 ] Danny Chen commented on FLINK-19175: Nice catch, [~dwysakowicz], can I take this issue? > Tests in JoinITCase do not test BroadcastHashJoin > - > > Key: FLINK-19175 > URL: https://issues.apache.org/jira/browse/FLINK-19175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Reporter: Dawid Wysakowicz >Priority: Major > > The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they > actually do not. None of the tables used in the tests have proper statistics; > therefore, none of the tables meet the threshold for the broadcast join. At > the same time the {{ShuffleHashJoin}} is not disabled, therefore they > silently fall back to {{ShuffleHashJoin}}. > In summary, none (or at least not all) of the tests are executed for > BroadcastHashJoin, but they are executed twice for ShuffleHashJoin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly
[ https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190573#comment-17190573 ] Danny Chen commented on FLINK-19133: Hi, [~dwysakowicz], may I take this issue ~ > User provided kafka partitioners are not initialized correctly > -- > > Key: FLINK-19133 > URL: https://issues.apache.org/jira/browse/FLINK-19133 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.11.1 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.12.0, 1.11.2 > > > Reported in the ML: > https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E > If a user provides a partitioner in combination with SerializationSchema it > is not initialized correctly and has no access to the parallel instance index > or number of parallel instances. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190505#comment-17190505 ] Danny Chen commented on FLINK-19127: Thanks, indeed, notebooks like Zeppelin may need this ~ > Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment > for TableEnvironment > > > Key: FLINK-19127 > URL: https://issues.apache.org/jira/browse/FLINK-19127 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Timo Walther >Priority: Major > > Connecting to a remote cluster from the unified TableEnvironment is neither > tested nor documented. Since StreamExecutionEnvironment is not necessary > anymore, users should be able to do the same in TableEnvironment easily. This > is in particular useful for interactive sessions that run in an IDE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189887#comment-17189887 ] Danny Chen commented on FLINK-19127: Are there any real use cases that can be shared here? Who would run interactive sessions in an IDE? For testing? > Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment > for TableEnvironment > > > Key: FLINK-19127 > URL: https://issues.apache.org/jira/browse/FLINK-19127 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Timo Walther >Priority: Major > > Connecting to a remote cluster from the unified TableEnvironment is neither > tested nor documented. Since StreamExecutionEnvironment is not necessary > anymore, users should be able to do the same in TableEnvironment easily. This > is in particular useful for interactive sessions that run in an IDE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16528) Support Limit push down for Kafka streaming sources
[ https://issues.apache.org/jira/browse/FLINK-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189016#comment-17189016 ] Danny Chen commented on FLINK-16528: What is the semantics that for an unbounded stream to have a fetch limit ? These two combination conflicts. > Support Limit push down for Kafka streaming sources > --- > > Key: FLINK-16528 > URL: https://issues.apache.org/jira/browse/FLINK-16528 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Ecosystem, Table SQL / Planner >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: usability > Fix For: 1.12.0 > > > Currently, a limit query {{SELECT * FROM kafka LIMIT 10}} will be translated > into TopN operator and will scan the full data in the source. However, > {{LIMIT}} is a very useful feature in SQL CLI to explore data in the source. > It doesn't make sense it never stop. > We can support such case in streaming mode (ignore the text format): > {code} > flink > SELECT * FROM kafka LIMIT 10; > kafka_key |user_name| lang | created_at > +-+--+- > 494227746231685121 | burncaniff | en | 2014-07-29 14:07:31.000 > 494227746214535169 | gu8tn | ja | 2014-07-29 14:07:31.000 > 494227746219126785 | pequitamedicen | es | 2014-07-29 14:07:31.000 > 494227746201931777 | josnyS | ht | 2014-07-29 14:07:31.000 > 494227746219110401 | Cafe510 | en | 2014-07-29 14:07:31.000 > 494227746210332673 | Da_JuanAnd_Only | en | 2014-07-29 14:07:31.000 > 494227746193956865 | Smile_Kidrauhl6 | pt | 2014-07-29 14:07:31.000 > 494227750426017793 | CashforeverCD | en | 2014-07-29 14:07:32.000 > 494227750396653569 | FilmArsivimiz | tr | 2014-07-29 14:07:32.000 > 494227750388256769 | jmolas | es | 2014-07-29 14:07:32.000 > (10 rows) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
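To make the proposed semantics concrete, here is an illustrative sketch (a hypothetical helper, not Flink code): a pushed-down LIMIT lets the scan terminate after n records even though the source itself is unbounded, which resolves the apparent conflict raised above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch, not Flink code: a LIMIT pushed into the scan bounds
// an otherwise unbounded read, so the query can finish.
public class LimitedScan {
    public static <T> List<T> takeFirst(Iterator<T> unbounded, int n) {
        List<T> out = new ArrayList<>();
        while (out.size() < n && unbounded.hasNext()) {
            out.add(unbounded.next());
        }
        return out; // terminates after n records even if the iterator never ends
    }
}
```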
[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix
[ https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-19108: --- Description: For query {code:sql} create view tmp_view as select * from ( select f0, row_number() over (partition by f0 order by f0 desc) as rowNum from source) -- the query would be aliased as "EXPR$1" where rowNum = 1 {code} When validation, the inner query would have alias assigned by the system with prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query all with this prefix which is wrong because we do not add the alias to the inner query anymore. To solve the problem, skip the expanding of id with "EXPR$" just like how {{SqlUtil#deriveAliasFromOrdinal}} added it. This was introduced by FLINK-18750. was: For query {code:sql} create view tmp_view as select * from ( select f0, row_number() over (partition by f0 order by f0 desc) as rowNum from source) -- the query would be aliased as "EXPR$1" where rowNum = 1 {code} When validation, the inner query would have alias assigned by the system with prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query all with this prefix which is wrong because we do not add the alias to the inner query anymore. To solve the problem, skip the expanding of id with "EXPR$" just like how {{SqlUtil#deriveAliasFromOrdinal}} added it. 
> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' > prefix > --- > > Key: FLINK-19108 > URL: https://issues.apache.org/jira/browse/FLINK-19108 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.11.2 >Reporter: Danny Chen >Priority: Major > Fix For: 1.12.0, 1.11.2 > > > For query > {code:sql} > create view tmp_view as > select * from ( > select f0, > row_number() over (partition by f0 order by f0 desc) as rowNum > from source) -- the query would be aliased as "EXPR$1" > where rowNum = 1 > {code} > When validation, the inner query would have alias assigned by the system with > prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query > all with this prefix which is wrong because we do not add the alias to the > inner query anymore. > To solve the problem, skip the expanding of id with "EXPR$" just like how > {{SqlUtil#deriveAliasFromOrdinal}} added it. > This was introduced by FLINK-18750. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix
[ https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-19108: --- Description: For query {code:sql} create view tmp_view as select * from ( select f0, row_number() over (partition by f0 order by f0 desc) as rowNum from source) -- the query would be aliased as "EXPR$1" where rowNum = 1 {code} When validation, the inner query would have alias assigned by the system with prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query all with this prefix which is wrong because we do not add the alias to the inner query anymore. To solve the problem, skip the expanding of id with "EXPR$" just like how {{SqlUtil#deriveAliasFromOrdinal}} added it. was: For query {code:sql} create view tmp_view as select * from ( select f0, row_number() over (partition by f0 order by f0 desc) as rowNum from source) -- the query would be aliased as "EXPR$1" where rowNum = 1 {code} when validation, the inner would have query alias by the system with prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query all with this prefix which is wrong because we do not add the alias to the inner query anymore. To solve the problem, skip the expanding of id with "EXPR$" just like how {{SqlUtil#deriveAliasFromOrdinal}} added it. 
> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' > prefix > --- > > Key: FLINK-19108 > URL: https://issues.apache.org/jira/browse/FLINK-19108 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.11.2 >Reporter: Danny Chen >Priority: Major > Fix For: 1.12.0, 1.11.2 > > > For query > {code:sql} > create view tmp_view as > select * from ( > select f0, > row_number() over (partition by f0 order by f0 desc) as rowNum > from source) -- the query would be aliased as "EXPR$1" > where rowNum = 1 > {code} > When validation, the inner query would have alias assigned by the system with > prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query > all with this prefix which is wrong because we do not add the alias to the > inner query anymore. > To solve the problem, skip the expanding of id with "EXPR$" just like how > {{SqlUtil#deriveAliasFromOrdinal}} added it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix
Danny Chen created FLINK-19108: -- Summary: Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix Key: FLINK-19108 URL: https://issues.apache.org/jira/browse/FLINK-19108 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.0, 1.11.2 Reporter: Danny Chen Fix For: 1.12.0, 1.11.2 For query {code:sql} create view tmp_view as select * from ( select f0, row_number() over (partition by f0 order by f0 desc) as rowNum from source) -- the query would be aliased as "EXPR$1" where rowNum = 1 {code} when validation, the inner would have query alias by the system with prefix "EXPR$1", when in the `Expander`, we replace the id in the inner query all with this prefix which is wrong because we do not add the alias to the inner query anymore. To solve the problem, skip the expanding of id with "EXPR$" just like how {{SqlUtil#deriveAliasFromOrdinal}} added it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
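A minimal sketch of the proposed skip, with illustrative names (not the actual Expander code): expansion is suppressed for any scope alias carrying the system-generated "EXPR$" prefix, since such an alias never appears in the original query text.

```java
// Hypothetical sketch, not Flink's Expander: decide whether an identifier's
// scope alias is safe to expand. System-generated aliases (EXPR$0, EXPR$1,
// ...) are skipped, mirroring how SqlUtil#deriveAliasFromOrdinal names them.
public class ExpanderSketch {
    private static final String SYSTEM_ALIAS_PREFIX = "EXPR$";

    public static boolean shouldExpand(String scopeAlias) {
        // A null alias means there is no scope to qualify with; expansion is fine.
        return scopeAlias == null || !scopeAlias.startsWith(SYSTEM_ALIAS_PREFIX);
    }
}
```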
[jira] [Comment Edited] (FLINK-19099) consumer kafka message repeat
[ https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187592#comment-17187592 ] Danny Chen edited comment on FLINK-19099 at 8/31/20, 9:23 AM: -- Before FLINK-15221, the SQL Kafka connector only supports "at least once" semantic, where the records may duplicate when there are failures. You can use data stream instead. was (Author: danny0405): Before Flink-15221, the SQL Kafka connector only supports "at least once" semantic, where the records may duplicate when there are failures. You can use data stream instead. > consumer kafka message repeat > - > > Key: FLINK-19099 > URL: https://issues.apache.org/jira/browse/FLINK-19099 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kafka >Affects Versions: 1.11.0 >Reporter: zouwenlong >Priority: Major > > when taksmanager be killed ,my job consume some message , but the offset in > not commit, > then restart it ,my job consume kafka message repeat, I used checkpoint and > set 5 seconds , > I think this is a very common problem,how to solve this problem? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19099) consumer kafka message repeat
[ https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187592#comment-17187592 ] Danny Chen commented on FLINK-19099: Before Flink-15221, the SQL Kafka connector only supports "at least once" semantic, where the records may duplicate when there are failures. You can use data stream instead. > consumer kafka message repeat > - > > Key: FLINK-19099 > URL: https://issues.apache.org/jira/browse/FLINK-19099 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kafka >Affects Versions: 1.11.0 >Reporter: zouwenlong >Priority: Major > > when taksmanager be killed ,my job consume some message , but the offset in > not commit, > then restart it ,my job consume kafka message repeat, I used checkpoint and > set 5 seconds , > I think this is a very common problem,how to solve this problem? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19101) The SelectivityEstimator throws a NullPointerException when convertValueInterval is called with a string type
[ https://issues.apache.org/jira/browse/FLINK-19101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187586#comment-17187586 ] Danny Chen commented on FLINK-19101: You may need to provide the data set of the table so that we can reproduce the problem. > The SelectivityEstimator throws a NullPointerException when > convertValueInterval is called with a string type > - > > Key: FLINK-19101 > URL: https://issues.apache.org/jira/browse/FLINK-19101 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.2, 1.11.1 >Reporter: fa zheng >Priority: Critical > Fix For: 1.12.0 > > > For the SQL: > {code:sql} > select t1.a, t2.b from t1 join t2 on t1.b=t2.b and t1.b<'2' > {code} > it will throw a java.lang.NullPointerException, because SelectivityEstimator > converts the character value interval to a string without considering the null case:
> {code:scala}
> def convertValueInterval(
>     interval: ValueInterval,
>     typeFamily: RelDataTypeFamily): ValueInterval = {
>   require(interval != null && typeFamily != null)
>   interval match {
>     case ValueInterval.empty | ValueInterval.infinite => interval
>     case _ =>
>       val (lower, includeLower) = interval match {
>         case li: WithLower => (li.lower, li.includeLower)
>         case _ => (null, false)
>       }
>       val (upper, includeUpper) = interval match {
>         case ui: WithUpper => (ui.upper, ui.includeUpper)
>         case _ => (null, false)
>       }
>       typeFamily match {
>         case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.DATE |
>              SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
>           ValueInterval(
>             comparableToDouble(lower),
>             comparableToDouble(upper),
>             includeLower,
>             includeUpper)
>         case SqlTypeFamily.CHARACTER =>
>           ValueInterval(
>             lower.toString, // it can't call toString directly: lower may be null
>             upper.toString, // it can't call toString directly: upper may be null
>             includeLower,
>             includeUpper)
>         case _ => throw new UnsupportedOperationException(s"Unsupported typeFamily: $typeFamily")
>       }
>   }
> }
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
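The NPE above comes from calling {{toString}} on a bound that is absent for one-sided intervals. A minimal null-safe sketch in Java (the helper name is hypothetical; the actual planner fix lives in the Scala code quoted above):

```java
public class IntervalBounds {

    // Null-safe conversion of an interval bound: a one-sided predicate such
    // as t1.b < '2' leaves the lower bound null, so toString must be guarded.
    static String boundToString(Comparable<?> bound) {
        return bound == null ? null : bound.toString();
    }

    public static void main(String[] args) {
        System.out.println(boundToString(null)); // prints "null" instead of throwing
        System.out.println(boundToString("2"));  // prints "2"
    }
}
```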
[jira] [Commented] (FLINK-19098) Make RowData converters public
[ https://issues.apache.org/jira/browse/FLINK-19098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187585#comment-17187585 ] Danny Chen commented on FLINK-19098: Abstracting some common parts of the code for reuse is usually a good idea; remember to mark them as {{@Internal}} because they are only used as tooling for connectors. > Make RowData converters public > -- > > Key: FLINK-19098 > URL: https://issues.apache.org/jira/browse/FLINK-19098 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.11.0 >Reporter: Brian Zhou >Priority: Major > > Some of the RowData converters (SerDe between RowData and format objects like > GenericRecord/JsonNode) are private or package-private (like JSON), which makes it > hard for third-party connector projects to reuse them to implement their > own format factories in the Table API. > For example, the Pravega connector is now developing a schema-registry-based > format factory. The [Pravega schema > registry|https://github.com/pravega/schema-registry] is a REST service > similar to the Confluent registry, but it can help to serialize/deserialize > JSON/Avro/Protobuf/custom format data. It will help a lot if these converters > are public. > As noted in FLINK-16048, we have already moved the Avro converters out and > made them public. Similarly, it should be safe to make at least the JSON and CSV > format converters public. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19048) support line format for table connector
[ https://issues.apache.org/jira/browse/FLINK-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185080#comment-17185080 ] Danny Chen commented on FLINK-19048: It seems to be a duplicate of FLINK-14356. > support line format for table connector > --- > > Key: FLINK-19048 > URL: https://issues.apache.org/jira/browse/FLINK-19048 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Reporter: badqiu >Priority: Major > Attachments: LineFormatFactory.java, > LineRowDataDeserializationSchema.java, LineRowDeserializationSchema.java, > LineRowFormatFactory.java, LineRowSchemaConverter.java, > LineRowSerializationSchema.java > > > A native string data format that performs no data conversion. > This format is particularly friendly to data without time attributes. Combined with > a UDF, writing your own data analysis becomes much more convenient. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result
[ https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180207#comment-17180207 ] Danny Chen commented on FLINK-18988: Hi, [~fhueske], i would like to take this issue ~ I guess you use the Blink planner right ? > Continuous query with LATERAL and LIMIT produces wrong result > - > > Key: FLINK-18988 > URL: https://issues.apache.org/jira/browse/FLINK-18988 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.1 >Reporter: Fabian Hueske >Priority: Critical > > I was trying out the example queries provided in this blog post: > [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if > Flink supports the same and found that the queries were translated and > executed but produced the wrong result. > I used the SQL Client and Kafka (running at kafka:9092) to store the table > data. I executed the following statements: > {code:java} > -- create cities table > CREATE TABLE cities ( > name STRING NOT NULL, > state STRING NOT NULL, > pop INT NOT NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'cities', > 'properties.bootstrap.servers' = 'kafka:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > -- fill cities table > INSERT INTO cities VALUES > ('Los_Angeles', 'CA', 3979576), > ('Phoenix', 'AZ', 1680992), > ('Houston', 'TX', 2320268), > ('San_Diego', 'CA', 1423851), > ('San_Francisco', 'CA', 881549), > ('New_York', 'NY', 8336817), > ('Dallas', 'TX', 1343573), > ('San_Antonio', 'TX', 1547253), > ('San_Jose', 'CA', 1021795), > ('Chicago', 'IL', 2695598), > ('Austin', 'TX', 978908); > -- execute query > SELECT state, name > FROM > (SELECT DISTINCT state FROM cities) states, > LATERAL ( > SELECT name, pop > FROM cities > WHERE state = states.state > ORDER BY pop > DESC LIMIT 3 > ); > -- result > state name >CA Los_Angeles >NY New_York >IL Chicago > -- expected result > state | name > --+- 
> TX | Dallas > AZ | Phoenix > IL | Chicago > TX | Houston > CA | San_Jose > NY | New_York > CA | San_Diego > CA | Los_Angeles > TX | San_Antonio > {code} > As you can see from the query result, Flink computes the top3 cities over all > states, not for every state individually. Hence, I assume that this is a bug > in the query optimizer or one of the rewriting rules. > There are two valid ways to solve this issue: > * Fixing the rewriting rules / optimizer (obviously preferred) > * Disabling this feature and throwing an exception -- This message was sent by Atlassian Jira (v8.3.4#803005)
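The lateral subquery in the report asks for a per-group top-N, while the buggy plan computed a single global top-N. The two semantics can be contrasted in plain Java (no Flink APIs; the class and method names are hypothetical), using the city data from the report:

```java
import java.util.*;
import java.util.stream.*;

public class TopNPerGroup {

    static final class City {
        final String name; final String state; final int pop;
        City(String name, String state, int pop) {
            this.name = name; this.state = state; this.pop = pop;
        }
    }

    // Per-state top-N by population: the semantics that
    // "LATERAL (... ORDER BY pop DESC LIMIT 3)" asks for.
    static Map<String, List<String>> topNPerState(List<City> cities, int n) {
        return cities.stream().collect(Collectors.groupingBy(c -> c.state,
            Collectors.collectingAndThen(Collectors.toList(), cs -> cs.stream()
                .sorted((a, b) -> Integer.compare(b.pop, a.pop))  // descending by pop
                .limit(n)
                .map(c -> c.name)
                .collect(Collectors.toList()))));
    }

    public static void main(String[] args) {
        List<City> cities = Arrays.asList(
            new City("Los_Angeles", "CA", 3979576), new City("Phoenix", "AZ", 1680992),
            new City("Houston", "TX", 2320268), new City("San_Diego", "CA", 1423851),
            new City("San_Francisco", "CA", 881549), new City("New_York", "NY", 8336817),
            new City("Dallas", "TX", 1343573), new City("San_Antonio", "TX", 1547253),
            new City("San_Jose", "CA", 1021795), new City("Chicago", "IL", 2695598),
            new City("Austin", "TX", 978908));

        // Nine result rows in total: three each for CA and TX, one each for
        // AZ, NY, IL -- matching the "expected result" in the report above,
        // whereas the buggy plan returned only a global top 3.
        System.out.println(topNPerState(cities, 3));
    }
}
```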
[jira] [Commented] (FLINK-18840) Support StatementSet with DataStream API
[ https://issues.apache.org/jira/browse/FLINK-18840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174121#comment-17174121 ] Danny Chen commented on FLINK-18840: I'm a little confused by this request: what is the use case for converting a whole statement set into a {{DataStream}}? Are there any cases that SQL cannot cover but only {{DataStream}} could? The statement set itself is mainly designed for SQL; we could add some APIs to convert a {{DataStream}} into a table so that the following statements can reference that table, which would be a kind of unification of the two APIs. > Support StatementSet with DataStream API > > > Key: FLINK-18840 > URL: https://issues.apache.org/jira/browse/FLINK-18840 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Timo Walther >Priority: Major > > Currently, users of the {{StreamTableEnvironment}} cannot translate a > {{StatementSet}} to the DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call
[ https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168834#comment-17168834 ] Danny Chen commented on FLINK-18750: The root cause is that, the `LATERAL` keyword of the View expanded query was lost, CALCITE-4077 (release 1.24.0) actually fixed this problem, while we can fix this first in Flink. > SqlValidatorException thrown when select from a view which contains a UDTF > call > --- > > Key: FLINK-18750 > URL: https://issues.apache.org/jira/browse/FLINK-18750 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: Wei Zhong >Priority: Major > > When executing such code: > > {code:java} > package com.example; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.TableFunction; > public class TestUTDF { >public static class UDTF extends TableFunction { > public void eval(String input) { > collect(input); > } >} >public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); > tEnv.createTemporarySystemFunction("udtf", new UDTF()); > tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", > "c").as("f0")); > String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL > TABLE(udtf(f0)) as T(f1)"; > System.out.println(tEnv.explainSql(udtfCall)); > String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall; > tEnv.executeSql(createViewCall); > System.out.println(tEnv.from("tmp_view").explain()); >} > } > {code} > Such a SqlValidatorException would be thrown: > > > {code:java} > == Abstract Syntax Tree Abstract Syntax Tree ==LogicalProject(f0=[$0], > 
f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], > requiredColumns=[{0}]) :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')]) : > +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' > }]]) +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], > rowType=[RecordType(VARCHAR(2147483647) EXPR$0)]) > == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER])+- Calc(select=[f0]) +- Values(type=[RecordType(CHAR(1) > f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > == Physical Execution Plan ==Stage 1 : Data Source content : Source: > Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD > Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER]) ship_strategy : FORWARDException in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > at > org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at > org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:349) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152) > at > org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69) > at >
[jira] [Issue Comment Deleted] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call
[ https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-18750: --- Comment: was deleted (was: The problem is because the example function code `UDTF` does not specify the return type, either through the {{#getResultType}} or through the {{DataTypeHint}}, thus during validation, the code goes to the {{SqlFunction#deriveType}}, when validation the return type of the collection table, the referenced column "f0" got a wrong scope (CatalogScope instead of JoinScope) thus it can not be resolved correctly. The scope has beed fixed in CALCITE-4077, which is since Calcite release 1.24.0.) > SqlValidatorException thrown when select from a view which contains a UDTF > call > --- > > Key: FLINK-18750 > URL: https://issues.apache.org/jira/browse/FLINK-18750 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: Wei Zhong >Priority: Major > > When executing such code: > > {code:java} > package com.example; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.TableFunction; > public class TestUTDF { >public static class UDTF extends TableFunction { > public void eval(String input) { > collect(input); > } >} >public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); > tEnv.createTemporarySystemFunction("udtf", new UDTF()); > tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", > "c").as("f0")); > String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL > TABLE(udtf(f0)) as T(f1)"; > System.out.println(tEnv.explainSql(udtfCall)); > String 
createViewCall = "CREATE VIEW tmp_view AS" + udtfCall; > tEnv.executeSql(createViewCall); > System.out.println(tEnv.from("tmp_view").explain()); >} > } > {code} > Such a SqlValidatorException would be thrown: > > > {code:java} > == Abstract Syntax Tree Abstract Syntax Tree ==LogicalProject(f0=[$0], > f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], > requiredColumns=[{0}]) :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')]) : > +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' > }]]) +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], > rowType=[RecordType(VARCHAR(2147483647) EXPR$0)]) > == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER])+- Calc(select=[f0]) +- Values(type=[RecordType(CHAR(1) > f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > == Physical Execution Plan ==Stage 1 : Data Source content : Source: > Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD > Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER]) ship_strategy : FORWARDException in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > at > org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at > org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at >
[jira] [Commented] (FLINK-18750) SqlValidatorException thrown when select from a view which contains a UDTF call
[ https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168682#comment-17168682 ] Danny Chen commented on FLINK-18750: The problem is that the example function `UDTF` does not specify its return type, either through {{#getResultType}} or through a {{DataTypeHint}}; thus during validation the code goes through {{SqlFunction#deriveType}}, and when validating the return type of the collection table, the referenced column "f0" gets the wrong scope (CatalogScope instead of JoinScope), so it cannot be resolved correctly. The scope has been fixed in CALCITE-4077, which is included since Calcite release 1.24.0. > SqlValidatorException thrown when select from a view which contains a UDTF > call > --- > > Key: FLINK-18750 > URL: https://issues.apache.org/jira/browse/FLINK-18750 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: Wei Zhong >Priority: Major > > When executing such code: > > {code:java} > package com.example; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.TableFunction; > public class TestUTDF { >public static class UDTF extends TableFunction { > public void eval(String input) { > collect(input); > } >} >public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); > tEnv.createTemporarySystemFunction("udtf", new UDTF()); > tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", > "c").as("f0")); > String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL > TABLE(udtf(f0)) as T(f1)"; > System.out.println(tEnv.explainSql(udtfCall)); >
String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall; > tEnv.executeSql(createViewCall); > System.out.println(tEnv.from("tmp_view").explain()); >} > } > {code} > Such a SqlValidatorException would be thrown: > > > {code:java} > == Abstract Syntax Tree Abstract Syntax Tree ==LogicalProject(f0=[$0], > f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], > requiredColumns=[{0}]) :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')]) : > +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' > }]]) +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], > rowType=[RecordType(VARCHAR(2147483647) EXPR$0)]) > == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER])+- Calc(select=[f0]) +- Values(type=[RecordType(CHAR(1) > f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > == Physical Execution Plan ==Stage 1 : Data Source content : Source: > Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]]) > Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD > Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], > correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], > rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], > joinType=[INNER]) ship_strategy : FORWARDException in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > at > org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at > org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at >
[jira] [Created] (FLINK-18777) Supports schema registry catalog
Danny Chen created FLINK-18777: -- Summary: Supports schema registry catalog Key: FLINK-18777 URL: https://issues.apache.org/jira/browse/FLINK-18777 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.0 Reporter: Danny Chen Fix For: 1.12.0 Design doc: https://cwiki.apache.org/confluence/display/FLINK/FLIP-125%3A+Confluent+Schema+Registry+Catalog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164105#comment-17164105 ] Danny Chen commented on FLINK-16048: Personally I do not have a strong preference either; it seems most of us support avro-confluent, so let's use it. > Support read/write confluent schema registry avro data from Kafka > -- > > Key: FLINK-16048 > URL: https://issues.apache.org/jira/browse/FLINK-16048 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.11.0 >Reporter: Leonard Xu >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available, usability > Fix For: 1.12.0 > > > *The background* > I found the SQL Kafka connector cannot consume Avro data that was serialized by > `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because > we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to serialize/deserialize > data in `AvroRowFormatFactory`. > I think we should support this because `KafkaAvroSerializer` is very common > in Kafka, and someone hit the same question on Stack Overflow [1]. > [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] > *The format details* > _The factory identifier (or format id)_ > There are 2 candidates now: > - {{avro-sr}}: the pattern borrowed from the KSQL {{JSON_SR}} format [1] > - {{avro-confluent}}: the pattern borrowed from the ClickHouse {{AvroConfluent}} format > [2] > Personally I would prefer {{avro-sr}} because it is more concise, and Confluent is a > company name, which I think is not that suitable for a format name. 
> _The format attributes_ > || Options || required || Remark || > | schema-registry.url | true | URL to connect to schema registry service | > | schema-registry.subject | false | Subject name to write to the Schema > Registry service, required for sink | -- This message was sent by Atlassian Jira (v8.3.4#803005)
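Given the options table above and the thread's consensus on {{avro-confluent}}, a source table definition could look like the following sketch. The topic, server, and registry URL are placeholders, and the option names follow the proposal in this ticket, not necessarily the final released ones:

{code:sql}
create table kafka_avro_source (
  f0 bigint,
  f1 string
) with (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'avro-confluent',
  'schema-registry.url' = 'http://registry:8081',
  -- subject is only required when the table is used as a sink
  'schema-registry.subject' = 'my-topic-value'
);
{code}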
[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162464#comment-17162464 ] Danny Chen commented on FLINK-16048: The schema string can be inferred from the DDL table schema, just like our existing Avro format does. One thing users need to be cautious about: for the sink table, the fields' nullability should be strictly the same as in the Avro schema, but Flink and Avro have different default nullability strategies (Flink defaults to nullable, Avro to not null). The Cloudera schema registry has a different code path, so I think there is no possibility it shares the same format id with the Confluent schema registry. But I'm still voting for avro-sr if supporting the Cloudera Avro schema is not on our road map; "confluent" is too verbose, and Seth Wiesman has the same feeling. > Support read/write confluent schema registry avro data from Kafka > -- > > Key: FLINK-16048 > URL: https://issues.apache.org/jira/browse/FLINK-16048 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.11.0 >Reporter: Leonard Xu >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available, usability > Fix For: 1.12.0 > > > *The background* > I found the SQL Kafka connector cannot consume Avro data that was serialized by > `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because > we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to serialize/deserialize > data in `AvroRowFormatFactory`. > I think we should support this because `KafkaAvroSerializer` is very common > in Kafka, and someone hit the same question on Stack Overflow [1]. 
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] > *The format details* > _The factory identifier (or format id)_ > There are 2 candidates now ~ > - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] > - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} > [2] > Personally i would prefer {{avro-sr}} because it is more concise and the > confluent is a company name which i think is not that suitable for a format > name. > _The format attributes_ > || Options || required || Remark || > | schema-registry.url | true | URL to connect to schema registry service | > | schema-registry.subject | false | Subject name to write to the Schema > Registry service, required for sink | -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16048: --- Description: *The background* I found SQL Kafka connector can not consume avro data that was serialized by `KafkaAvroSerializer` and only can consume Row data with avro schema because we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka. and someone met same question in stackoverflow[1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* _The factory identifier (or format id)_ There are 2 candidates now ~ - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2] Personally i would prefer {{avro-sr}} because it is more concise and the confluent is a company name which i think is not that suitable for a format name. _The format attributes_ || Options || required || Remark || | schema-registry.url | true | URL to connect to schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service, required for sink | was: *The background* I found SQL Kafka connector can not consume avro data that was serialized by `KafkaAvroSerializer` and only can consume Row data with avro schema because we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka. and someone met same question in stackoverflow[1]. 
[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* _The factory identifier (or format id)_ There are 2 candidates now ~ - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2] Personally i would prefer {{avro-sr}} because it is more concise and the confluent is a company name which i think is not that suitable for a format name. _The format attributes_ || Options || required || Remark || | schema-string | true | avro schema string used for (de)serialization | | schema-registry.url | true | URL to connect to schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service, required for sink | > Support read/write confluent schema registry avro data from Kafka > -- > > Key: FLINK-16048 > URL: https://issues.apache.org/jira/browse/FLINK-16048 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.11.0 >Reporter: Leonard Xu >Assignee: Danny Chen >Priority: Major > Labels: pull-request-available, usability > Fix For: 1.12.0 > > > *The background* > I found SQL Kafka connector can not consume avro data that was serialized by > `KafkaAvroSerializer` and only can consume Row data with avro schema because > we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de > data in `AvroRowFormatFactory`. > I think we should support this because `KafkaAvroSerializer` is very common > in Kafka. > and someone met same question in stackoverflow[1]. 
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] > *The format details* > _The factory identifier (or format id)_ > There are 2 candidates now ~ > - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] > - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} > [2] > Personally i would prefer {{avro-sr}} because it is more concise and the > confluent is a company name which i think is not that suitable for a format > name. > _The format attributes_ > || Options || required || Remark || > | schema-registry.url | true | URL to connect to schema registry service | > | schema-registry.subject | false | Subject name to write to the Schema > Registry service, required for sink | -- This
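The proposed format attributes can be illustrated with a DDL sketch. This is hypothetical: the {{avro-confluent}} identifier, the option-key prefix, and the registry address are assumptions for illustration only, since the naming was still under discussion in this issue:

{code:sql}
-- Hypothetical sink table: the format id and option keys follow the
-- proposal in this issue and may differ from what was finally merged.
CREATE TABLE user_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'avro-confluent.schema-registry.subject' = 'users-value'
);
{code}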
[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999 ] Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:35 AM: -- [~ykt836] Yes, we can infer the read schema from the DDL schema; the schema string is not necessary. [~anshul.bansal] We are planning to implement a schema registry catalog, with which you would not need to create a table explicitly. [~dwysakowicz], this may be a problem, but do we plan to support the Cloudera schema registry? was (Author: danny0405): [~ykt836] Yes, we can infer the read schema from the DDL schema, the schema string is not necessary. [~anshul.bansal] We are planning to implement a schema registry catalog with what you do not need to create a table explicitly. [~dwysakowicz], this may be a problem, but do we plan to support cloudra schema registry ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999 ] Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:34 AM: -- [~ykt836] Yes, we can infer the read schema from the DDL schema; the schema string is not necessary. [~anshul.bansal] We are planning to implement a schema registry catalog, with which you would not need to create a table explicitly. [~dwysakowicz], this may be a problem, but do we plan to support the Cloudera schema registry? was (Author: danny0405): [~ykt836] This PR contributes 2 formats of schema registry avro, one way to remove the schema string is to pass in a schema id or the subject name, do the inference when the format was instantiated. We can fetch the avro schema string of the compile time but that was in the scope of schema inference, i think it is not good practice to let the format do a schema inference, the format se/de schema should be solid. The inference work should be done by the framework, either the DDL or the Catalog. [~dwysakowicz] Personally i agree the schema-string should not be user provided option, but we actually can not recover a schema string from a TableSchema, any good ideas ? One way i can think of is let user specify the subject name PLUS the schema version, what do you think ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999 ] Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:13 AM: -- [~ykt836] This PR contributes two schema registry Avro formats. One way to remove the schema string is to pass in a schema id or the subject name and do the inference when the format is instantiated. We could fetch the Avro schema string at compile time, but that is in the scope of schema inference; I think it is not good practice to let the format do schema inference, since the format's se/de schema should be stable. The inference work should be done by the framework, either the DDL or the Catalog. [~dwysakowicz] Personally I agree the schema-string should not be a user-provided option, but we actually cannot recover a schema string from a TableSchema; any good ideas? One way I can think of is to let the user specify the subject name PLUS the schema version, what do you think? was (Author: danny0405): [~ykt836] This PR contributes 2 formats of schema registry avro, one way to remove the schema string is to pass in a schema id or the subject name, do the inference when the format was instantiated. We can fetch the avro schema string of the compile time but that was in the scope of schema inference, i think it is not good practice to let the format do a schema inference, the format se/de schema should be solid. The inference work should be done by the framework, either the DDL or the Catalog. [~dwysakowicz] Personally i agree the schema-string should not be user provided option, but we actually can not recover a schema string from a TableSchema, any good ideas ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160999#comment-17160999 ] Danny Chen commented on FLINK-16048: [~ykt836] This PR contributes two schema registry Avro formats. One way to remove the schema string is to pass in a schema id or the subject name and do the inference when the format is instantiated. We could fetch the Avro schema string at compile time, but that is in the scope of schema inference; I think it is not good practice to let the format do schema inference, since the format's se/de schema should be stable. The inference work should be done by the framework, either the DDL or the Catalog. [~dwysakowicz] Personally I agree the schema-string should not be a user-provided option, but we actually cannot recover a schema string from a TableSchema; any good ideas? -- This message was sent by Atlassian Jira (v8.3.4#803005)
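The comment above argues that schema inference belongs to the framework (DDL or Catalog) rather than the format. In that model a user declares only the table columns, and the Avro read/write schema is derived from them, so no schema-string option is needed. A hypothetical sketch (the format id and option key are assumptions taken from this discussion, not a finalized API):

{code:sql}
-- Hypothetical source table: the Avro schema is inferred from the
-- declared columns; only the registry endpoint must be configured.
CREATE TABLE user_source (
  user_id BIGINT,
  event_type STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-sr',
  'avro-sr.schema-registry.url' = 'http://localhost:8081'
);
{code}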
[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16048: --- Description: *The background* The SQL Kafka connector cannot consume Avro data serialized by `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because we use `AvroRowDeserializationSchema`/`AvroRowSerializationSchema` to serialize/deserialize data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka, and the same question has come up on Stack Overflow [1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* _The factory identifier (or format id)_ There are two candidates now: - {{avro-sr}}: the pattern borrowed from the KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from the ClickHouse {{AvroConfluent}} format [2] Personally I would prefer {{avro-sr}} because it is more concise, and Confluent is a company name, which I think is not suitable for a format name. _The format attributes_ || Options || required || Remark || | schema-string | true | Avro schema string used for (de)serialization | | schema-registry.url | true | URL of the schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service; required for sinks | was: *The background* I found SQL Kafka connector can not consume avro data that was serialized by `KafkaAvroSerializer` and only can consume Row data with avro schema because we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka. and someone met same question in stackoverflow[1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* _The factory identifier_ There are 2 candidates now ~ - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2] Personally i would prefer {{avro-sr}} because it is more concise and the confluent is a company name which i think is not that suitable for a format name. _The format attributes_ || Options || required || Remark || | schema-string | true | avro schema string used for (de)serialization | | schema-registry.url | true | URL to connect to schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service, required for sink |
[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159702#comment-17159702 ] Danny Chen commented on FLINK-16048: I'm hoping to reach a consensus about the format id and its attributes before opening the PR; thanks in advance for your suggestions ~ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16048: --- Description: *The background* The SQL Kafka connector cannot consume Avro data serialized by `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because we use `AvroRowDeserializationSchema`/`AvroRowSerializationSchema` to serialize/deserialize data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka, and the same question has come up on Stack Overflow [1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* The factory identifier There are two candidates now: - {{avro-sr}}: the pattern borrowed from the KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from the ClickHouse {{AvroConfluent}} format [2] Personally I would prefer {{avro-sr}} because it is more concise, and Confluent is a company name, which I think is not suitable for a format name. The format attributes || Options || required || Remark || | schema-string | true | Avro schema string used for (de)serialization | | schema-registry.url | true | URL of the schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service; required for sinks | was: I found SQL Kafka connector can not consume avro data that was serialized by `KafkaAvroSerializer` and only can consume Row data with avro schema because we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka. and someone met same question in stackoverflow[1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16048: --- Description: *The background* The SQL Kafka connector cannot consume Avro data serialized by `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because we use `AvroRowDeserializationSchema`/`AvroRowSerializationSchema` to serialize/deserialize data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka, and the same question has come up on Stack Overflow [1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* _The factory identifier_ There are two candidates now: - {{avro-sr}}: the pattern borrowed from the KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from the ClickHouse {{AvroConfluent}} format [2] Personally I would prefer {{avro-sr}} because it is more concise, and Confluent is a company name, which I think is not suitable for a format name. _The format attributes_ || Options || required || Remark || | schema-string | true | Avro schema string used for (de)serialization | | schema-registry.url | true | URL of the schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service; required for sinks | was: *The background* I found SQL Kafka connector can not consume avro data that was serialized by `KafkaAvroSerializer` and only can consume Row data with avro schema because we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in `AvroRowFormatFactory`. I think we should support this because `KafkaAvroSerializer` is very common in Kafka. and someone met same question in stackoverflow[1]. [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259] *The format details* The factory identifier There are 2 candidates now ~ - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1] - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} [2] Personally i would prefer {{avro-sr}} because it is more concise and the confluent is a company name which i think is not that suitable for a format name. The format attributes || Options || required || Remark || | schema-string | true | avro schema string used for (de)serialization | | schema-registry.url | true | URL to connect to schema registry service | | schema-registry.subject | false | Subject name to write to the Schema Registry service, required for sink |
[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka
[ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159687#comment-17159687 ] Danny Chen commented on FLINK-16048: [~anshul.bansal] [~lingyaKK] I'm working on this now, as part of FLIP-125 [1]. We might need to have a discussion about the format attributes in this issue, but I believe it will be finished very soon. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-125%3A+Confluent+Schema+Registry+Catalog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18440) ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
[ https://issues.apache.org/jira/browse/FLINK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155338#comment-17155338 ] Danny Chen commented on FLINK-18440: I have opened a fix in https://github.com/apache/flink/pull/12868. > ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or > ROW_NUMBER functions > > > Key: FLINK-18440 > URL: https://issues.apache.org/jira/browse/FLINK-18440 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.0 >Reporter: LakeShen >Priority: Major > Labels: pull-request-available > Fix For: 1.11.1 > > Attachments: image-2020-06-28-18-55-43-692.png > > > When I run the following Flink SQL: > create view test as select name, eat ,sum(age) as cnt from test_source group > by name,eat; > create view results as select *, ROW_NUMBER() OVER (PARTITION BY name > ORDER BY cnt DESC) as row_num from test; > create table sink ( > name varchar, > eat varchar, > cnt bigint > ) > with( > 'connector' = 'print' > ); > insert into sink select name,eat , cnt from results where row_num <= 3 ; > The same SQL ran successfully in Flink 1.10; after changing the Flink > version to 1.11, it throws this exception: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 1, column 124 to line 1, column 127: > ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > at > org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > at > 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18440) ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
[ https://issues.apache.org/jira/browse/FLINK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155333#comment-17155333 ] Danny Chen commented on FLINK-18440: After some code research, I found that the SqlWindow bounds state is mutated during sql-to-rel conversion in Calcite release-1.22; this has been fixed in https://issues.apache.org/jira/browse/CALCITE-3877. See this logic in SqlValidatorImpl#resolveWindow of Calcite 1.22: {code:java} if (populateBounds) { window.populateBounds(); } {code} The flag passed from SqlToRelConverter is always true. I can see two ways to fix this problem: 1. Wait for the upgrade to Calcite 1.23 or 1.24, which resolves it. 2. In SqlToOperationConverter#convertCreateView, keep the validated SQL string before it is converted. I will file a fix for option 2 soon. > ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or > ROW_NUMBER functions > > > Key: FLINK-18440 > URL: https://issues.apache.org/jira/browse/FLINK-18440 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.0 >Reporter: LakeShen >Priority: Major > Labels: pull-request-available > Fix For: 1.11.1 > > Attachments: image-2020-06-28-18-55-43-692.png > > > When I run Flink SQL, the SQL looks like this: > create view test as select name, eat ,sum(age) as cnt from test_source group > by name,eat; > create view results as select *, ROW_NUMBER() OVER (PARTITION BY name > ORDER BY cnt DESC) as row_num from test; > create table sink ( > name varchar, > eat varchar, > cnt bigint > ) > with( > 'connector' = 'print' > ); > insert into sink select name,eat , cnt from results where row_num <= 3 ; > The same SQL ran successfully in Flink 1.10; after changing the Flink > version to 1.11, it throws this exception: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
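Fix option 2 works around the Calcite bug by snapshotting the view's SQL text before conversion can touch the validated node. The hazard and the workaround can be sketched abstractly like this (all names below are hypothetical stand-ins, not the real Flink or Calcite API):

```java
// Abstract sketch of fix option 2: capture the expanded SQL text of the
// validated view *before* sql-to-rel conversion, so a later in-place
// mutation of the validated AST cannot corrupt the stored view definition.
// SqlNode here is a hypothetical stand-in, not Calcite's SqlNode.
public class ViewConversion {
    static class SqlNode {
        String text;
        SqlNode(String text) { this.text = text; }
        String toSqlString() { return text; }
        // Simulates Calcite 1.22 populating window bounds during conversion.
        void mutateDuringConversion() { this.text = text + " ROWS UNBOUNDED PRECEDING"; }
    }

    public static String convertCreateView(SqlNode validatedQuery) {
        // Snapshot first (the fix) ...
        String originalSql = validatedQuery.toSqlString();
        // ... then convert, which may mutate the node in place.
        validatedQuery.mutateDuringConversion();
        return originalSql; // the stored view definition stays intact
    }

    public static void main(String[] args) {
        SqlNode q = new SqlNode("SELECT *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) FROM t");
        System.out.println(convertCreateView(q));
    }
}
```

Capturing the string before conversion makes the stored view definition independent of whatever the converter later does to the shared AST, which is why option 2 does not need to wait for the Calcite upgrade.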
From line 1, column 124 to line 1, column 127: > ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > at > org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > at > 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18500) Make the legacy planner exception more clear when resolving computed columns types for schema
Danny Chen created FLINK-18500: -- Summary: Make the legacy planner exception more clear when resolving computed columns types for schema Key: FLINK-18500 URL: https://issues.apache.org/jira/browse/FLINK-18500 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.11.0 Reporter: Danny Chen From the user mail: Hi all: I use Zeppelin to execute SQL; the Flink version is a 1.11 snapshot, built from branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973. When the table type is datagen, Flink throws an exception, but the exception message is null. My DDL is: {code:sql} CREATE TABLE datagen_dijie2 ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ); {code} My query SQL is: {code:sql} select * from datagen_dijie2; {code} The exception is: {noformat} org.apache.flink.table.api.ValidationException: SQL validation failed. 
null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658) at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.UnsupportedOperationException at 
org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86) at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) at org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) at {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
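The improvement this issue asks for amounts to replacing the bare UnsupportedOperationException (whose null message surfaces as "SQL validation failed. null") with an error that names the computed column and its expression. A hedged sketch of what such a message could look like (hypothetical helper, not the actual Flink fix):

```java
// Hypothetical sketch of the clearer error FLINK-18500 asks for: name the
// computed column and its expression instead of throwing a bare
// UnsupportedOperationException with a null message.
public class ComputedColumnErrors {
    public static UnsupportedOperationException cannotResolveComputedColumn(
            String column, String expression) {
        return new UnsupportedOperationException(
            "The legacy planner can not resolve the expression '" + expression
                + "' of computed column '" + column
                + "' in the table schema.");
    }
}
```

With the DDL above, the user would then see the offending column (`ts AS localtimestamp`) in the message rather than `null`.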
[jira] [Commented] (FLINK-18487) New table source factory omits unrecognized properties silently
[ https://issues.apache.org/jira/browse/FLINK-18487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17151719#comment-17151719 ] Danny Chen commented on FLINK-18487: +1 for having the validation logic in the factory. FactoryUtil can support the validation patterns that are common enough. > New table source factory omits unrecognized properties silently > --- > > Key: FLINK-18487 > URL: https://issues.apache.org/jira/browse/FLINK-18487 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.0 >Reporter: Benchao Li >Priority: Major > > For the following DDL, we just omit the unrecognized property > 'records-per-second'. > {code:sql} > CREATE TABLE MyDataGen ( > int_field int, > double_field double, > string_field varchar > ) WITH ( > 'connector' = 'datagen', > 'records-per-second' = '1' -- should be rows-per-second > ) > {code} > IMO, we should throw an exception to tell users that they used a wrong > property. > CC [~jark] [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
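The kind of validation being proposed boils down to comparing the supplied option keys against the set the factory declares and failing on anything unrecognized. A minimal sketch of that check (hypothetical helper, not the actual FactoryUtil API):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of unknown-option validation, not Flink's FactoryUtil API.
public class OptionValidator {
    /** Throws if the user supplied a key the factory does not declare. */
    public static void validateNoUnknownKeys(Set<String> declared, Map<String, String> supplied) {
        Set<String> unknown = new HashSet<>(supplied.keySet());
        unknown.remove("connector"); // the factory discriminator itself
        unknown.removeAll(declared);
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unknown);
        }
    }
}
```

With a declared set containing `rows-per-second`, the misspelled `records-per-second` from the DDL above would be rejected with an explicit error instead of being silently ignored.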
[jira] [Created] (FLINK-18315) Insert into partitioned table can fail with values
Danny Chen created FLINK-18315: -- Summary: Insert into partitioned table can fail with values Key: FLINK-18315 URL: https://issues.apache.org/jira/browse/FLINK-18315 Project: Flink Issue Type: Task Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Danny Chen Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18009) Select an invalid column crashes the SQL CLI
[ https://issues.apache.org/jira/browse/FLINK-18009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126478#comment-17126478 ] Danny Chen commented on FLINK-18009: [~godfreyhe] Please check if this issue is fixed after FLINK-17893 is fixed. > Select an invalid column crashes the SQL CLI > > > Key: FLINK-18009 > URL: https://issues.apache.org/jira/browse/FLINK-18009 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Reporter: Rui Li >Assignee: Nicholas Jiang >Priority: Blocker > Fix For: 1.11.0 > > > To reproduce: just select a non-existing column from table in SQL CLI -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16497) Improve default flush strategy for JDBC sink to make it work out-of-box
[ https://issues.apache.org/jira/browse/FLINK-16497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126373#comment-17126373 ] Danny Chen commented on FLINK-16497: I think that, as a popular streaming engine, we should treat good throughput and performance as first class. Most client tools have a default flush strategy (either buffer size or interval)[1][2], and we should follow that. I would suggest a default flush size (100) and flush interval (1s); this performs well in production, and in local tests 1s is also an acceptable latency. [1] https://kafka.apache.org/22/documentation.html#producerconfigs [2] https://github.com/searchbox-io/Jest > Improve default flush strategy for JDBC sink to make it work out-of-box > --- > > Key: FLINK-16497 > URL: https://issues.apache.org/jira/browse/FLINK-16497 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Jark Wu >Priority: Critical > Fix For: 1.11.0 > > > Currently, JDBC sink provides 2 flush options: > {code} > 'connector.write.flush.max-rows' = '5000', -- default is 5000 > 'connector.write.flush.interval' = '2s', -- no default value > {code} > That means if flush interval is not set, the buffered output rows may not be > flushed to database for a long time. That is a surprising behavior because no > results are output by default. > So I propose to have a default flush '1s' interval for JDBC sink or default 1 > row for flush size. -- This message was sent by Atlassian Jira (v8.3.4#803005)
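The suggested defaults amount to a flush-on-size-or-interval buffer: emit the batch once either 100 rows are buffered or 1s has elapsed since the last flush. A minimal sketch of that policy (hypothetical class, not the JDBC sink's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a size-or-interval flush policy (defaults: 100 rows / 1000 ms).
// Not the actual Flink JDBC sink implementation.
public class BufferedFlush {
    private final int maxRows;
    private final long maxIntervalMs;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMs;

    public BufferedFlush(int maxRows, long maxIntervalMs, long nowMs) {
        this.maxRows = maxRows;
        this.maxIntervalMs = maxIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers a row; returns the flushed batch if a threshold was hit, else null. */
    public List<String> add(String row, long nowMs) {
        buffer.add(row);
        if (buffer.size() >= maxRows || nowMs - lastFlushMs >= maxIntervalMs) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            lastFlushMs = nowMs;
            return batch;
        }
        return null;
    }
}
```

Either threshold alone is risky: size-only can hold rows indefinitely on a slow stream (the surprising behavior the issue describes), while interval-only can hurt throughput on a fast one, so the two are combined.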
[jira] [Closed] (FLINK-15126) migrate "show functions" from sql cli to sql parser
[ https://issues.apache.org/jira/browse/FLINK-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen closed FLINK-15126. -- Resolution: Fixed > migrate "show functions" from sql cli to sql parser > --- > > Key: FLINK-15126 > URL: https://issues.apache.org/jira/browse/FLINK-15126 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client, Table SQL / Planner >Reporter: Bowen Li >Assignee: Zhenqiu Huang >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-13437) Add Hive SQL E2E test
[ https://issues.apache.org/jira/browse/FLINK-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-13437: --- Fix Version/s: (was: 1.10.2) (was: 1.11.0) 1.12.0 > Add Hive SQL E2E test > - > > Key: FLINK-13437 > URL: https://issues.apache.org/jira/browse/FLINK-13437 > Project: Flink > Issue Type: Test > Components: Connectors / Hive, Tests >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Terry Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > Time Spent: 10m > Remaining Estimate: 0h > > We should add an E2E test for the Hive integration: List all tables and read > some metadata, read from an existing table, register a new table in Hive, use > a registered function, write to an existing table, write to a new table. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17686) Add document to dataGen, print, blackhole connectors
[ https://issues.apache.org/jira/browse/FLINK-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17686: --- Priority: Critical (was: Major) > Add document to dataGen, print, blackhole connectors > > > Key: FLINK-17686 > URL: https://issues.apache.org/jira/browse/FLINK-17686 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.11.0 >Reporter: Jingsong Lee >Assignee: Shengkai Fang >Priority: Critical > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17831) Add documentation for the new Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-17831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17831: --- Priority: Critical (was: Major) > Add documentation for the new Kafka connector > - > > Key: FLINK-17831 > URL: https://issues.apache.org/jira/browse/FLINK-17831 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Jark Wu >Assignee: Danny Chen >Priority: Critical > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17776) Add documentation for DDL in hive dialect
[ https://issues.apache.org/jira/browse/FLINK-17776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17776: --- Priority: Critical (was: Major) > Add documentation for DDL in hive dialect > - > > Key: FLINK-17776 > URL: https://issues.apache.org/jira/browse/FLINK-17776 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Jingsong Lee >Assignee: Rui Li >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16975) Add docs for FileSystem connector
[ https://issues.apache.org/jira/browse/FLINK-16975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-16975: --- Priority: Critical (was: Major) > Add docs for FileSystem connector > - > > Key: FLINK-16975 > URL: https://issues.apache.org/jira/browse/FLINK-16975 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.11.0 >Reporter: Leonard Xu >Assignee: Jingsong Lee >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17599) Update documents due to FLIP-84
[ https://issues.apache.org/jira/browse/FLINK-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17599: --- Priority: Critical (was: Major) > Update documents due to FLIP-84 > --- > > Key: FLINK-17599 > URL: https://issues.apache.org/jira/browse/FLINK-17599 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Kurt Young >Assignee: godfrey he >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17832) Add documentation for the new Elasticsearch connector
[ https://issues.apache.org/jira/browse/FLINK-17832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17832: --- Priority: Critical (was: Major) > Add documentation for the new Elasticsearch connector > - > > Key: FLINK-17832 > URL: https://issues.apache.org/jira/browse/FLINK-17832 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Jark Wu >Assignee: Shengkai Fang >Priority: Critical > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18065) Add documentation for new scalar/table functions
[ https://issues.apache.org/jira/browse/FLINK-18065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-18065: --- Priority: Critical (was: Major) > Add documentation for new scalar/table functions > > > Key: FLINK-18065 > URL: https://issues.apache.org/jira/browse/FLINK-18065 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Write documentation for scalar/table functions of FLIP-65. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17829) Add documentation for the new JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17829: --- Priority: Critical (was: Major) > Add documentation for the new JDBC connector > > > Key: FLINK-17829 > URL: https://issues.apache.org/jira/browse/FLINK-17829 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Jark Wu >Assignee: Leonard Xu >Priority: Critical > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18066) Add documentation for how to develop a new table source/sink
[ https://issues.apache.org/jira/browse/FLINK-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-18066: --- Priority: Critical (was: Major) > Add documentation for how to develop a new table source/sink > > > Key: FLINK-18066 > URL: https://issues.apache.org/jira/browse/FLINK-18066 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Covers how to write a custom source/sink and format using FLIP-95 interfaces. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17406) Add documentation about dynamic table options
[ https://issues.apache.org/jira/browse/FLINK-17406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Chen updated FLINK-17406: --- Priority: Critical (was: Major) > Add documentation about dynamic table options > - > > Key: FLINK-17406 > URL: https://issues.apache.org/jira/browse/FLINK-17406 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Kurt Young >Assignee: Danny Chen >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)