[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319842#comment-17319842 ]

godfrey he commented on FLINK-22082:
------------------------------------

Fixed in 1.13.0: 2bbb1ba4072b9f03f9d3f9a17e004b5fe1eb4aa9

> Nested projection push down doesn't work for data such as row(array(row))
> -------------------------------------------------------------------------
>
>                 Key: FLINK-22082
>                 URL: https://issues.apache.org/jira/browse/FLINK-22082
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Dian Fu
>            Assignee: Shengkai Fang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment
>
> config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, config)
>
> source_ddl = """
>     CREATE TABLE InTable (
>         `ID` STRING,
>         `Timestamp` TIMESTAMP(3),
>         `Result` ROW(
>             `data` ROW(`value` BIGINT) ARRAY),
>         WATERMARK FOR `Timestamp` AS `Timestamp`
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'json',
>         'path' = '/tmp/1.txt'
>     )
> """
>
> sink_ddl = """
>     CREATE TABLE OutTable (
>         `ID` STRING,
>         `value` BIGINT
>     ) WITH (
>         'connector' = 'print'
>     )
> """
>
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
>
> table = t_env.from_path('InTable')
> table \
>     .select(
>         table.ID,
>         table.Result.data.at(1).value) \
>     .execute_insert('OutTable') \
>     .wait()
> {code}
> It throws the following exception:
> {code}
> : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
> 	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
> 	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
> 	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
> 	at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
> 	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
> 	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
> 	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
> 	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
> 	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
> {code}
> See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
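The MatchError in the stack trace is raised while visiting a field access whose inner expression is the call ITEM($2.data, 1) rather than an input reference or another field access. The following is a toy Python model of that shape of failure, not Flink's actual code: the classes InputRef, FieldAccess and Call and the function field_path are hypothetical stand-ins, and TypeError plays the role of scala.MatchError.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class InputRef:
    """Hypothetical stand-in for a top-level column reference such as $2."""
    index: int


@dataclass
class FieldAccess:
    """Hypothetical stand-in for a field access such as $2.data."""
    expr: object
    name: str


@dataclass
class Call:
    """Hypothetical stand-in for an operator call such as ITEM($2.data, 1)."""
    op: str
    operands: list


def field_path(node) -> List[str]:
    """Resolve a chain of field accesses down to its input reference.

    Only InputRef and FieldAccess are handled; any other node under a
    field access is rejected, mirroring a non-exhaustive pattern match.
    """
    if isinstance(node, InputRef):
        return [f"${node.index}"]
    if isinstance(node, FieldAccess):
        return field_path(node.expr) + [node.name]
    raise TypeError(f"unhandled node under field access: {type(node).__name__}")


# $2.data.value is a pure chain of field accesses and resolves normally.
plain = FieldAccess(FieldAccess(InputRef(2), "data"), "value")
print(field_path(plain))

# ITEM($2.data, 1).value wraps an array element access, which is rejected.
nested = FieldAccess(Call("ITEM", [FieldAccess(InputRef(2), "data"), 1]), "value")
try:
    field_path(nested)
except TypeError as exc:
    print(exc)
```

In this model the row(array(row)) access from the job is the only expression that fails, which is consistent with the observation in the thread that the exception does not depend on the source connector.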
[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17313499#comment-17313499 ]

Dian Fu commented on FLINK-22082:
---------------------------------

[~jark] Thanks a lot for the confirmation. I agree with you: NestedProjectionUtil should handle this case properly and make sure the job is still able to run.
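One way to read "handle this case properly" is that the field extraction falls back to a coarser path when it meets a node it cannot project through, instead of failing. The sketch below illustrates that idea in plain Python under stated assumptions: InputRef, FieldAccess, Call, pure_path and used_paths are hypothetical names, and this is not the logic of the actual fix in NestedProjectionUtil.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple


@dataclass(frozen=True)
class InputRef:
    """Hypothetical stand-in for a top-level column reference such as $2."""
    index: int


@dataclass(frozen=True)
class FieldAccess:
    """Hypothetical stand-in for a field access such as $2.data."""
    expr: object
    name: str


@dataclass(frozen=True)
class Call:
    """Hypothetical stand-in for an operator call such as ITEM($2.data, 1)."""
    op: str
    operands: tuple


def pure_path(node) -> Optional[Tuple[str, ...]]:
    """Return the nested path if node is a pure field-access chain, else None."""
    if isinstance(node, InputRef):
        return (f"${node.index}",)
    if isinstance(node, FieldAccess):
        parent = pure_path(node.expr)
        return None if parent is None else parent + (node.name,)
    return None


def used_paths(node, out: Optional[Set[Tuple[str, ...]]] = None) -> Set[Tuple[str, ...]]:
    """Collect the column paths an expression reads, without ever failing.

    A pure chain yields one nested path. For anything else the walk
    descends into the children, so the column is kept at the deepest
    level where the chain was still pure.
    """
    out = set() if out is None else out
    path = pure_path(node)
    if path is not None:
        out.add(path)
    elif isinstance(node, FieldAccess):
        used_paths(node.expr, out)
    elif isinstance(node, Call):
        for operand in node.operands:
            used_paths(operand, out)
    # Literals and other leaves read no columns.
    return out


# Projection of the job: ID and Result.data.at(1).value, i.e. $0 and ITEM($2.data, 1).value.
id_col = InputRef(0)
nested = FieldAccess(Call("ITEM", (FieldAccess(InputRef(2), "data"), 1)), "value")

paths = set()
for expr in (id_col, nested):
    used_paths(expr, paths)
print(sorted(paths))
```

Under this fallback the array column $2.data is requested as a whole rather than pruned down to value, so the job can still be planned even though the projection is less precise.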
[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17313223#comment-17313223 ]

Jark Wu commented on FLINK-22082:
---------------------------------

[~dian.fu] The Kafka source doesn't support even basic projection pushdown.
[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17313224#comment-17313224 ]

Jark Wu commented on FLINK-22082:
---------------------------------

But I think we shouldn't throw the exception; the job should be able to run. So this is a bug.
[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312880#comment-17312880 ]

Dian Fu commented on FLINK-22082:
---------------------------------

Does the Kafka source support nested projection pushdown? The original job actually uses a Kafka source: https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array

I only changed it to filesystem for ease of validation (as FileSystemTableSource is bundled in the blink planner). As for the exception itself, it seems to have nothing to do with which source is used.
[jira] [Commented] (FLINK-22082) Nested projection push down doesn't work for data such as row(array(row))
    [ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312872#comment-17312872 ]

Jark Wu commented on FLINK-22082:
---------------------------------

{{FileSystemTableSource}} doesn't support nested projection pushdown yet.