[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138115#comment-17138115 ]

godfrey he commented on FLINK-18070:

We need to materialize the time attribute only in the sink block.

> Time attribute been materialized after sub graph optimize
> ---------------------------------------------------------
>
>                 Key: FLINK-18070
>                 URL: https://issues.apache.org/jira/browse/FLINK-18070
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: YufeiLiu
>            Assignee: YufeiLiu
>            Priority: Major
>             Fix For: 1.11.0
>
>
> Hi, I want to use a window aggregate on a temporary view with multiple
> sinks, but it throws an exception:
> {code:java}
> java.lang.AssertionError: type mismatch:
> ref:
> TIME ATTRIBUTE(PROCTIME) NOT NULL
> input:
> TIMESTAMP(3) NOT NULL
> {code}
> I looked into the optimizer logic. There is a comment in
> {{CommonSubGraphBasedOptimizer}}:
> "1. In general, for multi-sinks users tend to use VIEW which is a natural
> common sub-graph."
> After the sub-graph optimization, the time attribute from the source has
> been converted to a basic TIMESTAMP type by {{FlinkRelTimeIndicatorProgram}}.
> But my view is defined by a simple query, so in theory the time attribute
> should not need to be materialized.
> Here is my code:
> {code:java}
> // connector.type COLLECTION is for debug use
> tableEnv.sqlUpdate("CREATE TABLE source (\n" +
>     "`ts` AS PROCTIME(),\n" +
>     "`order_type` INT\n" +
>     ") WITH (\n" +
>     "'connector.type' = 'COLLECTION',\n" +
>     "'format.type' = 'json'\n" +
>     ")\n");
> tableEnv.createTemporaryView("source_view", tableEnv.sqlQuery("SELECT * FROM source"));
> tableEnv.sqlUpdate("CREATE TABLE sink (\n" +
>     "`result` BIGINT\n" +
>     ") WITH (\n" +
>     "'connector.type' = 'COLLECTION',\n" +
>     "'format.type' = 'json'\n" +
>     ")\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>     "SELECT\n" +
>     "COUNT(1)\n" +
>     "FROM\n" +
>     "`source_view`\n" +
>     "WHERE\n" +
>     " `order_type` = 33\n" +
>     "GROUP BY\n" +
>     "TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>     "SELECT\n" +
>     "COUNT(1)\n" +
>     "FROM\n" +
>     "`source_view`\n" +
>     "WHERE\n" +
>     " `order_type` = 34\n" +
>     "GROUP BY\n" +
>     "TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138114#comment-17138114 ]

godfrey he commented on FLINK-18070:

[~liuyufei] Thanks for reporting this. The simple fix is for the {{needFinalTimeIndicatorConversion}} method in {{StreamCommonSubGraphBasedOptimizer}} to return the value of {{isSinkBlock}} instead of {{true}}.
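The effect of the suggested change can be illustrated with a toy model. This is a minimal sketch, not the Flink planner code: the `Block` class and the two method variants are hypothetical stand-ins, and only the names `needFinalTimeIndicatorConversion` and `isSinkBlock` are taken from the comment above. It applies both variants to the three blocks of the reported job (one shared view, two sinks).

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: Block is a hypothetical stand-in, not a Flink planner class.
class SinkBlockFlagSketch {

    /** One optimization block; isSinkBlock marks a block whose root is a sink. */
    static final class Block {
        final String name;
        final boolean isSinkBlock;

        Block(String name, boolean isSinkBlock) {
            this.name = name;
            this.isSinkBlock = isSinkBlock;
        }
    }

    // Behaviour the thread describes as current: the final conversion is always requested.
    static boolean needFinalTimeIndicatorConversionCurrent(Block block) {
        return true;
    }

    // Behaviour proposed in the comment: request it only for sink blocks.
    static boolean needFinalTimeIndicatorConversionProposed(Block block) {
        return block.isSinkBlock;
    }

    public static void main(String[] args) {
        List<Block> blocks = Arrays.asList(
                new Block("source_view (shared by both INSERTs)", false),
                new Block("INSERT INTO sink ... order_type = 33", true),
                new Block("INSERT INTO sink ... order_type = 34", true));
        for (Block block : blocks) {
            System.out.println(block.name
                    + ": current=" + needFinalTimeIndicatorConversionCurrent(block)
                    + ", proposed=" + needFinalTimeIndicatorConversionProposed(block));
        }
    }
}
```

Under the proposed variant only the two sink blocks request the final conversion, so in this model the shared view block would keep `ts` as a time attribute.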
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138087#comment-17138087 ]

Jark Wu commented on FLINK-18070:

cc [~godfreyhe]
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126599#comment-17126599 ]

YufeiLiu commented on FLINK-18070:

I still don't get it. Could you give me an example of this situation? Is a final block always a LogicalSink? If we don't need to materialize the processing time in either an intermediate block or a LogicalSink, it will always return convertedRoot directly.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126322#comment-17126322 ]

Benchao Li commented on FLINK-18070:

As you can see, we only convert the time attributes from the input of the sink. Actually, the sink node can have its own time attributes too, which is why this line exists:

```java
// the LogicalSink is converted in RelTimeIndicatorConverter before
if (rootRel.isInstanceOf[LogicalLegacySink] || !needFinalTimeIndicatorConversion) {
  return convertedRoot
}
```

I think the right way to fix this is to make use of the `needFinalTimeIndicatorConversion` variable in `RelTimeIndicatorConverter.convert`, which is always true for now. We need to distinguish whether the current optimization block is a final block or an intermediate logical block.
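The guard quoted above can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the planner implementation: `FieldType` and `finishConversion` are hypothetical names, a row type is reduced to a list of field types, and "materializing" simply swaps a proctime attribute for a plain timestamp. Only the condition itself follows the quoted snippet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only; FieldType and finishConversion are hypothetical, not Flink APIs.
class FinalConversionGuardSketch {

    enum FieldType { PROCTIME_ATTRIBUTE, TIMESTAMP, INT }

    static List<FieldType> finishConversion(
            List<FieldType> convertedRoot,
            boolean rootIsSink,
            boolean needFinalTimeIndicatorConversion) {
        // Mirrors the quoted guard: hand back the converted root untouched.
        if (rootIsSink || !needFinalTimeIndicatorConversion) {
            return convertedRoot;
        }
        // Otherwise materialize the remaining proctime indicators.
        List<FieldType> materialized = new ArrayList<>();
        for (FieldType type : convertedRoot) {
            materialized.add(type == FieldType.PROCTIME_ATTRIBUTE ? FieldType.TIMESTAMP : type);
        }
        return materialized;
    }

    public static void main(String[] args) {
        // Output row of the shared view block: `ts` and `order_type`.
        List<FieldType> viewOutput = Arrays.asList(FieldType.PROCTIME_ATTRIBUTE, FieldType.INT);
        // Flag always true: the intermediate block loses its time attribute.
        System.out.println(finishConversion(viewOutput, false, true));
        // Flag false for an intermediate block: the time attribute survives.
        System.out.println(finishConversion(viewOutput, false, false));
    }
}
```

In this model the flag is the only thing that separates an intermediate block from a final one, which is why a flag that is always true treats every block as final.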
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125952#comment-17125952 ]

YufeiLiu commented on FLINK-18070:

[~libenchao] The processing time has already been materialized at the beginning of convert.
{code:java}
val converter = new RelTimeIndicatorConverter(rexBuilder)
val convertedRoot = rootRel.accept(converter)
{code}
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125873#comment-17125873 ]

Benchao Li commented on FLINK-18070:

[~liuyufei] We need to materialize the proctime attribute if it is needed in the sink; that's why it exists.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125587#comment-17125587 ]

YufeiLiu commented on FLINK-18070:

[~libenchao] I'm not sure why we need "_materialize remaining proctime indicators_" in {{RelTimeIndicatorConverter}}. I only changed {{needFinalTimeIndicatorConversion}} to false to skip the subsequent materialization. The problem seems to be solved, and I didn't see any other effects.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124894#comment-17124894 ]

YufeiLiu commented on FLINK-18070:

[~libenchao] Sure, I'd love to contribute. Please assign this issue to me.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124763#comment-17124763 ]

Benchao Li commented on FLINK-18070:

[~liuyufei] Do you want to contribute a fix for this issue? If you don't have time, I can give a hand here.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124735#comment-17124735 ]

Jark Wu commented on FLINK-18070:

Yes, I think this is a bug in CommonSubGraphBasedOptimizer.
[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize
[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124713#comment-17124713 ]

Benchao Li commented on FLINK-18070:

[~liuyufei] Thanks for reporting this issue. I also verified this bug on the master branch. I took a deeper look into it: it happens because we materialize the `proctime` field in the middle block.

CC [~jark] Actually, we suffer from this bug in our internal 1.9 branch.
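The root cause named above can be reproduced in miniature. This is a minimal sketch, assuming type descriptions are compared as plain strings; `checkRef` and `middleBlockOutputType` are hypothetical helpers that only imitate the shape of the reported assertion, not the planner's actual check. The sink queries reference `ts` as a proctime attribute, so once the middle block emits a materialized timestamp the reference no longer matches its input.

```java
// Toy model only; checkRef and middleBlockOutputType are hypothetical helpers.
class TypeMismatchSketch {

    /** Fails when a field reference and the input field it points at disagree on type. */
    static void checkRef(String refType, String inputType) {
        if (!refType.equals(inputType)) {
            throw new AssertionError(
                    "type mismatch:\nref:\n" + refType + "\ninput:\n" + inputType);
        }
    }

    /** Type of `ts` as emitted by the shared (middle) block. */
    static String middleBlockOutputType(boolean materializeInMiddleBlock) {
        return materializeInMiddleBlock
                ? "TIMESTAMP(3) NOT NULL"
                : "TIME ATTRIBUTE(PROCTIME) NOT NULL";
    }

    public static void main(String[] args) {
        // What TUMBLE(`ts`, ...) in each sink query expects to read.
        String ref = "TIME ATTRIBUTE(PROCTIME) NOT NULL";

        // Middle block keeps the time attribute: the reference is consistent.
        checkRef(ref, middleBlockOutputType(false));

        // Middle block materializes proctime: the reference no longer matches.
        try {
            checkRef(ref, middleBlockOutputType(true));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```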