[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344613038
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SinkTest.xml
 ##
 @@ -43,19 +43,17 @@ Sink(name=[`default_catalog`.`default_database`.`sink`], 
fields=[a])
   

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344613879
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -222,7 +211,7 @@ 
Sink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b1],
 +- Reused(reference_id=[2])
 
 Sink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, 
b1], updateAsRetraction=[false], accMode=[Acc])
-+- Union(all=[true], union=[a, b], updateAsRetraction=[false], accMode=[Acc])
++- Union(all=[true], union=[a1, b1], updateAsRetraction=[false], accMode=[Acc])
 
 Review comment:
  The query is `SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10`, so the 
new field names are actually correct; the original names were wrong.
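
  As an aside, set operations conventionally take their output column names 
from the first input, which is why `a1, b1` are the expected names here. A 
minimal illustrative sketch of that naming rule (not Flink's actual code):

```python
# Illustrative sketch only (not Flink's implementation): a UNION ALL's output
# field names are taken from its first input, so a first branch written as
# `SELECT a AS a1, b AS b1 ...` yields union fields [a1, b1].
def union_field_names(inputs):
    first = inputs[0]
    # All inputs must be union-compatible, i.e. have the same arity.
    assert all(len(names) == len(first) for names in inputs), "arity mismatch"
    return list(first)

print(union_field_names([["a1", "b1"], ["a", "b1"]]))  # ['a1', 'b1']
```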


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344614320
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -779,17 +758,15 @@ 
Sink(name=[`default_catalog`.`default_database`.`appendSink`], fields=[a1, b, c1
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink`], 
fields=[a1, b1])
 +- LogicalUnion(all=[true])
:- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalProject(a1=[$0], b1=[$1])
-   : +- LogicalFilter(condition=[<=($0, 10)])
-   :+- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(a=[$0], b1=[$1])
-  +- LogicalProject(a=[$2], b1=[$1])
- +- LogicalFilter(condition=[=($2, $0)])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalFilter(condition=[<=($0, 10)])
-   : +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+   :  +- LogicalFilter(condition=[<=($0, 10)])
+   : +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$2], b1=[$1])
+  +- LogicalFilter(condition=[=($2, $0)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
 
 Review comment:
  The new plan is fully equivalent to the old one; the rewrite only removes 
redundant projections.
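
  The simplification visible in the diff is the standard project-merge 
rewrite: two stacked projections that only forward input fields compose into 
one. A minimal sketch, assuming each projection is just a list of input 
indices (real projections may contain arbitrary expressions):

```python
# Illustrative sketch: compose two adjacent index-only projections.
# `outer` is applied on top of `inner`; each entry indexes its input's fields.
def merge_projects(outer, inner):
    return [inner[i] for i in outer]

# LogicalProject(a1=[$0], b1=[$1]) over LogicalProject(a1=[$0], b1=[$1])
# collapses to a single identity projection over the filter's output:
print(merge_projects([0, 1], [0, 1]))  # [0, 1]
```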




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344612353
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
 ##
 @@ -785,47 +778,44 @@ Union(all=[true], union=[a, b, c])
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
text=[$2], $f3=[_UTF-16LE'#'])
- +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
-+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
-  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
+   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+   +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
- +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
-+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
-  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject(id1=[$0], $f1=[HOP($2, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'*'])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
+   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+   +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 == Optimized Logical Plan ==
-Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-29, leftUpperBound=17, leftTimeIndex=2, 
rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 
30:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 18:INTERVAL 
MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
-   :- Exchange(distribution=[hash[id1]])
-   :  +- WatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
-   : +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[id1, text, rowtime])
-   +- Exchange(distribution=[hash[id2]])
-  +- WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
- +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, cnt, name, goods, rowtime])
+WindowJoin(joinType=[InnerJoin], 

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344609071
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
 ##
 @@ -199,37 +189,35 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, b1])
 LogicalSink(name=[`default_catalog`.`default_database`.`sink2`], fields=[a, 
b1])
 +- LogicalUnion(all=[true])
:- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalProject(a1=[$0], b1=[$1])
-   : +- LogicalFilter(condition=[<=($0, 10)])
-   :+- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(a=[$0], b1=[$1])
-  +- LogicalProject(a=[$2], b1=[$1])
- +- LogicalFilter(condition=[=($2, $0)])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalFilter(condition=[<=($0, 10)])
-   : +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+   :  +- LogicalFilter(condition=[<=($0, 10)])
+   : +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$2], b1=[$1])
+  +- LogicalFilter(condition=[=($2, $0)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+:- LogicalProject(a1=[$0], b1=[$1])
+:  +- LogicalFilter(condition=[<=($0, 10)])
+: +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
 ]]>
 
 
   

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344612583
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
 ##
 @@ -89,19 +85,17 @@ Sink(name=[`default_catalog`.`default_database`.`sink3`], 
fields=[b, cnt])
   

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344613503
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -128,44 +122,40 @@ 
Sink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[total_mi
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b1])
 +- LogicalUnion(all=[true])
 
 Review comment:
  The new plan is fully equivalent to the old one.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344612950
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
 ##
 @@ -757,17 +741,15 @@ 
Sink(name=[`default_catalog`.`default_database`.`appendSink`], fields=[a1, b, c1
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink`], 
fields=[a1, b1])
 +- LogicalUnion(all=[true])
:- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalProject(a1=[$0], b1=[$1])
-   : +- LogicalFilter(condition=[<=($0, 10)])
-   :+- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(a=[$0], b1=[$1])
-  +- LogicalProject(a=[$2], b1=[$1])
- +- LogicalFilter(condition=[=($2, $0)])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalFilter(condition=[<=($0, 10)])
-   : +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+   :  +- LogicalFilter(condition=[<=($0, 10)])
+   : +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$2], b1=[$1])
+  +- LogicalFilter(condition=[=($2, $0)])
 
 Review comment:
  The new plan is fully equivalent to the old one; the rewrite only removes 
redundant projections.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344613145
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -31,13 +31,11 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`retractSink`], fields=[b
  +- LogicalProject(b=[$1], a=[$0])
 +- LogicalUnion(all=[true])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
-   :  +- LogicalProject(a=[$0], b=[$1], c=[$2])
-   : +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'%hello%')])
-   :+- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+   :  +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'%hello%')])
+   : +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
-  +- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'%world%')])
 
 Review comment:
  The new plan is fully equivalent to the old one; the rewrite only removes 
redundant projections.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344611623
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
 ##
 @@ -638,47 +638,44 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], 
updateAsRetraction=[false], ac
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
text=[$2], $f3=[_UTF-16LE'#'])
- +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
-+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
-  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
+   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+   +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
- +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
-+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
-  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject(id1=[$0], $f1=[HOP($2, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'*'])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
30:INTERVAL MINUTE)), <($2, +($7, 18:INTERVAL MINUTE)))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0])
+   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+   +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, 
rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 == Optimized Logical Plan ==
-Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], 
accMode=[Acc], reuse_id=[1]): rowcount = , cumulative cost = {rows, cpu, io, 
network, memory}
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-29, leftUpperBound=17, leftTimeIndex=2, 
rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 
30:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 18:INTERVAL 
MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
-   :- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-   :  +- WatermarkAssigner(fields=[id1, text, rowtime], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, 

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344606608
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
 ##
 @@ -199,37 +189,35 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, b1])
 LogicalSink(name=[`default_catalog`.`default_database`.`sink2`], fields=[a, 
b1])
 +- LogicalUnion(all=[true])
:- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalProject(a1=[$0], b1=[$1])
-   : +- LogicalFilter(condition=[<=($0, 10)])
-   :+- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(a=[$0], b1=[$1])
-  +- LogicalProject(a=[$2], b1=[$1])
- +- LogicalFilter(condition=[=($2, $0)])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalProject(a1=[$0], b1=[$1])
-   :  +- LogicalFilter(condition=[<=($0, 10)])
-   : +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+   :  +- LogicalFilter(condition=[<=($0, 10)])
+   : +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$2], b1=[$1])
+  +- LogicalFilter(condition=[=($2, $0)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+:- LogicalProject(a1=[$0], b1=[$1])
+:  +- LogicalFilter(condition=[<=($0, 10)])
+: +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
 ]]>
 
 
   

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344613465
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -88,19 +84,17 @@ 
Sink(name=[`default_catalog`.`default_database`.`upsertSink`], fields=[b, cnt],
   

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-11 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344602758
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
 ##
 @@ -221,76 +221,70 @@ Calc(select=[a, b])
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL 
SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
- +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-+- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-   +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
-  +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
- +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
30:INTERVAL MINUTE)), <($1, +($4, 18:INTERVAL MINUTE)))])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalWatermarkAssigner(fields=[id1, rowtime, 
text], rowtimeField=[rowtime], watermarkDelay=[0])
-   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-   +- LogicalWatermarkAssigner(fields=[id2, rowtime, 
cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
-  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 
4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*'])
+ +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
++- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+   +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
30:INTERVAL MINUTE)), <($1, +($4, 18:INTERVAL MINUTE)))])
+  +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
+ :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, 
name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
++- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
   +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'-'])
- +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
30:INTERVAL MINUTE)), <($1, +($4, 18:INTERVAL MINUTE)))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
+   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+   +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, 
goods], rowtimeField=[rowtime], watermarkDelay=[0])
+  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], 
fields=[a, b])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+   +- LogicalProject(id1=[$1], text=[$2])
+  +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'#'])
 +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
30:INTERVAL MINUTE)), <($1, +($4, 18:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
   :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
   +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, 
goods], rowtimeField=[rowtime], watermarkDelay=[0])
  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
-LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], 
fields=[a, b])
-+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
-   +- LogicalProject(id1=[$0], text=[$1])
-  +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-  

[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344437969
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
 ##
 @@ -53,8 +52,9 @@ abstract class PhysicalTableSourceScan(
   protected[flink] val tableSource: TableSource[_] = 
tableSourceTable.tableSource
 
   override def deriveRowType(): RelDataType = {
-val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-tableSourceTable.getRowType(flinkTypeFactory)
+// TableScan row type should always keep same with its
 
 Review comment:
  By the way, we already handle the projected fields in 
`TableSourceUtil#getFieldNameType`.
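
  As a hedged illustration (the real `TableSourceUtil#getFieldNameType` 
signature may differ), resolving projected fields is essentially indexing 
into the full schema:

```python
# Illustrative only: given the full schema and the pushed-down field indices,
# return the (name, type) pairs of the projected fields.
def get_field_name_type(schema, projected):
    return [schema[i] for i in projected]

schema = [("a", "INT"), ("b", "BIGINT"), ("c", "STRING")]
print(get_field_name_type(schema, [2, 0]))  # [('c', 'STRING'), ('a', 'INT')]
```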




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344437426
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
 ##
 @@ -53,8 +52,9 @@ abstract class PhysicalTableSourceScan(
   protected[flink] val tableSource: TableSource[_] = 
tableSourceTable.tableSource
 
   override def deriveRowType(): RelDataType = {
-val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-tableSourceTable.getRowType(flinkTypeFactory)
+// TableScan row type should always keep same with its
 
 Review comment:
  If the projection was pushed down, we should modify the RelOptTable so 
that its row type stays the same as that of the projected table source. 
Because a RelOptTable can convert a table into a relational expression, it 
must remain equivalent to the underlying relational expression.
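
  A minimal model of the invariant described above (class and field names 
are hypothetical, not Flink's API): once a projection has been pushed into 
the source, the scan's row type must be derived from the projected fields 
rather than the original schema:

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical model: a scan whose row type reflects projection pushdown.
@dataclass
class Scan:
    all_fields: List[str]
    projected: Optional[List[int]] = None  # indices pushed into the source

    def row_type(self) -> List[str]:
        # With no pushdown, the row type is the table's full schema;
        # after pushdown it must match the projected source exactly.
        if self.projected is None:
            return self.all_fields
        return [self.all_fields[i] for i in self.projected]

print(Scan(["a", "b", "c"], projected=[2, 0]).row_type())  # ['c', 'a']
```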




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344436701
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkRelOptTable.scala
 ##
 @@ -221,12 +230,40 @@ class FlinkRelOptTable protected(
 val cluster: RelOptCluster = context.getCluster
 if (table.isInstanceOf[TranslatableTable]) {
   table.asInstanceOf[TranslatableTable].toRel(context, this)
-} else if (Hook.ENABLE_BINDABLE.get(false)) {
 
 Review comment:
  Flink never uses it; the original `ENABLE_BINDABLE` hook is only 
meaningful for Calcite bindables, so this code path was wrong.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344436673
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkRelOptTable.scala
 ##
 @@ -65,6 +64,16 @@ class FlinkRelOptTable protected(
   // Sets a bigger default value to avoid broadcast join.
   val DEFAULT_ROWCOUNT: Double = 1E8
 
+  lazy val columnExprs: Map[String, String] = table match {
+case cct : TableSourceTable[_] =>
 
 Review comment:
  Renamed to `tableSourceTable`.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344436635
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
 ##
 @@ -221,76 +221,70 @@ Calc(select=[a, b])
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-  +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL 
SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
- +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-+- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-   +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
-  +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
- +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
30:INTERVAL MINUTE)), <($1, +($4, 18:INTERVAL MINUTE)))])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalWatermarkAssigner(fields=[id1, rowtime, 
text], rowtimeField=[rowtime], watermarkDelay=[0])
-   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-   +- LogicalWatermarkAssigner(fields=[id2, rowtime, 
cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
-  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+  +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 
4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*'])
 
 Review comment:
  The plan changed because the redundant projection was removed.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344436625
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml
 ##
 @@ -31,13 +31,11 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`retractSink`], fields=[b
  +- LogicalProject(b=[$1], a=[$0])
 +- LogicalUnion(all=[true])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
-   :  +- LogicalProject(a=[$0], b=[$1], c=[$2])
-   : +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'%hello%')])
-   :+- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+   :  +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'%hello%')])
 
 Review comment:
  When the view table is translated through `toRel` (see the 
`TranslatableTable` branch in `FlinkRelOptTable#toRel`), the view could not be 
expanded because `QueryOperationCatalogViewTable` used a redundant projection 
to fix the nullability attribute. That is definitely wrong: the nullability 
should be fixed during row type inference instead. So I removed the hacky 
projection, which also removes one projection from the intermediate plan.




[GitHub] [flink] danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create…

2019-11-09 Thread GitBox
danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344436277
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
 ##
 @@ -50,7 +50,14 @@
private final TypeInformation typeInfo;
 
public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, 
TypeInformation typeInfo) {
-   super(true, logicalTypeRoot);
+   this(true, logicalTypeRoot, typeInfo);
+   }
+
+   public LegacyTypeInformationType(
+   boolean nullable,
 
 Review comment:
  This is to fix the nullability. Note that this class is not a 
`TypeInformation` but a `LogicalType`; see its `#copy()` method for how the 
nullability flag is used.
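A simplified sketch of the constructor pattern in the diff above (the class and field names are illustrative stand-ins, not the real Flink types): the old single-argument constructor delegates to a new one that carries an explicit nullable flag, and `copy()` is where that flag gets consumed.

```java
// Toy analogue of a LogicalType-style class carrying a nullable flag.
class LegacyType {
    final boolean nullable;
    final String root;
    final Object typeInfo;
    // Old constructor: kept for compatibility, always nullable.
    LegacyType(String root, Object typeInfo) {
        this(true, root, typeInfo);
    }
    // New constructor: nullability is now explicit.
    LegacyType(boolean nullable, String root, Object typeInfo) {
        this.nullable = nullable;
        this.root = root;
        this.typeInfo = typeInfo;
    }
    // copy() preserves everything except the nullability flag.
    LegacyType copy(boolean newNullable) {
        return new LegacyType(newNullable, root, typeInfo);
    }
}

public class LegacyTypeDemo {
    public static void main(String[] args) {
        LegacyType t = new LegacyType("BIGINT", "LongTypeInfo");
        System.out.println(t.nullable + " -> " + t.copy(false).nullable); // true -> false
    }
}
```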

