[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-16 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138115#comment-17138115
 ] 

godfrey he commented on FLINK-18070:


We need to materialize the time attribute only in the sink block.

> Time attribute been materialized after sub graph optimize
> -
>
> Key: FLINK-18070
> URL: https://issues.apache.org/jira/browse/FLINK-18070
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: YufeiLiu
> Assignee: YufeiLiu
> Priority: Major
> Fix For: 1.11.0
>
>
> Hi, I want to use a window aggregate on a temporary view that feeds multiple 
> sinks, but an exception is thrown:
> {code:java}
> java.lang.AssertionError: type mismatch:
> ref:
> TIME ATTRIBUTE(PROCTIME) NOT NULL
> input:
> TIMESTAMP(3) NOT NULL
> {code}
> I looked into the optimizer logic; there is a comment in 
> {{CommonSubGraphBasedOptimizer}}:
> "1. In general, for multi-sinks users tend to use VIEW which is a natural 
> common sub-graph."
> After sub-graph optimization, the time attribute from the source has been 
> converted to a basic TIMESTAMP type by {{FlinkRelTimeIndicatorProgram}}. But 
> my CREATE VIEW SQL is a simple query, so in theory the time attribute should 
> not need to be materialized.
> Here is my code:
> {code:java}
> // connector.type COLLECTION is for debug use
> tableEnv.sqlUpdate("CREATE TABLE source (\n" +
>   "`ts` AS PROCTIME(),\n" +
>   "`order_type` INT\n" +
>   ") WITH (\n" +
>   "'connector.type' = 'COLLECTION',\n" +
>   "'format.type' = 'json'\n" +
>   ")\n");
> tableEnv.createTemporaryView("source_view", tableEnv.sqlQuery("SELECT * FROM source"));
> tableEnv.sqlUpdate("CREATE TABLE sink (\n" +
>   "`result` BIGINT\n" +
>   ") WITH (\n" +
>   "'connector.type' = 'COLLECTION',\n" +
>   "'format.type' = 'json'\n" +
>   ")\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>   "SELECT\n" +
>   "COUNT(1)\n" +
>   "FROM\n" +
>   "`source_view`\n" +
>   "WHERE\n" +
>   " `order_type` = 33\n" +
>   "GROUP BY\n" +
>   "TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>   "SELECT\n" +
>   "COUNT(1)\n" +
>   "FROM\n" +
>   "`source_view`\n" +
>   "WHERE\n" +
>   " `order_type` = 34\n" +
>   "GROUP BY\n" +
>   "TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-16 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138114#comment-17138114
 ] 

godfrey he commented on FLINK-18070:


[~liuyufei] Thanks for reporting this. The simplest fix is for the 
{{needFinalTimeIndicatorConversion}} method in 
{{StreamCommonSubGraphBasedOptimizer}} to return the value of 
{{isSinkBlock}} instead of {{true}}. 
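
To make the idea concrete, here is a minimal toy model in Java. It is not the planner code (the real optimizer is written in Scala); `BlockFlagSketch`, `FieldType`, `needFinalConversionOld`, `needFinalConversionNew` and `outputType` are hypothetical names, and the model assumes that a block materializes its proctime field exactly when the flag is true.

```java
/** Toy model only; none of these names exist in Flink. */
class BlockFlagSketch {

    /** Simplified stand-ins for the two types named in the AssertionError. */
    enum FieldType { PROCTIME_ATTRIBUTE, TIMESTAMP_3 }

    /** Behaviour described in this thread: the flag is always true. */
    static boolean needFinalConversionOld(boolean isSinkBlock) {
        return true;
    }

    /** Suggested behaviour: only a sink block requests the final conversion. */
    static boolean needFinalConversionNew(boolean isSinkBlock) {
        return isSinkBlock;
    }

    /** Type of the time field leaving a block (assumption of this sketch). */
    static FieldType outputType(FieldType in, boolean needFinalConversion) {
        if (in == FieldType.PROCTIME_ATTRIBUTE && needFinalConversion) {
            return FieldType.TIMESTAMP_3; // materialized
        }
        return in; // time indicator is kept for downstream blocks
    }

    public static void main(String[] args) {
        boolean isSinkBlock = false; // e.g. the block built from the shared view
        System.out.println("old: " + outputType(
                FieldType.PROCTIME_ATTRIBUTE, needFinalConversionOld(isSinkBlock)));
        System.out.println("new: " + outputType(
                FieldType.PROCTIME_ATTRIBUTE, needFinalConversionNew(isSinkBlock)));
    }
}
```

With the flag tied to `isSinkBlock`, the intermediate block in this model keeps the proctime indicator, so a downstream window aggregate can still use it.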



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138087#comment-17138087
 ] 

Jark Wu commented on FLINK-18070:
-

cc [~godfreyhe]



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-05 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126599#comment-17126599
 ] 

YufeiLiu commented on FLINK-18070:
--

I still don't get it; could you give me an example of this situation?

Is a final block always a LogicalSink? If so, and we don't need to materialize 
processing time in either an intermediate block or a LogicalSink, it will 
always return convertedRoot directly. 



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-04 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126322#comment-17126322
 ] 

Benchao Li commented on FLINK-18070:


As you can see, we only convert the time attributes from the input of the sink. 
Actually, the sink node can have its own time attributes too; that is why this 
line exists:

```scala
// the LogicalSink is converted in RelTimeIndicatorConverter before
if (rootRel.isInstanceOf[LogicalLegacySink] || !needFinalTimeIndicatorConversion) {
  return convertedRoot
}
```

I think the right way to fix this is to leverage the 
`needFinalTimeIndicatorConversion` variable in 
`RelTimeIndicatorConverter.convert`; it is always true for now. We need to 
distinguish whether the current optimization block is a final block or 
an intermediate logical block.
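
As a rough illustration of that guard, here is a toy model in Java. It is not the Scala source of `RelTimeIndicatorConverter`; `ConvertGuardSketch`, `Node` and this `convert` signature are hypothetical, and field types are plain strings purely for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only; these types are not part of Flink. */
class ConvertGuardSketch {
    static final String PROCTIME = "TIME ATTRIBUTE(PROCTIME)";
    static final String TIMESTAMP = "TIMESTAMP(3)";

    /** Minimal stand-in for the root node of an optimization block. */
    static final class Node {
        final boolean isSink;
        final List<String> fieldTypes;

        Node(boolean isSink, List<String> fieldTypes) {
            this.isSink = isSink;
            this.fieldTypes = fieldTypes;
        }
    }

    static Node convert(Node root, boolean needFinalTimeIndicatorConversion) {
        // Same shape as the quoted guard: a sink was already handled earlier,
        // and a block that does not need the final conversion is returned as is.
        if (root.isSink || !needFinalTimeIndicatorConversion) {
            return root;
        }
        // Otherwise materialize the remaining proctime indicators.
        List<String> materialized = new ArrayList<>();
        for (String type : root.fieldTypes) {
            materialized.add(PROCTIME.equals(type) ? TIMESTAMP : type);
        }
        return new Node(false, materialized);
    }

    public static void main(String[] args) {
        Node viewBlock = new Node(false, Arrays.asList(PROCTIME, "INT"));
        System.out.println(convert(viewBlock, true).fieldTypes);  // flag always true
        System.out.println(convert(viewBlock, false).fieldTypes); // intermediate block
    }
}
```

In this sketch the only difference between the two calls is the flag, which is the distinction between a final block and an intermediate block that the converter would need to receive.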

 



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-04 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125952#comment-17125952
 ] 

YufeiLiu commented on FLINK-18070:
--

[~libenchao] 
Processing time has already been materialized at the beginning of convert.
{code:java}
val converter = new RelTimeIndicatorConverter(rexBuilder)
val convertedRoot = rootRel.accept(converter)
{code}




[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-04 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125873#comment-17125873
 ] 

Benchao Li commented on FLINK-18070:


[~liuyufei] We need to materialize the proctime attribute if it is needed in 
the sink; that's why this step exists. 



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-04 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125587#comment-17125587
 ] 

YufeiLiu commented on FLINK-18070:
--

[~libenchao] I am not sure why we need to "_materialize remaining 
proctime indicators_" in {{RelTimeIndicatorConverter}}. I only changed 
{{needFinalTimeIndicatorConversion}} to false to skip the subsequent 
materialization; that seems to solve the problem, and I didn't see any other 
side effects.



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-03 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124894#comment-17124894
 ] 

YufeiLiu commented on FLINK-18070:
--

[~libenchao] Sure, I'd love to contribute. Please assign this issue to me.



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-03 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124763#comment-17124763
 ] 

Benchao Li commented on FLINK-18070:


[~liuyufei] Do you want to contribute to this issue? If you don't have time, I 
can give a hand here.



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-03 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124735#comment-17124735
 ] 

Jark Wu commented on FLINK-18070:
-

Yes, I think this is a bug in CommonSubGraphBasedOptimizer.



[jira] [Commented] (FLINK-18070) Time attribute been materialized after sub graph optimize

2020-06-03 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124713#comment-17124713
 ] 

Benchao Li commented on FLINK-18070:


[~liuyufei] Thanks for reporting this issue. I also verified this bug on the 
master branch.

I've taken a deeper look into it: it happens because we materialize the 
`proctime` field in the intermediate block. CC [~jark]
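
A toy Java sketch of how that would surface as the reported error, assuming the downstream reference still carries the proctime type while the intermediate block's output has been materialized. This is not Calcite's or Flink's validation code; `TypeMismatchSketch` and `checkRef` are hypothetical names.

```java
/** Toy model only; this is not Calcite's or Flink's validation code. */
class TypeMismatchSketch {

    /** Fails when a field reference and the referenced input field disagree on type. */
    static void checkRef(String refType, String inputType) {
        if (!refType.equals(inputType)) {
            throw new AssertionError(
                    "type mismatch:\nref:\n" + refType + "\ninput:\n" + inputType);
        }
    }

    public static void main(String[] args) {
        String ref = "TIME ATTRIBUTE(PROCTIME) NOT NULL";

        // Intermediate block kept the time indicator: the check passes.
        checkRef(ref, "TIME ATTRIBUTE(PROCTIME) NOT NULL");

        // Intermediate block materialized the field: the check throws.
        try {
            checkRef(ref, "TIMESTAMP(3) NOT NULL");
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```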

Actually, we suffer from this bug in our internal 1.9 branch.

 
