[jira] [Commented] (FLINK-21542) Add documentation for supporting INSERT INTO specific columns

2021-03-02 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294341#comment-17294341
 ] 

Zhenghua Gao commented on FLINK-21542:
--

Is it enough to update the syntax section of the [insert 
syntax|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/insert.html#syntax]
 docs?
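
For reference, a minimal sketch of what the updated syntax section would show 
(table and column names here are hypothetical):
{code:sql}
-- INSERT INTO a subset of the target table's columns
INSERT INTO orders (order_id, price)
SELECT id, amount FROM source_orders;
{code}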
 

> Add documentation for supporting INSERT INTO specific columns
> -
>
> Key: FLINK-21542
> URL: https://issues.apache.org/jira/browse/FLINK-21542
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.13.0
>
>
> We have supported INSERT INTO specific columns in FLINK-18726, but have not 
> added documentation yet. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18726) Support INSERT INTO specific columns

2021-01-24 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17271091#comment-17271091
 ] 

Zhenghua Gao commented on FLINK-18726:
--

[~twalthr] [~Leonard Xu] [~jark] I have implemented this in our internal 
branch, and found it's a little complex. The column list specification overlaps 
with computed columns and partition columns in validation, which sometimes 
causes difficulties. I'd like to take it and contribute our proposal.

> Support INSERT INTO specific columns
> 
>
> Key: FLINK-18726
> URL: https://issues.apache.org/jira/browse/FLINK-18726
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Caizhi Weng
>Assignee: Atri Sharma
>Priority: Major
>  Labels: sprint
> Fix For: 1.13.0
>
>
> Currently Flink only supports inserting into a table without specifying 
> columns, but most database systems support inserting into specific columns via
> {code:sql}
> INSERT INTO table_name(column1, column2, ...) ...
> {code}
> The columns not specified will be filled with default values or {{NULL}} if 
> no default value is given when creating the table.
> As Flink currently does not support default values when creating tables, we 
> can fill the unspecified columns with {{NULL}} and throw exceptions if there 
> are columns with {{NOT NULL}} constraints.
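
A sketch of the proposed NULL-filling behavior, assuming a hypothetical 
three-column table:
{code:sql}
-- given: CREATE TABLE t (a INT, b STRING, c DOUBLE)
INSERT INTO t (a, c) VALUES (1, 2.0);
-- the unspecified column b is filled with NULL; if b carried a NOT NULL
-- constraint, an exception would be thrown instead
{code}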



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21026) Align column list specification with Hive in INSERT statement

2021-01-19 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-21026:
-
Description: 
HIVE-9481 allows a column list specification in the INSERT statement. The syntax is:
{code:java}
INSERT INTO TABLE table_name 
[PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
[(column list)] 
select_statement FROM from_statement
{code}
Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
support the COLUMN LIST clause now. I think it's a good chance to align this 
with Hive now.
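
A sketch of the Hive-aligned ordering (table, partition, and column names are 
hypothetical):
{code:sql}
-- Hive style: the PARTITION clause precedes the column list
INSERT INTO TABLE sales PARTITION (dt = '2021-01-19') (item, price)
SELECT item, price FROM staging_sales;
{code}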

 
  
  

  was:
HIVE-9481 allows a column list specification in the INSERT statement. The syntax is:
{code:java}
INSERT INTO TABLE table_name 
[PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
[(column list)] 
select_statement FROM from_statement
{code}
Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
support the COLUMN LIST clause now (FLINK-18726). I think it's a good chance to 
align this with Hive now.

 
  
 


> Align column list specification with Hive in INSERT statement
> -
>
> Key: FLINK-21026
> URL: https://issues.apache.org/jira/browse/FLINK-21026
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Zhenghua Gao
>Priority: Major
>
> HIVE-9481 allows a column list specification in the INSERT statement. The syntax is:
> {code:java}
> INSERT INTO TABLE table_name 
> [PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
> [(column list)] 
> select_statement FROM from_statement
> {code}
> Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
> appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
> support the COLUMN LIST clause now. I think it's a good chance to align this 
> with Hive now.
>  
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21026) Align column list specification with Hive in INSERT statement

2021-01-19 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-21026:
-
Description: 
HIVE-9481 allows a column list specification in the INSERT statement. The syntax is:
{code:java}
INSERT INTO TABLE table_name 
[PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
[(column list)] 
select_statement FROM from_statement
{code}
Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
support the COLUMN LIST clause now (FLINK-18726). I think it's a good chance to 
align this with Hive now.

 
  
 

  was:
[HIVE-9481|https://issues.apache.org/jira/browse/HIVE-9481] allows a column 
list specification in the INSERT statement. The syntax is:
{code:java}
INSERT INTO TABLE table_name 
[PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
[(column list)] 
select_statement FROM from_statement
{code}
Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
support the COLUMN LIST clause now 
([FLINK-18726|https://issues.apache.org/jira/browse/FLINK-18726]). I think it's 
a good chance to align this with Hive now.

 
 


> Align column list specification with Hive in INSERT statement
> -
>
> Key: FLINK-21026
> URL: https://issues.apache.org/jira/browse/FLINK-21026
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Zhenghua Gao
>Priority: Major
>
> HIVE-9481 allows a column list specification in the INSERT statement. The syntax is:
> {code:java}
> INSERT INTO TABLE table_name 
> [PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
> [(column list)] 
> select_statement FROM from_statement
> {code}
> Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
> appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
> support the COLUMN LIST clause now (FLINK-18726). I think it's a good chance 
> to align this with Hive now.
>  
>   
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21026) Align column list specification with Hive in INSERT statement

2021-01-19 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-21026:


 Summary: Align column list specification with Hive in INSERT 
statement
 Key: FLINK-21026
 URL: https://issues.apache.org/jira/browse/FLINK-21026
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Zhenghua Gao


[HIVE-9481|https://issues.apache.org/jira/browse/HIVE-9481] allows a column 
list specification in the INSERT statement. The syntax is:
{code:java}
INSERT INTO TABLE table_name 
[PARTITION (partcol1[=val1], partcol2[=val2] ...)] 
[(column list)] 
select_statement FROM from_statement
{code}
Meanwhile, Flink introduces a PARTITION syntax in which the PARTITION clause 
appears after the COLUMN LIST clause. This looks odd, and luckily we don't 
support the COLUMN LIST clause now 
([FLINK-18726|https://issues.apache.org/jira/browse/FLINK-18726]). I think it's 
a good chance to align this with Hive now.

 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16827) StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule

2020-07-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159601#comment-17159601
 ] 

Zhenghua Gao commented on FLINK-16827:
--

[~jark] [~libenchao] I think this bugfix should be ported to 1.11; otherwise 
TemporalSort in 1.11 is not usable in production environments.
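
For context, a minimal temporal sort query of the kind affected, assuming a 
hypothetical Kafka-backed table with an event-time attribute:
{code:sql}
-- sorting a stream by its time attribute; with parallelism > 1 this
-- hit the NullPointerException described below
SELECT * FROM kafka_events ORDER BY event_time;
{code}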

> StreamExecTemporalSort should require a distribution trait in 
> StreamExecTemporalSortRule
> 
>
> Key: FLINK-16827
> URL: https://issues.apache.org/jira/browse/FLINK-16827
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.9.1
> Environment: flink on yarn
> !image-2020-03-27-21-23-13-648.png!
>Reporter: wuchangjun
>Assignee: Benchao Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: image-2020-03-27-21-22-21-122.png, 
> image-2020-03-27-21-22-44-191.png, image-2020-03-27-21-23-13-648.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink reads Kafka data and sorts it by a time field. With multiple parallel 
> instances, it throws the following NullPointerException; with a parallelism 
> of one it works normally.
> !image-2020-03-27-21-22-21-122.png!
>  
> !image-2020-03-27-21-22-44-191.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16117) Avoid register source in TableTestBase#addTableSource

2020-05-15 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao closed FLINK-16117.

Resolution: Won't Fix

> Avoid register source in TableTestBase#addTableSource
> -
>
> Key: FLINK-16117
> URL: https://issues.apache.org/jira/browse/FLINK-16117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>
> This affects thousands of unit tests:
> 1) explainSourceAsString of CatalogSourceTable changes
> 2) JoinTest#testUDFInJoinCondition: SQL keywords must be escaped
> 3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: 
> Reference to a rowtime or proctime window required
> 4) SetOperatorsTest#testInWithProject: legacy type vs new type
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL

2020-04-26 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092691#comment-17092691
 ] 

Zhenghua Gao commented on FLINK-15591:
--

Will open a PR for the temporary table support soon.

> make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
> ---
>
> Key: FLINK-15591
> URL: https://issues.apache.org/jira/browse/FLINK-15591
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
>
> Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL 
> as the corresponding API to that in the Table API. The Table API already 
> supports such operations explicitly in 1.10.
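
A sketch of the DDL in question, with hypothetical object names and an 
illustrative connector:
{code:sql}
CREATE TEMPORARY TABLE tmp_orders (
  id INT,
  amount DOUBLE
) WITH ('connector' = 'datagen');  -- connector choice is illustrative

CREATE TEMPORARY VIEW tmp_big_orders AS
SELECT * FROM tmp_orders WHERE amount > 100;
{code}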



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL

2020-04-26 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092654#comment-17092654
 ] 

Zhenghua Gao edited comment on FLINK-15591 at 4/26/20, 11:34 AM:
-

Temporary view is supported in https://issues.apache.org/jira/browse/FLINK-17106


was (Author: docete):
Supported in https://issues.apache.org/jira/browse/FLINK-17106

> make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
> ---
>
> Key: FLINK-15591
> URL: https://issues.apache.org/jira/browse/FLINK-15591
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
>
> Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL 
> as the corresponding API to that in the Table API. The Table API already 
> supports such operations explicitly in 1.10.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL

2020-04-26 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092654#comment-17092654
 ] 

Zhenghua Gao commented on FLINK-15591:
--

Supported in https://issues.apache.org/jira/browse/FLINK-17106

> make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
> ---
>
> Key: FLINK-15591
> URL: https://issues.apache.org/jira/browse/FLINK-15591
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
>
> Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL 
> as the corresponding API to that in the Table API. The Table API already 
> supports such operations explicitly in 1.10.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17112) Support DESCRIBE view_name in Flink SQL

2020-04-23 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17112:
-
Description: FLINK-14688 introduces the DESCRIBE statement in the SQL parser, 
but doesn't implement it on the planner side because TableEnvironment.sqlUpdate 
returns nothing. Since FLINK-16366 introduces TableEnvironment.executeSql, 
which returns a TableResult, we can implement the DESCRIBE statement in the 
planner now.
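
A sketch of the statement this enables (the view name is hypothetical):
{code:sql}
-- runnable via TableEnvironment.executeSql once the planner supports it;
-- the returned TableResult carries the schema description
DESCRIBE my_view;
{code}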

> Support DESCRIBE view_name in Flink SQL
> ---
>
> Key: FLINK-17112
> URL: https://issues.apache.org/jira/browse/FLINK-17112
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> FLINK-14688 introduces the DESCRIBE statement in the SQL parser, but doesn't 
> implement it on the planner side because TableEnvironment.sqlUpdate returns 
> nothing. Since FLINK-16366 introduces TableEnvironment.executeSql, which 
> returns a TableResult, we can implement the DESCRIBE statement in the planner 
> now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17112) Support DESCRIBE view_name in Flink SQL

2020-04-23 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17112:
-
Summary: Support DESCRIBE view_name in Flink SQL  (was: Support DESCRIBE 
VIEW view_name in Flink SQL)

> Support DESCRIBE view_name in Flink SQL
> ---
>
> Key: FLINK-17112
> URL: https://issues.apache.org/jira/browse/FLINK-17112
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path

2020-04-21 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088275#comment-17088275
 ] 

Zhenghua Gao edited comment on FLINK-16160 at 4/21/20, 6:04 AM:


The root causes are:
 * proctime()/rowtime() are used along with 
DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. 
The original code path stores the ConnectorCatalogTable object in the Catalog, 
and in the validate phase the RowType is derived from 
ConnectorCatalogTable.getSchema, which contains the time indicator. After 
FLINK-14490, we store a CatalogTableImpl object in the Catalog, and in the 
validate phase the RowType is derived from CatalogTableImpl.getSchema, which 
doesn't contain the time indicator.
 * In the SqlToRel phase, FlinkCalciteCatalogReader converts 
ConnectorCatalogTable to TableSourceTable and converts CatalogTable to 
CatalogSourceTable. The TableSourceTable is converted to a LogicalTableScan 
directly and contains the time indicator. Otherwise the CatalogSourceTable is 
converted to a LogicalTableScan whose time indicator is erased (by FLINK-16345).

The solution is straightforward:
 * We should instantiate the TableSource in CatalogSchemaTable and check 
whether it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If 
so, rewrite the TableSchema to patch in the time indicator (as is done in 
ConnectorCatalogTable#calculateSourceSchema). This will pass the validation.
 * Avoid erasing the time indicator in CatalogSourceTable if the TableSource is 
a DefinedRowtimeAttributes/DefinedProctimeAttribute instance.


was (Author: docete):
The root causes are:
 * proctime()/rowtime() are used along with 
DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. 
The original code path stores the ConnectorCatalogTable object in the Catalog, 
and in the validate phase the RowType is derived from 
ConnectorCatalogTable.getSchema, which contains the time indicator. After 
FLINK-14490, we store a CatalogTableImpl object in the Catalog, and in the 
validate phase the RowType is derived from CatalogTableImpl.getSchema, which 
doesn't contain the time indicator.
 * In the SqlToRel phase, FlinkCalciteCatalogReader converts 
ConnectorCatalogTable to TableSourceTable and converts CatalogTable to 
CatalogSourceTable. The TableSourceTable is converted to a LogicalTableScan 
directly and contains the time indicator. Otherwise the CatalogSourceTable is 
converted to a LogicalTableScan whose time indicator is erased (by FLINK-16345).

The solution is straightforward:
 * We should instantiate the TableSource in CatalogSchemaTable and check 
whether it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If 
so, rewrite the TableSchema to patch in the time indicator (as is done in 
ConnectorCatalogTable#calculateSourceSchema). This will pass the validation.
 * Avoid erasing the time indicator in CatalogSourceTable if the TableSource is 
a DefinedRowtimeAttributes/DefinedProctimeAttribute instance.

> Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect 
> code path
> ---
>
> Key: FLINK-16160
> URL: https://issues.apache.org/jira/browse/FLINK-16160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime 
> properties are ignored, so the generated catalog table is not correct. We 
> should fix this to let TableEnvironment#connect() support watermarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path

2020-04-20 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088275#comment-17088275
 ] 

Zhenghua Gao commented on FLINK-16160:
--

The root causes are:
 * proctime()/rowtime() are used along with 
DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. 
The original code path stores the ConnectorCatalogTable object in the Catalog, 
and in the validate phase the RowType is derived from 
ConnectorCatalogTable.getSchema, which contains the time indicator. After 
FLINK-14490, we store a CatalogTableImpl object in the Catalog, and in the 
validate phase the RowType is derived from CatalogTableImpl.getSchema, which 
doesn't contain the time indicator.
 * In the SqlToRel phase, FlinkCalciteCatalogReader converts 
ConnectorCatalogTable to TableSourceTable and converts CatalogTable to 
CatalogSourceTable. The TableSourceTable is converted to a LogicalTableScan 
directly and contains the time indicator. Otherwise the CatalogSourceTable is 
converted to a LogicalTableScan whose time indicator is erased (by FLINK-16345).

The solution is straightforward:
 * We should instantiate the TableSource in CatalogSchemaTable and check 
whether it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If 
so, rewrite the TableSchema to patch in the time indicator (as is done in 
ConnectorCatalogTable#calculateSourceSchema). This will pass the validation.
 * Avoid erasing the time indicator in CatalogSourceTable if the TableSource is 
a DefinedRowtimeAttributes/DefinedProctimeAttribute instance.

> Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect 
> code path
> ---
>
> Key: FLINK-16160
> URL: https://issues.apache.org/jira/browse/FLINK-16160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime 
> properties are ignored, so the generated catalog table is not correct. We 
> should fix this to let TableEnvironment#connect() support watermarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17111) Support SHOW VIEWS in Flink SQL

2020-04-15 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17111:
-
Description: 
SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. 

MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in 
a given database, and doesn't support SHOW VIEWS.

Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A 
workaround is to query a system table which stores metadata of tables and views.

Hive supports both SHOW TABLES and SHOW VIEWS. 

We follow the Hive style, which lists all tables and views with SHOW TABLES and 
lists only views with SHOW VIEWS. 
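
A sketch of the resulting behavior, assuming a database that contains one 
table and one view:
{code:sql}
SHOW TABLES; -- Hive style: lists both tables and views
SHOW VIEWS;  -- lists only views
{code}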

  was:
SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. 

MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in 
a given database, and doesn't support SHOW VIEWS.

Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A 
workaround is to query a system table which stores tables and views.

Hive supports both SHOW TABLES and SHOW VIEWS. 

We follow the Hive style, which lists all tables and views with SHOW TABLES and 
lists only views with SHOW VIEWS. 


> Support SHOW VIEWS in Flink SQL 
> 
>
> Key: FLINK-17111
> URL: https://issues.apache.org/jira/browse/FLINK-17111
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. 
> MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) 
> in a given database, and doesn't support SHOW VIEWS.
> Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A 
> workaround is to query a system table which stores metadata of tables and 
> views.
> Hive supports both SHOW TABLES and SHOW VIEWS. 
> We follow the Hive style, which lists all tables and views with SHOW TABLES 
> and lists only views with SHOW VIEWS. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17111) Support SHOW VIEWS in Flink SQL

2020-04-15 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17111:
-
Description: 
SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. 

MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in 
a given database, and doesn't support SHOW VIEWS.

Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A 
workaround is to query a system table which stores tables and views.

Hive supports both SHOW TABLES and SHOW VIEWS. 

We follow the Hive style, which lists all tables and views with SHOW TABLES and 
lists only views with SHOW VIEWS. 

> Support SHOW VIEWS in Flink SQL 
> 
>
> Key: FLINK-17111
> URL: https://issues.apache.org/jira/browse/FLINK-17111
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. 
> MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) 
> in a given database, and doesn't support SHOW VIEWS.
> Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A 
> workaround is to query a system table which stores tables and views.
> Hive supports both SHOW TABLES and SHOW VIEWS. 
> We follow the Hive style, which lists all tables and views with SHOW TABLES 
> and lists only views with SHOW VIEWS. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17113) Refactor view support in SQL Client

2020-04-13 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17113:


 Summary: Refactor view support in SQL Client
 Key: FLINK-17113
 URL: https://issues.apache.org/jira/browse/FLINK-17113
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17112) Support DESCRIBE VIEW view_name in Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17112:


 Summary: Support DESCRIBE VIEW view_name in Flink SQL
 Key: FLINK-17112
 URL: https://issues.apache.org/jira/browse/FLINK-17112
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17111) Support SHOW VIEWS in Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17111:


 Summary: Support SHOW VIEWS in Flink SQL 
 Key: FLINK-17111
 URL: https://issues.apache.org/jira/browse/FLINK-17111
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17106) Support create/drop view in Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17106:


 Summary: Support create/drop view in Flink SQL
 Key: FLINK-17106
 URL: https://issues.apache.org/jira/browse/FLINK-17106
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Legacy Planner, Table SQL / 
Planner
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17105:
-
Component/s: Table SQL / Planner
 Table SQL / Legacy Planner
 Table SQL / Client
 Table SQL / API

> FLIP-71: E2E view support for Flink SQL
> ---
>
> Key: FLINK-17105
> URL: https://issues.apache.org/jira/browse/FLINK-17105
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Client, Table SQL / Legacy 
> Planner, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17105:
-
Summary: FLIP-71: E2E view support for Flink SQL  (was: FLIP-71: E2E 
viewsupport )

> FLIP-71: E2E view support for Flink SQL
> ---
>
> Key: FLINK-17105
> URL: https://issues.apache.org/jira/browse/FLINK-17105
> Project: Flink
>  Issue Type: New Feature
>Reporter: Zhenghua Gao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17105:
-
Affects Version/s: 1.10.0

> FLIP-71: E2E view support for Flink SQL
> ---
>
> Key: FLINK-17105
> URL: https://issues.apache.org/jira/browse/FLINK-17105
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17105) FLIP-71: E2E viewsupport

2020-04-13 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17105:


 Summary: FLIP-71: E2E viewsupport 
 Key: FLINK-17105
 URL: https://issues.apache.org/jira/browse/FLINK-17105
 Project: Flink
  Issue Type: New Feature
Reporter: Zhenghua Gao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL

2020-04-13 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17105:
-
Fix Version/s: 1.11.0

> FLIP-71: E2E view support for Flink SQL
> ---
>
> Key: FLINK-17105
> URL: https://issues.apache.org/jira/browse/FLINK-17105
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17098) CatalogManager#dropTemporaryTable and dropTemporaryView should use ObjectIdentifier as its argument

2020-04-12 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17098:


 Summary: CatalogManager#dropTemporaryTable and dropTemporaryView 
should use ObjectIdentifier as its argument
 Key: FLINK-17098
 URL: https://issues.apache.org/jira/browse/FLINK-17098
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0


Since CatalogManager#createTable, createTemporaryTable and dropTable use the 
given fully qualified ObjectIdentifier to create or drop tables/temporary 
tables, we should also use ObjectIdentifier (instead of UnresolvedIdentifier) 
in dropTemporaryTable and dropTemporaryView. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics

2020-04-09 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17079280#comment-17079280
 ] 

Zhenghua Gao commented on FLINK-17067:
--

It seems we didn't expose [OR REPLACE] yet. 

Most invocations of CatalogManager#createTemporaryTable use "[OR REPLACE] = 
false", which is compatible with "[IF NOT EXISTS] = false".

The only two exceptions are TableEnvironmentImpl#registerTableSourceInternal() 
and TableEnvironmentImpl#registerTableSinkInternal(), which are called from the 
deprecated APIs (registerTableSource/Sink). 

> CatalogManager#createTable and createTemporaryTable should provide consistent 
> semantics
> ---
>
> Key: FLINK-17067
> URL: https://issues.apache.org/jira/browse/FLINK-17067
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
> CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
> should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE].
> I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table 
> DDL (and view DDL) currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics

2020-04-09 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-17067:
-
Description: 
Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE].

I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table DDL (and 
view DDL) currently.
 

  was:
Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH.

I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table DDL (and 
view DDL) currently.


> CatalogManager#createTable and createTemporaryTable should provide consistent 
> semantics
> ---
>
> Key: FLINK-17067
> URL: https://issues.apache.org/jira/browse/FLINK-17067
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
> CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
> should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE].
> I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table 
> DDL (and view DDL) currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics

2020-04-09 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17079091#comment-17079091
 ] 

Zhenghua Gao commented on FLINK-17067:
--

Otherwise the CREATE [TEMPORARY] TABLE|VIEW [IF NOT EXISTS] statements can't 
provide consistent semantics.

cc [~dwysakowicz] [~jark] What do you think?

> CatalogManager#createTable and createTemporaryTable should provide consistent 
> semantics
> ---
>
> Key: FLINK-17067
> URL: https://issues.apache.org/jira/browse/FLINK-17067
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
> CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
> should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH.
> I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table 
> DDL (and view DDL) currently.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics

2020-04-09 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-17067:


 Summary: CatalogManager#createTable and createTemporaryTable 
should provide consistent semantics
 Key: FLINK-17067
 URL: https://issues.apache.org/jira/browse/FLINK-17067
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0


Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and 
CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they 
should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH.

I prefer [IF NOT EXISTS] since we don't support [OR REPLACE] in table DDL (and 
view DDL) currently.
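
A sketch of the preferred [IF NOT EXISTS] semantics (object name and connector 
are hypothetical):
{code:sql}
CREATE TABLE IF NOT EXISTS t (id INT) WITH ('connector' = 'datagen');
-- repeating the statement is a no-op rather than an error
CREATE TABLE IF NOT EXISTS t (id INT) WITH ('connector' = 'datagen');
{code}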



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16632) SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result

2020-04-07 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077035#comment-17077035
 ] 

Zhenghua Gao commented on FLINK-16632:
--

SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect results, 
which causes two problems:

1) CAST from STRING to TIMESTAMP yields incompatible results. The original 
result comes from DateTimeUtils#timestampStringToUnixDate, which supports 
special cases like '1999-9-10 05:20:10' or '1999-9-10'.

2) TO_TIMESTAMP yields incorrect results. The original result comes from 
SqlDateTimeUtils#toTimestamp(String, String, TimeZone), which follows the 
completion rules of java.text.SimpleDateFormat.

Will file a PR to fix this.
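
The affected expressions, restated from the issue plus an assumed TO_TIMESTAMP 
call with an explicit format:
{code:sql}
-- case 1: accepted by the legacy planner via timestampStringToUnixDate
SELECT CAST('1999-9-10' AS TIMESTAMP);
-- case 2: goes through SqlDateTimeUtils#toTimestamp(String, String, TimeZone)
SELECT TO_TIMESTAMP('1999-9-10 05:20:10', 'yyyy-MM-dd HH:mm:ss');
{code}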

> SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result
> --
>
> Key: FLINK-16632
> URL: https://issues.apache.org/jira/browse/FLINK-16632
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Zhenghua Gao
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
> Attachments: image-2020-04-03-15-39-35-702.png
>
>
> The legacy planner supports the SQL "CAST('1999-9-10' AS TIMESTAMP)".
> The Blink planner lost this support after the timestamp precision work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16632) SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result

2020-04-07 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-16632:
-
Summary: SqlDateTimeUtils#toSqlTimestamp(String, String) may yield 
incorrect result  (was: Cast string to timestamp fail)

> SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result
> --
>
> Key: FLINK-16632
> URL: https://issues.apache.org/jira/browse/FLINK-16632
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Zhenghua Gao
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
> Attachments: image-2020-04-03-15-39-35-702.png
>
>
> The legacy planner supports the SQL "CAST('1999-9-10' AS TIMESTAMP)".
> The Blink planner lost this support after the timestamp precision work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16823) The function TIMESTAMPDIFF doesn't produce the expected result

2020-03-31 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071511#comment-17071511
 ] 

Zhenghua Gao commented on FLINK-16823:
--

After talking with [~danny0405], we decided to introduce 
addMonths/subtractMonths as a temporary solution on the Flink side to fix this 
corner case before CALCITE-3881 is fixed.
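
The corner case, restated from the issue description:
{code:sql}
-- MySQL returns 6; Flink (via Calcite's SqlFunctions#subtractMonths) returned 5
SELECT TIMESTAMPDIFF(MONTH, TIMESTAMP '2019-09-01 00:00:00',
                     TIMESTAMP '2020-03-01 00:00:00');
{code}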

> The function TIMESTAMPDIFF doesn't produce the expected result
> ---
>
> Key: FLINK-16823
> URL: https://issues.apache.org/jira/browse/FLINK-16823
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Adam N D DENG
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2020-03-27-13-50-51-955.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For example,
> in MySQL the SQL below returns 6, but in Flink the output is 5:
> SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00', TIMESTAMP 
> '2020-03-01 00:00:00' )
>  
> !image-2020-03-27-13-50-51-955.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment

2020-03-29 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17070659#comment-17070659
 ] 

Zhenghua Gao commented on FLINK-16379:
--

[~dwysakowicz] You could merge your branch first (better to also support Row 
and fromValues(DataType, Object...) if you have time). I will find another way 
to support plan tests in the testing utilities.
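
For reference, the SQL construct that fromValues mirrors (the aliases here are 
illustrative):
{code:sql}
-- the VALUES clause that fromValues models in the Table API
SELECT * FROM (VALUES (1, 'ABC'), (2, 'ABCDE')) AS t (f0, f1);
{code}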

> Introduce fromValues in TableEnvironment
> 
>
> Key: FLINK-16379
> URL: https://issues.apache.org/jira/browse/FLINK-16379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Introduce a fromValues method to TableEnvironment similar to {{VALUES}} 
> clause in SQL
> The suggested API could look like:
> {code}
>   /**
>* Creates a Table from a given row constructing expressions.
>*
>* Examples:
>*
>* You can use {@link Expressions#row(Object, Object...)} to create 
> composite rows:
>* {@code
>*  tEnv.fromValues(
>*  row(1, "ABC"),
>*  row(2L, "ABCDE")
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>*  |-- f1: VARCHAR(5) NOT NULL
>* }
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  public class RowFunction extends ScalarFunction {
>*  @DataTypeHint("ROW")
>*  Row eval();
>*  }
>*
>*  tEnv.fromValues(
>*  call(new RowFunction()),
>*  call(new RowFunction())
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
>* }
>*
>* The row constructor can be dropped to create a table with a 
> single row:
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  tEnv.fromValues(
>*  1,
>*  2L,
>*  3
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>* }
>*
>* @param expressions Expressions for constructing rows of the VALUES 
> table.
>*/
>   Table fromValues(Expression... expressions);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16823) The function TIMESTAMPDIFF doesn't produce the expected result

2020-03-27 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068618#comment-17068618
 ] 

Zhenghua Gao commented on FLINK-16823:
--

[~danny0405] What's the plan for the Calcite upgrade? Should we also upgrade 
calcite-avatica?

> The function TIMESTAMPDIFF doesn't produce the expected result
> ---
>
> Key: FLINK-16823
> URL: https://issues.apache.org/jira/browse/FLINK-16823
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Adam N D DENG
>Assignee: Zhenghua Gao
>Priority: Major
> Attachments: image-2020-03-27-13-50-51-955.png
>
>
> For example,
> in MySQL the SQL below returns 6, but in Flink the output is 5:
> SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00', TIMESTAMP 
> '2020-03-01 00:00:00' )
>  
> !image-2020-03-27-13-50-51-955.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16823) The function TIMESTAMPDIFF doesn't produce the expected result

2020-03-27 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068610#comment-17068610
 ] 

Zhenghua Gao commented on FLINK-16823:
--

The root cause is CALCITE-3881; it will be fixed on the Calcite side.

> The function TIMESTAMPDIFF doesn't produce the expected result
> ---
>
> Key: FLINK-16823
> URL: https://issues.apache.org/jira/browse/FLINK-16823
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Adam N D DENG
>Assignee: Zhenghua Gao
>Priority: Major
> Attachments: image-2020-03-27-13-50-51-955.png
>
>
> For example,
> in MySQL the SQL below returns 6, but in Flink the output is 5:
> SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00', TIMESTAMP 
> '2020-03-01 00:00:00' )
>  
> !image-2020-03-27-13-50-51-955.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16823) The function TIMESTAMPDIFF doesn't produce the expected result

2020-03-27 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068426#comment-17068426
 ] 

Zhenghua Gao commented on FLINK-16823:
--

Flink uses Calcite's SqlFunctions#subtractMonths to find the number of months 
between two dates/timestamps.

There may be a bug in the algorithm of SqlFunctions#subtractMonths. 

Will dig deeper to find the root cause and fix it.

 

> The function TIMESTAMPDIFF doesn't produce the expected result
> ---
>
> Key: FLINK-16823
> URL: https://issues.apache.org/jira/browse/FLINK-16823
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Adam N D DENG
>Assignee: Zhenghua Gao
>Priority: Major
> Attachments: image-2020-03-27-13-50-51-955.png
>
>
> For example,
> in MySQL the SQL below returns 6, but in Flink the output is 5:
> SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00', TIMESTAMP 
> '2020-03-01 00:00:00' )
>  
> !image-2020-03-27-13-50-51-955.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16823) The function TIMESTAMPDIFF doesn't produce the expected result

2020-03-27 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068396#comment-17068396
 ] 

Zhenghua Gao commented on FLINK-16823:
--

OK, I will take a look at it.

> The function TIMESTAMPDIFF doesn't produce the expected result
> ---
>
> Key: FLINK-16823
> URL: https://issues.apache.org/jira/browse/FLINK-16823
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Adam N D DENG
>Priority: Major
> Attachments: image-2020-03-27-13-50-51-955.png
>
>
> For example,
> in MySQL the SQL below returns 6, but in Flink the output is 5:
> SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00', TIMESTAMP 
> '2020-03-01 00:00:00' )
>  
> !image-2020-03-27-13-50-51-955.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types

2020-03-26 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-16800:
-
Description: 
the planner uses TypeMappingUtils#checkIfCompatible to validate logical schema 
and physical schema are compatible when translate CatalogSinkModifyOperation to 
Calcite relational expression.  The validation didn't deal with nested types 
well, which could throw the following ValidationException:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type ARRAY> of table field 'old' does not match with the physical type ARRAY> of the 'old' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at

[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types

2020-03-26 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-16800:
-
Summary: TypeMappingUtils#checkIfCompatible didn't deal with nested types  
(was: TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with 
nested types)

> TypeMappingUtils#checkIfCompatible didn't deal with nested types
> 
>
> Key: FLINK-16800
> URL: https://issues.apache.org/jira/browse/FLINK-16800
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> The planner will use TypeMappingUtils#checkPhysicalLogicalTypeCompatible to 
> validate that the logical schema and the physical schema are compatible when 
> translating a CatalogSinkModifyOperation to a Calcite relational expression. 
> The validation didn't deal with nested types well, which could expose the 
> following ValidationException:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY> of table field 'old' does not match with the physical type 
> ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource 
> return type.
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
> at
> 

[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types

2020-03-26 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-16800:
-
Description: 
The planner uses TypeMappingUtils#checkIfCompatible to validate that the logical 
schema and the physical schema are compatible when translating a 
CatalogSinkModifyOperation to a Calcite relational expression. The validation 
doesn't deal with nested types well, which can expose the following 
ValidationException:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type ARRAY<ROW<...>> of table field 'old'
does not match with the physical type ARRAY<ROW<LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
type.
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
at
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
at
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at

[jira] [Created] (FLINK-16800) TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types

2020-03-26 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-16800:


 Summary: TypeMappingUtils#checkPhysicalLogicalTypeCompatible 
didn't deal with nested types
 Key: FLINK-16800
 URL: https://issues.apache.org/jira/browse/FLINK-16800
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0


The planner uses TypeMappingUtils#checkPhysicalLogicalTypeCompatible to 
validate that the logical schema and the physical schema are compatible when 
translating a CatalogSinkModifyOperation to a Calcite relational expression. 
The validation doesn't deal with nested types well, which can expose the 
following ValidationException:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type ARRAY<ROW<...>> of table field 'old'
does not match with the physical type ARRAY<ROW<LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
type.
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
at
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
at
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at

[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment

2020-03-13 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059150#comment-17059150
 ] 

Zhenghua Gao commented on FLINK-16379:
--

+1 to merge fromElements into fromValues. 

Constructing an empty table with VALUES may be useless because a plan with an 
empty LogicalValues could be optimized away. 

> Introduce fromValues in TableEnvironment
> 
>
> Key: FLINK-16379
> URL: https://issues.apache.org/jira/browse/FLINK-16379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Introduce a fromValues method to TableEnvironment similar to {{VALUES}} 
> clause in SQL
> The suggested API could look like:
> {code}
>   /**
>* Creates a Table from a given row constructing expressions.
>*
>* Examples:
>*
>* You can use {@link Expressions#row(Object, Object...)} to create 
> a composite rows:
>* {@code
>*  tEnv.fromValues(
>*  row(1, "ABC"),
>*  row(2L, "ABCDE")
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>*  |-- f1: VARCHAR(5) NOT NULL
>* }
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  public class RowFunction extends ScalarFunction {
>*  @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
>*  Row eval();
>*  }
>*
>*  tEnv.fromValues(
>*  call(new RowFunction()),
>*  call(new RowFunction())
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
>* }
>*
>* The row constructor can be dropped to create a table with a 
> single row:
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  tEnv.fromValues(
>*  1,
>*  2L,
>*  3
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>* }
>*
>* @param expressions Expressions for constructing rows of the VALUES 
> table.
>*/
>   Table fromValues(Expression... expressions);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment

2020-03-11 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17057536#comment-17057536
 ] 

Zhenghua Gao commented on FLINK-16379:
--

That's right that we can improve `fromValues(Object ...)`. But how would we 
implement the following requirements: 1) constructing an empty table; 
2) constructing a table from POJOs?
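
For illustration, both requirements could be met by an overload that takes an explicit row data type, so that zero rows (an empty table) and arbitrary element classes are both expressible. The following is only a hypothetical sketch, not an agreed design:
{code:java}
// Assumes a TableEnvironment tEnv and a static import of
// org.apache.flink.table.api.Expressions.row (names here are illustrative).
DataType rowType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("name", DataTypes.STRING()));

// 1) an empty table: the schema is explicit, so zero rows is well-defined
Table empty = tEnv.fromValues(rowType);

// 2) a table from plain objects: the explicit schema avoids reflective
//    inspection of POJOs
Table named = tEnv.fromValues(rowType, row(1, "Alice"), row(2, "Bob"));
{code}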
 

> Introduce fromValues in TableEnvironment
> 
>
> Key: FLINK-16379
> URL: https://issues.apache.org/jira/browse/FLINK-16379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Introduce a fromValues method to TableEnvironment similar to {{VALUES}} 
> clause in SQL
> The suggested API could look like:
> {code}
>   /**
>* Creates a Table from a given row constructing expressions.
>*
>* Examples:
>*
>* You can use {@link Expressions#row(Object, Object...)} to create 
> a composite rows:
>* {@code
>*  tEnv.fromValues(
>*  row(1, "ABC"),
>*  row(2L, "ABCDE")
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>*  |-- f1: VARCHAR(5) NOT NULL
>* }
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  public class RowFunction extends ScalarFunction {
>*  @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
>*  Row eval();
>*  }
>*
>*  tEnv.fromValues(
>*  call(new RowFunction()),
>*  call(new RowFunction())
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
>* }
>*
>* The row constructor can be dropped to create a table with a 
> single row:
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  tEnv.fromValues(
>*  1,
>*  2L,
>*  3
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>* }
>*
>* @param expressions Expressions for constructing rows of the VALUES 
> table.
>*/
>   Table fromValues(Expression... expressions);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment

2020-03-03 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17050732#comment-17050732
 ] 

Zhenghua Gao commented on FLINK-16379:
--

From the user perspective, constructing a Table from raw objects is a more 
convenient and direct way. Expressions bring additional complexity and a 
steeper learning curve (especially for Java users). Another concern is that the 
optimization of LogicalValues and LogicalTableScan is very different: there 
would be big changes to the plan tests if we used fromValues.

> Introduce fromValues in TableEnvironment
> 
>
> Key: FLINK-16379
> URL: https://issues.apache.org/jira/browse/FLINK-16379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Introduce a fromValues method to TableEnvironment similar to {{VALUES}} 
> clause in SQL
> The suggested API could look like:
> {code}
>   /**
>* Creates a Table from a given row constructing expressions.
>*
>* Examples:
>*
>* You can use {@link Expressions#row(Object, Object...)} to create 
> a composite rows:
>* {@code
>*  tEnv.fromValues(
>*  row(1, "ABC"),
>*  row(2L, "ABCDE")
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>*  |-- f1: VARCHAR(5) NOT NULL
>* }
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  public class RowFunction extends ScalarFunction {
>*  @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
>*  Row eval();
>*  }
>*
>*  tEnv.fromValues(
>*  call(new RowFunction()),
>*  call(new RowFunction())
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
>* }
>*
>* The row constructor can be dropped to create a table with a 
> single row:
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  tEnv.fromValues(
>*  1,
>*  2L,
>*  3
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>* }
>*
>* @param expressions Expressions for constructing rows of the VALUES 
> table.
>*/
>   Table fromValues(Expression... expressions);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16029) Remove register source and sink in test cases of blink planner

2020-03-01 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-16029:
-
Summary: Remove register source and sink in test cases of blink planner  
(was: Remove register source and sink in test cases of planner)

> Remove register source and sink in test cases of blink planner
> --
>
> Key: FLINK-16029
> URL: https://issues.apache.org/jira/browse/FLINK-16029
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Tests
>Reporter: Zhenghua Gao
>Priority: Major
>
> Many test cases of the planner use TableEnvironment.registerTableSource() and 
> registerTableSink(), which should be avoided. We want to refactor these cases 
> via TableEnvironment.connect().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability

2020-03-01 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17048740#comment-17048740
 ] 

Zhenghua Gao commented on FLINK-16327:
--

CC [~twalthr] [~dwysakowicz] What do you think about this?

> Add TableEnvironment.fromElements interfaces for usability
> --
>
> Key: FLINK-16327
> URL: https://issues.apache.org/jira/browse/FLINK-16327
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Major
>
> h1. Interface
> {code:java}
> /** 
>* Creates a table from a group of objects (known as its elements). The 
> schema of the table 
>* would be inferred from the type of elements. 
>* 
>* @param data a group of objects. 
>*/
> Table fromElements(Collection data);
> /** 
>* Creates a table from a group of objects (known as its elements). The 
> schema of the table 
>* would be inferred from the passed in data type. 
>* 
>* @param data a group of objects 
>* @param dataType the data type of the data 
>*/
> Table fromElements(Collection data, DataType dataType);
> {code}
> h1. Use Case
>  * One potential use case for Table API
> {code:java}
> @Test 
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
> Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   
>   tEnv.fromElements(data.asJava)
>   .as('first, 'id, 'score, 'last)
> .where('id > 4)
> .select('last, 'score * 2)
> .toAppendStream[Row]
> .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> @Test 
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
> Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
> DataTypes.FIELD("first", DataTypes.STRING()),
> DataTypes.FIELD("id", DataTypes.INT()),
> DataTypes.FIELD("score", DataTypes.DOUBLE()),
> DataTypes.FIELD("last", DataTypes.STRING()))
>   tEnv.fromElements(data.asJava, dataType)
> .where('id > 4)
> .select('last, 'score * 2)
> .toAppendStream[Row]
> .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
>  * One potential use case for SQL
> {code:java}
> @Test 
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
> Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   
>   val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
>   
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>   .toAppendStream[Row]
>   .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> @Test 
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
> Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
> DataTypes.FIELD("first", DataTypes.STRING()),
> DataTypes.FIELD("id", DataTypes.INT()),
> DataTypes.FIELD("score", DataTypes.DOUBLE()),
> DataTypes.FIELD("last", DataTypes.STRING()))
>   val table = tEnv.fromElements(data.asJava, dataType)
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>   .toAppendStream[Row]
>   .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
> h1. The proposal
>  * data type inference
> We need to infer the data type from the data for the first interface. A 
> potential tool is the DataTypeExtractor, but it doesn't support scala.tuple, 
> Row, etc. For Row and scala.tuple, the most common types in our test cases, we 
> could enumerate the supported classes and use a recursive traversal to 
> determine the types of the underlying objects. This solves most cases and 
> improves usability.
>  * proposed changes
>  ** A CollectionQueryOperation which implements QueryOperation to describe 
> the relational operation
>  ** The logical and physical RelNode for legacy planner. In the physical 
> node, we can translate the data to DataStream
>  ** The logical and physical RelNode for blink planner. In the physical node, 
> we can translate the 

[jira] [Created] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability

2020-02-28 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-16327:


 Summary: Add TableEnvironment.fromElements interfaces for usability
 Key: FLINK-16327
 URL: https://issues.apache.org/jira/browse/FLINK-16327
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Zhenghua Gao


h1. Interface
{code:java}
/** 
   * Creates a table from a group of objects (known as its elements). The 
schema of the table 
   * would be inferred from the type of elements. 
   * 
   * @param data a group of objects. 
   */
Table fromElements(Collection data);
/** 
   * Creates a table from a group of objects (known as its elements). The 
schema of the table 
   * would be inferred from the passed in data type. 
   * 
   * @param data a group of objects 
   * @param dataType the data type of the data 
   */
Table fromElements(Collection data, DataType dataType);
{code}
h1. Use Case
 * One potential use case for Table API

{code:java}
@Test 
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
  
  tEnv.fromElements(data.asJava)
.as('first, 'id, 'score, 'last)
.where('id > 4)
.select('last, 'score * 2)
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test 
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
DataTypes.FIELD("first", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("score", DataTypes.DOUBLE()),
DataTypes.FIELD("last", DataTypes.STRING()))

  tEnv.fromElements(data.asJava, dataType)
.where('id > 4)
.select('last, 'score * 2)
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
 * One potential use case for SQL

{code:java}
@Test 
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
  
  val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
  
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test 
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
DataTypes.FIELD("first", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("score", DataTypes.DOUBLE()),
DataTypes.FIELD("last", DataTypes.STRING()))

  val table = tEnv.fromElements(data.asJava, dataType)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
h1. The proposal
 * data type inference

We need to infer the data type from the data for the first interface. A 
potential tool is the DataTypeExtractor, but it doesn't support scala.tuple, 
Row, etc. For Row and scala.tuple, the most common types in our test cases, we 
could enumerate the supported classes and use a recursive traversal to determine 
the types of the underlying objects (see the sketch after this list). This 
solves most cases and improves usability.
 * proposed changes
 ** A CollectionQueryOperation which implements QueryOperation to describe the 
relational operation
 ** The logical and physical RelNode for legacy planner. In the physical node, 
we can translate the data to DataStream
 ** The logical and physical RelNode for blink planner. In the physical node, 
we can translate the data to Transformation
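
A minimal sketch of the recursive traversal mentioned under "data type inference" above (the helper name is invented and only a few sample types are covered; the real inference would need to handle many more):
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class DataTypeInferenceSketch {
  static DataType inferDataType(Object value) {
    if (value instanceof Row) {
      Row row = (Row) value;
      DataTypes.Field[] fields = new DataTypes.Field[row.getArity()];
      for (int i = 0; i < row.getArity(); i++) {
        // recurse into nested values; field names default to f0, f1, ...
        fields[i] = DataTypes.FIELD("f" + i, inferDataType(row.getField(i)));
      }
      return DataTypes.ROW(fields);
    } else if (value instanceof Integer) {
      return DataTypes.INT();
    } else if (value instanceof Long) {
      return DataTypes.BIGINT();
    } else if (value instanceof Double) {
      return DataTypes.DOUBLE();
    } else if (value instanceof String) {
      return DataTypes.STRING();
    }
    throw new IllegalArgumentException("Unsupported element: " + value);
  }
}
{code}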

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13302) DateTimeUtils.unixDateCeil returns the same value as unixDateFloor does

2020-02-20 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17041568#comment-17041568
 ] 

Zhenghua Gao commented on FLINK-13302:
--

[~tison] The related issue is fixed in avatica-1.16.0; we could remove the code 
on the Flink side after we bump avatica-core to 1.16.0.

[~danny0405] What's the plan for avatica-1.16.0? 

> DateTimeUtils.unixDateCeil returns the same value as unixDateFloor does
> ---
>
> Key: FLINK-13302
> URL: https://issues.apache.org/jira/browse/FLINK-13302
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Internally, unixDateCeil & unixDateFloor call julianDateFloor and pass a 
> boolean parameter to differentiate them. But unixDateCeil passes the wrong 
> parameter value and returns the same value as unixDateFloor does.
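
For context, a self-contained sketch of the floor/ceil pattern the description refers to (simplified, with hypothetical signatures; the real code lives in Avatica's DateTimeUtils):
{code:java}
public final class FloorCeilSketch {

    // One shared helper, differentiated only by a boolean flag.
    private static long julianDateFloor(long julian, long range, boolean floor) {
        long floored = Math.floorDiv(julian, range) * range;
        return (floor || floored == julian) ? floored : floored + range;
    }

    public static long unixDateFloor(long julian, long range) {
        return julianDateFloor(julian, range, true);
    }

    public static long unixDateCeil(long julian, long range) {
        // The bug class described above amounts to passing 'true' here as
        // well, which makes ceil return exactly what floor returns.
        return julianDateFloor(julian, range, false);
    }

    public static void main(String[] args) {
        System.out.println(unixDateFloor(10, 7)); // 7
        System.out.println(unixDateCeil(10, 7));  // 14
    }
}
{code}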



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path

2020-02-19 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-16160:


 Summary: Schema#proctime and Schema#rowtime don't work in 
TableEnvironment#connect code path
 Key: FLINK-16160
 URL: https://issues.apache.org/jira/browse/FLINK-16160
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhenghua Gao


In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime properties 
are ignored, so the generated catalog table is not correct. We should fix this 
to let TableEnvironment#connect() support watermarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16143) Turn on more date time functions of blink planner

2020-02-19 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039787#comment-17039787
 ] 

Zhenghua Gao commented on FLINK-16143:
--

[~jark] Actually, TIMESTAMPDIFF/TIMESTAMPADD do support DATE parameters: 

timestampadd(DAY, 1, date '2016-06-15') gets date '2016-06-16' 

timestampdiff(DAY, date '2016-06-15', date '2016-06-14') gets -1

> Turn on more date time functions of blink planner
> -
>
> Key: FLINK-16143
> URL: https://issues.apache.org/jira/browse/FLINK-16143
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Zili Chen
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently blink planner has a series of built-in functions such as
> DATEDIFF
>  DATE_ADD
>  DATE_SUB
> which haven't been put into use so far. We could add the necessary 
> registration, code generation, and conversion code to make them available in 
> production scope.
>  
> what do you think [~jark]?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16117) Avoid register source in TableTestBase#addTableSource

2020-02-17 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038729#comment-17038729
 ] 

Zhenghua Gao commented on FLINK-16117:
--

Most of these tests are plan tests and there is no input data.

My initial thought is to use tableEnv.connect to replace 
tableEnv.registerTableSource in TableTestBase#addTableSource, roughly as 
sketched below.
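
A rough sketch of what that replacement could look like with the descriptor API from org.apache.flink.table.descriptors (connector, format, and field names below are purely illustrative):
{code:java}
// Instead of tableEnv.registerTableSource("MyTable", testSource), a test
// could register an equivalent temporary table:
tableEnv.connect(new FileSystem().path("/tmp/test-input"))
  .withFormat(new OldCsv()
    .field("a", "INT")
    .field("b", "STRING"))
  .withSchema(new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING()))
  .createTemporaryTable("MyTable");
{code}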

 
 

> Avoid register source in TableTestBase#addTableSource
> -
>
> Key: FLINK-16117
> URL: https://issues.apache.org/jira/browse/FLINK-16117
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>
> This affects thousands of unit tests:
> 1) explainSourceAsString of CatalogSourceTable changes
> 2) JoinTest#testUDFInJoinCondition: SQL keywords must be escaped
> 3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: 
> Reference to a rowtime or proctime window required
> 4) SetOperatorsTest#testInWithProject: legacy type vs new type
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16117) Avoid register source in TableTestBase#addTableSource

2020-02-17 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-16117:


 Summary: Avoid register source in TableTestBase#addTableSource
 Key: FLINK-16117
 URL: https://issues.apache.org/jira/browse/FLINK-16117
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhenghua Gao


This affects thousands of unit tests:

1) explainSourceAsString of CatalogSourceTable changes

2) JoinTest#testUDFInJoinCondition: SQL keywords must be escaped

3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: 
Reference to a rowtime or proctime window required

4) SetOperatorsTest#testInWithProject: legacy type vs new type
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16029) Remove register source and sink in test cases of planner

2020-02-12 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-16029:


 Summary: Remove register source and sink in test cases of planner
 Key: FLINK-16029
 URL: https://issues.apache.org/jira/browse/FLINK-16029
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhenghua Gao


Many test cases of the planner use TableEnvironment.registerTableSource() and 
registerTableSink(), which should be avoided. We want to refactor these cases 
via TableEnvironment.connect().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15990) Remove register source and sink in ConnectTableDescriptor

2020-02-11 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034351#comment-17034351
 ] 

Zhenghua Gao commented on FLINK-15990:
--

The comments on the *TableEnvironment.connect()* related code [1][2][3] should 
be updated too.

[1] 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L489]

[2] 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java#L342]

[3] 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala#L304]

> Remove register source and sink in ConnectTableDescriptor
> -
>
> Key: FLINK-15990
> URL: https://issues.apache.org/jira/browse/FLINK-15990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should always use {{ConnectTableDescriptor.createTemporaryTable}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2020-02-10 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15968:
-
Description: 
Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
BYTE_PRIMITIVE_ARRAY_TYPE_INFO. Similar logic has already been implemented for 
the CHAR/VARCHAR types.

The Hive connector achieves this by depending on the blink planner's conversion 
logic, which is odd because a planner dependency shouldn't be necessary for 
connectors.

 
 

  was:
Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion 
logic, which is odd because a planner dependency shouldn't be necessary for 
connectors.

 


> LegacyTypeInfoDataTypeConverter should support conversion from 
> BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -
>
> Key: FLINK-15968
> URL: https://issues.apache.org/jira/browse/FLINK-15968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Major
>
> Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
> DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
> to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
> BYTE_PRIMITIVE_ARRAY_TYPE_INFO. Similar logic has already been implemented for 
> the CHAR/VARCHAR types.
> The Hive connector achieves this by depending on the blink planner's 
> conversion logic, which is odd because a planner dependency shouldn't be 
> necessary for connectors.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2020-02-10 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15968:
-
Summary: LegacyTypeInfoDataTypeConverter should support conversion from 
BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO  (was: 
LegacyTypeInfoDataTypeConverter should support conversion between 
BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO)

> LegacyTypeInfoDataTypeConverter should support conversion from 
> BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -
>
> Key: FLINK-15968
> URL: https://issues.apache.org/jira/browse/FLINK-15968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Major
>
> Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
> DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
> to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
> BYTE_PRIMITIVE_ARRAY_TYPE_INFO. 
> The Hive connector achieves this by depending on the blink planner's 
> conversion logic, which is odd because a planner dependency shouldn't be 
> necessary for connectors.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2020-02-10 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17033551#comment-17033551
 ] 

Zhenghua Gao commented on FLINK-15968:
--

[~jark] [~lzljs3620320] What do you think about this?

> LegacyTypeInfoDataTypeConverter should support conversion between 
> BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -
>
> Key: FLINK-15968
> URL: https://issues.apache.org/jira/browse/FLINK-15968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Major
>
> Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
> DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
> to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
> BYTE_PRIMITIVE_ARRAY_TYPE_INFO. 
> The Hive connector achieves this by depending on the blink planner's 
> conversion logic, which is odd because a planner dependency shouldn't be 
> necessary for connectors.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2020-02-10 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15968:
-
Description: 
Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion 
logic, which is odd because a planner dependency shouldn't be necessary for 
connectors.

 

  was:
Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
to the new type system, we need to convert BINARY(n) or VARBINARY(n) to 
BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion 
logic, which is odd because a planner dependency shouldn't be necessary for 
connectors.

 


> LegacyTypeInfoDataTypeConverter should support conversion between 
> BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -
>
> Key: FLINK-15968
> URL: https://issues.apache.org/jira/browse/FLINK-15968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Major
>
> Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
> DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
> to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
> BYTE_PRIMITIVE_ARRAY_TYPE_INFO. 
> The Hive connector achieves this by depending on the blink planner's 
> conversion logic, which is odd because a planner dependency shouldn't be 
> necessary for connectors.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2020-02-10 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15968:


 Summary: LegacyTypeInfoDataTypeConverter should support conversion 
between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
 Key: FLINK-15968
 URL: https://issues.apache.org/jira/browse/FLINK-15968
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.11.0
Reporter: Zhenghua Gao


Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
to the new type system, we need to convert BINARY(n) or VARBINARY(n) to 
BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion 
logic, which is odd because a planner dependency shouldn't be necessary for 
connectors.
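
As a concrete illustration, the desired mapping could be exercised like this (a sketch of the expected behavior, which does not hold yet at the time of this issue):
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class BinaryConversionSketch {
    public static void main(String[] args) {
        // Desired: precision-carrying binary types map to byte[] type info,
        // just like DataTypes.BYTES() already does, i.e. both results equal
        // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.
        TypeInformation<?> fromBinary =
            LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(DataTypes.BINARY(16));
        TypeInformation<?> fromVarbinary =
            LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(DataTypes.VARBINARY(1024));
        System.out.println(fromBinary);
        System.out.println(fromVarbinary);
    }
}
{code}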

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15881) Stop overriding TableSource::getReturnType in jdbc connector

2020-02-04 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030381#comment-17030381
 ] 

Zhenghua Gao commented on FLINK-15881:
--

This duplicates FLINK-15445. The PR for FLINK-15445 will remove 
*getReturnType* from JDBCTableSource.
 

> Stop overriding TableSource::getReturnType in jdbc connector
> 
>
> Key: FLINK-15881
> URL: https://issues.apache.org/jira/browse/FLINK-15881
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC, Table SQL / API
>Reporter: Kurt Young
>Assignee: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15574) DataType to LogicalType conversion issue

2020-01-17 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017874#comment-17017874
 ] 

Zhenghua Gao commented on FLINK-15574:
--

The root cause is the mixed use of the conversion logic of the blink planner 
and of flink-table-common. In your code snippet, the creation of the 
TableSchema uses the flink-table-common logic, while in the translation logic 
of toDataStream the blink planner uses its own conversion logic:
{code:java}
private def computeIndexMapping()
  : Array[Int] = {
  TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    FlinkTypeFactory.toTableSchema(getRowType).getTableColumns, // <- planner conversion logic
    true,
    nameMapping
  )
}
{code}
Please try to use DataType directly to avoid this mixed use:
{code:java}
  @Test
  def testTableToDataStreamGenericTypeInfo(): Unit = {
// create a simple Table with just one field of type array
val tEnv = scalaStreamTestUtil().tableEnv
//val tableTypeInfo = Types.ROW(
//  Array("my_arr"),
//  Array[TypeInformation[_]](BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
//)
//val tableSchema = TableSchema.fromTypeInfo(tableTypeInfo)
val tableSchema = TableSchema.builder()
.field("my_arr", DataTypes.ARRAY(DataTypes.STRING()))
.build()

tEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))
val sqlQuery = "SELECT * FROM MyTable"

val table = tEnv.sqlQuery(sqlQuery)
tEnv.toAppendStream[Row](table)
  }
{code}
CC [~dwysakowicz] [~lzljs3620320] Should we do something to handle this mixed 
use of conversion logic?

> DataType to LogicalType conversion issue
> 
>
> Key: FLINK-15574
> URL: https://issues.apache.org/jira/browse/FLINK-15574
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Benoit Hanotte
>Priority: Major
>  Labels: pull-request-available
> Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We seem to be encountering an issue with the conversion from DataType to 
> LogicalType with the Blink planner (full stacktrace below):
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the 
> TableSource return type.
> {code}
> It seems there exists 2 paths to do the conversion from DataType to 
> LogicalType:
> 1. TypeConversions.fromLegacyInfoToDataType():
> used for instance when calling TableSchema.fromTypeInformation().
> 2.  LogicalTypeDataTypeConverter.fromDataTypeToLogicalType():
> Deprecated but still used in TableSourceUtil and many other places.
> These 2 code paths can return a different LogicalType for the same input, 
> leading to issues when the LogicalTypes are compared to ensure they are 
> compatible.  For instance, PlannerTypeUtils.isAssignable() returns false for 
> a DataType created from BasicArrayTypeInfo (leading to the 
> ValidationException above).
> The full stacktrace is the following:
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the 
> TableSource return type.
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>   at 
> 

[jira] [Commented] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017755#comment-17017755
 ] 

Zhenghua Gao commented on FLINK-15469:
--

After an initial POC, getTypeClass is not needed because the Upsert/Retract 
stream table sinks always need a Java *Tuple2*. And we don't need any changes 
to *getOutputType*/*getConsumedDataType* because the codegen can use 
*getRecordDataType* directly.
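
For illustration, a record data type returned by *getRecordDataType* could keep precision and scale that TypeInformation cannot express (field names below are invented):
{code:java}
// A precision-aware record type for an upsert/retract sink:
DataType recordDataType = DataTypes.ROW(
    DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 18)),    // DECIMAL(p, s)
    DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(9)));  // TIMESTAMP(p)
{code}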
 

> Update UpsertStreamTableSink and RetractStreamTableSink and related interface 
> to new type system
> 
>
> Key: FLINK-15469
> URL: https://issues.apache.org/jira/browse/FLINK-15469
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently *UpsertStreamTableSink* can only return the TypeInformation of the 
> requested record, which can't support types with precision and scale, e.g. 
> TIMESTAMP(p), DECIMAL(p,s).
> A proposal is to deprecate the *getRecordType* API and add a 
> *getRecordDataType* API instead to return the data type of the requested 
> record.
> {code:java}
> /**
>  * Returns the requested record type.
>  * 
>  * @Deprecated This method will be removed in future versions. It's 
> recommended to use {@link #getRecordDataType()} instead.
>  */
> @Deprecated
> TypeInformation getRecordType();
> /*
>  * Returns the requested record data type.
>  */
> DataType getRecordDataType();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828
 ] 

Zhenghua Gao edited comment on FLINK-15469 at 1/17/20 7:12 AM:
---

Hi [~lzljs3620320], after rethinking the whole thing, we should bring the 
physical data type of the sink and the type class (Java Tuple2 or Scala Tuple2) 
to the planner so that the blink planner can handle the precision concerns. So 
the proposal is as follows:
 # deprecate getRecordType and introduce getRecordDataType instead
 # -remove getOutputType and introduce getConsumedDataType, which returns 
ROW-
 # -introduce getTypeClass interface, which returns type class for codegen-

What do you think? I will file a PR soon if this works. 

 
  
 


was (Author: docete):
Hi [~lzljs3620320], after rethinking the whole thing, we should bring the 
physical data type of the sink and the type class (Java Tuple2 or Scala Tuple2) 
to the planner so that the blink planner can handle the precision concerns. So 
the proposal is as follows:
 # deprecate getRecordType and introduce getRecordDataType instead
 # remove getOutputType and introduce getConsumedDataType, which returns 
ROW
 # introduce getTypeClass interface, which returns type class for codegen

What do you think? I will file a PR soon if this works. 

 
 

> Update UpsertStreamTableSink and RetractStreamTableSink and related interface 
> to new type system
> 
>
> Key: FLINK-15469
> URL: https://issues.apache.org/jira/browse/FLINK-15469
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently *UpsertStreamTableSink* can only return the TypeInformation of the 
> requested record, which can't support types with precision and scale, e.g. 
> TIMESTAMP(p), DECIMAL(p,s).
> A proposal is to deprecate the *getRecordType* API and add a 
> *getRecordDataType* API instead to return the data type of the requested 
> record.
> {code:java}
> /**
>  * Returns the requested record type.
>  * 
>  * @Deprecated This method will be removed in future versions. It's 
> recommended to use {@link #getRecordDataType()} instead.
>  */
> @Deprecated
> TypeInformation getRecordType();
> /*
>  * Returns the requested record data type.
>  */
> DataType getRecordDataType();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system

2020-01-16 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15469:
-
Summary: Update UpsertStreamTableSink and RetractStreamTableSink and 
related interface to new type system  (was: UpsertStreamTableSink should 
support new type system)

> Update UpsertStreamTableSink and RetractStreamTableSink and related interface 
> to new type system
> 
>
> Key: FLINK-15469
> URL: https://issues.apache.org/jira/browse/FLINK-15469
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently *UpsertStreamTableSink* can only return the TypeInformation of the 
> requested record, which can't support types with precision and scale, e.g. 
> TIMESTAMP(p), DECIMAL(p,s).
> A proposal is to deprecate the *getRecordType* API and add a 
> *getRecordDataType* API instead to return the data type of the requested 
> record.
> {code:java}
> /**
>  * Returns the requested record type.
>  * 
>  * @Deprecated This method will be removed in future versions. It's 
> recommended to use {@link #getRecordDataType()} instead.
>  */
> @Deprecated
> TypeInformation getRecordType();
> /*
>  * Returns the requested record data type.
>  */
> DataType getRecordDataType();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017666#comment-17017666
 ] 

Zhenghua Gao commented on FLINK-15602:
--

[~twalthr] AFAIK we already pad the DECIMAL type and intervals in the Blink planner.

 

> Blink planner does not respect the precision when casting timestamp to varchar
> --
>
> Key: FLINK-15602
> URL: https://issues.apache.org/jira/browse/FLINK-15602
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Assignee: Zhenghua Gao
>Priority: Blocker
> Fix For: 1.10.0
>
>
> According to SQL 2011 Part 2 Section 6.13 General Rules 11) d)
> {quote}
> If SD is a datetime data type or an interval data type then let Y be the 
> shortest character string that
> conforms to the definition of <literal> in Subclause 5.3, “<literal>”, and 
> such that the interpreted value
> of Y is SV and the interpreted precision of Y is the precision of SD.
> {quote}
> That means:
> {code}
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(0)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(3)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(9)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000000000
> {code}
> One possible solution would be to propagate the precision in 
> {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}.
>  If I am not mistaken this problem was introduced in [FLINK-14599]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016745#comment-17016745
 ] 

Zhenghua Gao commented on FLINK-15602:
--

[~dwysakowicz] [~twalthr] [~jark]

Let's pad the TIMESTAMP type in both planners; a sketch of the intended behavior follows. I will file a PR soon. 
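
To make the intended behavior concrete, a minimal stand-alone sketch of the padding rule (a hypothetical helper, not the planner's actual code generation):
{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public final class TimestampPaddingSketch {

    // Pads the fractional seconds to exactly 'precision' digits,
    // following SQL:2011 Part 2 Section 6.13.
    static String timestampToString(LocalDateTime t, int precision) {
        String base = t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        if (precision == 0) {
            return base;
        }
        // nanoOfSecond always has 9 digits; keep the first 'precision' digits.
        String nanos = String.format("%09d", t.getNano());
        return base + "." + nanos.substring(0, precision);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2014, 7, 2, 6, 14, 0);
        System.out.println(timestampToString(t, 0)); // 2014-07-02 06:14:00
        System.out.println(timestampToString(t, 3)); // 2014-07-02 06:14:00.000
        System.out.println(timestampToString(t, 9)); // 2014-07-02 06:14:00.000000000
    }
}
{code}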
 

> Blink planner does not respect the precision when casting timestamp to varchar
> --
>
> Key: FLINK-15602
> URL: https://issues.apache.org/jira/browse/FLINK-15602
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.10.0
>
>
> According to SQL 2011 Part 2 Section 6.13 General Rules 11) d)
> {quote}
> If SD is a datetime data type or an interval data type then let Y be the 
> shortest character string that
> conforms to the definition of <literal> in Subclause 5.3, “<literal>”, and 
> such that the interpreted value
> of Y is SV and the interpreted precision of Y is the precision of SD.
> {quote}
> That means:
> {code}
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(0)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(3)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(9)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000000000
> {code}
> One possible solution would be to propagate the precision in 
> {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}.
>  If I am not mistaken this problem was introduced in [FLINK-14599]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15546) Obscure error message from ScalarOperatorGens::generateCast

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016740#comment-17016740
 ] 

Zhenghua Gao commented on FLINK-15546:
--

[~lzljs3620320] OK, I will file a PR soon.

[~jark] Please assign this to me.

> Obscure error message from ScalarOperatorGens::generateCast
> ---
>
> Key: FLINK-15546
> URL: https://issues.apache.org/jira/browse/FLINK-15546
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Rui Li
>Priority: Minor
>
> Consider the following case:
> {noformat}
> Flink SQL> describe foo;
> root
>  |-- x: ROW<`f1` DOUBLE, `f2` VARCHAR(10)>
> Flink SQL> insert into foo select row(1.1,'abc');
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast 
> from 'ROW' to 'ROW'.
> {noformat}
> Users are unlikely to figure out what goes wrong from the above error 
> message. Something like {{Unsupported cast from 'ROW<DECIMAL(2, 1), CHAR(3)>' 
> to 'ROW<DOUBLE, VARCHAR(10)>'}} will be more helpful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar

2020-01-16 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016512#comment-17016512
 ] 

Zhenghua Gao edited comment on FLINK-15602 at 1/16/20 9:16 AM:
---

Hi [~dwysakowicz] [~twalthr] I investigated the behavior of popular DBMSs and 
found: PostgreSQL does NOT pad '0', while Oracle/MS SQL pad '0' (MySQL pads '0' 
for the TIMESTAMP type but not for the DATETIME type). Hive/Spark do not pad 
'0' either.

What's your opinion about the padding behavior? 

 


was (Author: docete):
Hi [~dwysakowicz] [~tiwalter] I investigated the behavior of popular DBMSs and 
found: PostgreSQL does NOT pad '0', while Oracle/MS SQL pad '0' (MySQL pads '0' 
for the TIMESTAMP type but not for the DATETIME type). Hive/Spark do not pad 
'0' either.

What's your opinion about the padding behavior? 

 

> Blink planner does not respect the precision when casting timestamp to varchar
> --
>
> Key: FLINK-15602
> URL: https://issues.apache.org/jira/browse/FLINK-15602
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.10.0
>
>
> According to SQL 2011 Part 2 Section 6.13 General Rules 11) d)
> {quote}
> If SD is a datetime data type or an interval data type then let Y be the 
> shortest character string that
> conforms to the definition of <literal> in Subclause 5.3, “<literal>”, and 
> such that the interpreted value
> of Y is SV and the interpreted precision of Y is the precision of SD.
> {quote}
> That means:
> {code}
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(0)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(3)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(9)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000000000
> {code}
> One possible solution would be to propagate the precision in 
> {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}.
>  If I am not mistaken this problem was introduced in [FLINK-14599]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar

2020-01-15 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016512#comment-17016512
 ] 

Zhenghua Gao commented on FLINK-15602:
--

Hi [~dwysakowicz] [~tiwalter] I investigated the behavior of popular DBMSs and 
found: PostgreSQL does NOT pad '0', while Oracle/MS SQL pad '0' (MySQL pads '0' 
for the TIMESTAMP type but not for the DATETIME type). Hive/Spark do not pad 
'0' either.

What's your opinion about the padding behavior? 

 

> Blink planner does not respect the precision when casting timestamp to varchar
> --
>
> Key: FLINK-15602
> URL: https://issues.apache.org/jira/browse/FLINK-15602
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.10.0
>
>
> According to SQL 2011 Part 2 Section 6.13 General Rules 11) d)
> {quote}
> If SD is a datetime data type or an interval data type then let Y be the 
> shortest character string that
> conforms to the definition of <literal> in Subclause 5.3, “<literal>”, and 
> such that the interpreted value
> of Y is SV and the interpreted precision of Y is the precision of SD.
> {quote}
> That means:
> {code}
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(0)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(3)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000
> select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', 'yyyy-MM-DD HH24:mm:SS') 
> as TIMESTAMP(9)) as VARCHAR(256)) from ...;
> // should produce
> // 2014-07-02 06:14:00.000000000
> {code}
> One possible solution would be to propagate the precision in 
> {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}.
>  If I am not mistaken this problem was introduced in [FLINK-14599]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15469) UpsertStreamTableSink should support new type system

2020-01-13 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828
 ] 

Zhenghua Gao edited comment on FLINK-15469 at 1/14/20 3:15 AM:
---

Hi [~lzljs3620320], after rethinking the whole thing, we should bring the 
physical data type of the sink and the type class (Java Tuple2 or Scala Tuple2) 
to the planner, so that the Blink planner can handle precision correctly. The 
proposal is as follows:
 # deprecate getRecordType and introduce getRecordDataType instead
 # remove getOutputType and introduce getConsumedDataType, which returns 
ROW
 # introduce a getTypeClass interface, which returns the type class for codegen

What do you think? I will file a PR soon if this works.
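For clarity, a rough Java sketch of the interface shape this proposal implies 
(the names come from the list above; this is not the final API, and the exact 
signatures may differ in the PR):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.DataType;

// Sketch only: the proposed shape of the upsert sink interface.
public interface ProposedUpsertStreamTableSink<T> {

    /** @deprecated Use {@link #getRecordDataType()} instead. */
    @Deprecated
    TypeInformation<T> getRecordType();

    /** Data type of the requested record, able to carry precision/scale. */
    DataType getRecordDataType();

    /** Data type consumed by the sink (a retract flag plus the record). */
    DataType getConsumedDataType();

    /** Concrete tuple class (Java or Scala Tuple2) needed for codegen. */
    Class<?> getTypeClass();
}
{code}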

 
 


was (Author: docete):
Hi [~lzljs3620320], after rethinking the whole thing, we should bring the 
physical data type of the sink and the type class (Java Tuple2 or Scala Tuple2) 
to the planner, so that the Blink planner can handle precision correctly. The 
proposal is as follows:
 # remove getRecordType and introduce getRecordDataType instead
 # remove getOutputType and introduce getConsumedDataType, which returns 
ROW
 # introduce a getTypeClass interface, which returns the type class for codegen

What do you think? I will file a PR soon if this works.

 

> UpsertStreamTableSink should support new type system
> 
>
> Key: FLINK-15469
> URL: https://issues.apache.org/jira/browse/FLINK-15469
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently *UpsertStreamTableSink* can only return TypeInformation of the 
> requested record, which can't support types with precision and scale, e.g. 
> TIMESTAMP(p), DECIMAL(p,s).
> A proposal is deprecating the *getRecordType* API and adding a 
> *getRecordDataType* API instead to return the data type of the requested 
> record.
> {code:java}
> /**
>  * Returns the requested record type.
>  * 
>  * @Deprecated This method will be removed in future versions. It's 
> recommended to use {@link #getRecordDataType()} instead.
>  */
> @Deprecated
> TypeInformation<T> getRecordType();
> /*
>  * Returns the requested record data type.
>  */
> DataType getRecordDataType();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15469) UpsertStreamTableSink should support new type system

2020-01-13 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828
 ] 

Zhenghua Gao commented on FLINK-15469:
--

Hi [~lzljs3620320], after rethinking the whole thing, we should bring the 
physical data type of the sink and the type class (Java Tuple2 or Scala Tuple2) 
to the planner, so that the Blink planner can handle precision correctly. The 
proposal is as follows:
 # remove getRecordType and introduce getRecordDataType instead
 # remove getOutputType and introduce getConsumedDataType, which returns 
ROW
 # introduce a getTypeClass interface, which returns the type class for codegen

What do you think? I will file a PR soon if this works.

 

> UpsertStreamTableSink should support new type system
> 
>
> Key: FLINK-15469
> URL: https://issues.apache.org/jira/browse/FLINK-15469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently *UpsertStreamTableSink* can only return TypeInformation of the 
> requested record, which can't support types with precision and scale, e.g. 
> TIMESTAMP(p), DECIMAL(p,s).
> A proposal is deprecating the *getRecordType* API and adding a 
> *getRecordDataType* API instead to return the data type of the requested 
> record.
> {code:java}
> /**
>  * Returns the requested record type.
>  * 
>  * @Deprecated This method will be removed in future versions. It's 
> recommended to use {@link #getRecordDataType()} instead.
>  */
> @Deprecated
> TypeInformation<T> getRecordType();
> /*
>  * Returns the requested record data type.
>  */
> DataType getRecordDataType();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15546) Obscure error message from ScalarOperatorGens::generateCast

2020-01-13 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014818#comment-17014818
 ] 

Zhenghua Gao commented on FLINK-15546:
--

{code:java}
case (from, to) =>
  throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
{code}
I think we should use *operand.resultType* and *targetType* in the error 
message instead, since *from* and *to* are *LogicalTypeRoot*s.
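To illustrate the difference, a standalone Java sketch using the 
flink-table-common type classes (the printed summaries are approximate):

{code:java}
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class CastMessageSketch {
    public static void main(String[] args) {
        // Types from the example above: row(1.1, 'abc') cast to the sink's row type.
        LogicalType from = RowType.of(new DecimalType(2, 1), new CharType(3));
        LogicalType to = RowType.of(new DoubleType(), new VarCharType(10));

        // The type root drops all field information: prints "ROW -> ROW".
        System.out.println(from.getTypeRoot() + " -> " + to.getTypeRoot());

        // The full type keeps the fields, which users can actually act on.
        System.out.println(from.asSummaryString() + " -> " + to.asSummaryString());
    }
}
{code}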
 

> Obscure error message from ScalarOperatorGens::generateCast
> ---
>
> Key: FLINK-15546
> URL: https://issues.apache.org/jira/browse/FLINK-15546
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Rui Li
>Priority: Minor
>
> Consider the following case:
> {noformat}
> Flink SQL> describe foo;
> root
>  |-- x: ROW<`f1` DOUBLE, `f2` VARCHAR(10)>
> Flink SQL> insert into foo select row(1.1,'abc');
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast 
> from 'ROW' to 'ROW'.
> {noformat}
> Users are unlikely to figure out what goes wrong from the above error 
> message. Something like {{Unsupported cast from 'ROW<DECIMAL(2, 1), CHAR(3)>' 
> to 'ROW<DOUBLE, VARCHAR(10)>'}} will be more helpful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15574) DataType to LogicalType conversion issue

2020-01-13 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014804#comment-17014804
 ] 

Zhenghua Gao commented on FLINK-15574:
--

Hi [~b.hanotte], one reason to keep this deprecated conversion logic is to 
bypass the conversion logic in the flink-table-common module. In Blink we 
introduced some runtime TypeInformation (e.g. DecimalTypeInfo, 
LegacyTimestampTypeInfo, etc.) to support things like precision.

Could you update your test to a meaningful scenario so that we can check 
whether there is a potential bug in Blink's conversion logic?
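For reference, a Java sketch of the two conversion paths mentioned in the 
description (assuming the flink-table-common and Blink runtime utilities; the 
class locations may differ across versions):

{code:java}
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;

public class ConversionPathsSketch {
    public static void main(String[] args) {
        DataType dataType = TypeConversions.fromLegacyInfoToDataType(
                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);

        // Path 1: the generic conversion in flink-table-common.
        LogicalType viaCommon = dataType.getLogicalType();

        // Path 2: the deprecated Blink converter, which maps to Blink's runtime types.
        LogicalType viaBlink = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(dataType);

        // If the two disagree, compatibility checks such as isAssignable() can fail.
        System.out.println(viaCommon.equals(viaBlink));
    }
}
{code}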

> DataType to LogicalType conversion issue
> 
>
> Key: FLINK-15574
> URL: https://issues.apache.org/jira/browse/FLINK-15574
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Benoit Hanotte
>Priority: Major
>  Labels: pull-request-available
> Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We seem to be encountering an issue with the conversion from DataType to 
> LogicalType with the Blink planner (full stacktrace below):
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo of the field 'my_array' of the 
> TableSource return type.
> {code}
> It seems there exists 2 paths to do the conversion from DataType to 
> LogicalType:
> 1. TypeConversions.fromLegacyInfoToDataType():
> used for instance when calling TableSchema.fromTypeInformation().
> 2.  LogicalTypeDataTypeConverter.fromDataTypeToLogicalType():
> Deprecated but still used in TableSourceUtil and many other places.
> These 2 code paths can return a different LogicalType for the same input, 
> leading to issues when the LogicalTypes are compared to ensure they are 
> compatible.  For instance, PlannerTypeUtils.isAssignable() returns false for 
> a DataType created from BasicArrayTypeInfo (leading to the 
> ValidationException above).
> The full stacktrace is the following:
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo of the field 'my_array' of the 
> TableSource return type.
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>

[jira] [Commented] (FLINK-15509) Use sql clients create view occur Unexpected exception

2020-01-10 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012596#comment-17012596
 ] 

Zhenghua Gao commented on FLINK-15509:
--

What about validating data type mismatches in this phase as well? [~lzljs3620320]

> Use sql clients create view occur Unexpected exception
> --
>
> Key: FLINK-15509
> URL: https://issues.apache.org/jira/browse/FLINK-15509
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: Xianxun Ye
>Priority: Major
> Fix For: 1.11.0
>
>
> Version: master.
> First I created a table successfully via the SQL Client, and then an 
> unexpected exception was thrown when creating a view.
> My steps:
> Flink SQL> create table myTable (id int);
> *[INFO] Table has been created.*
> Flink SQL> show tables ;
> myTable
> Flink SQL> describe myTable ;
> root
>  |-- id: INT
> Flink SQL> create view myView as select * from myTable;
>  
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>  at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
>  at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308)
>  at java.util.Optional.ifPresent(Optional.java:159)
>  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
>  at java.util.Optional.map(Optional.java:215)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>  at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>  at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> 

[jira] [Commented] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2020-01-09 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012429#comment-17012429
 ] 

Zhenghua Gao commented on FLINK-15445:
--

We came to an agreement that the user should be informed if the connector does 
not support the desired precision, and I will put this discussion on the ML to 
get more people involved. Hopefully we end up with something like a connector 
development guide so that all connectors behave the same in such cases.

[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html]

> JDBC Table Source didn't work for Types with precision (or/and scale)
> -
>
> Key: FLINK-15445
> URL: https://issues.apache.org/jira/browse/FLINK-15445
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {code:java}
>  public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
> String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
> "   'connector.username' = 'root'," +
> "   'connector.table' = 'currency',\n" +
> "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
> "   'connector.lookup.cache.max-rows' = '500', \n" +
> "   'connector.lookup.cache.ttl' = '10s',\n" +
> "   'connector.lookup.max-retries' = '3'" +
> ")";
> tableEnvironment.sqlUpdate(mysqlCurrencyDDL);
> String querySQL = "select * from currency";
> tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
> Row.class).print();
> tableEnvironment.execute("JdbcExample");
> }
> }{code}
>  
> Throws Exception:
>  
> {code:java}
> org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table 
> field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of 
> the 'timestamp6_col' field of the TableSource return 
> type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of 
> table field 'timestamp6_col' does not match with the physical type 
> TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type.
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
>  at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132)
>  at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
>  at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at 
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at 

[jira] [Commented] (FLINK-15509) Use sql clients create view occur Unexpected exception

2020-01-09 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011659#comment-17011659
 ] 

Zhenghua Gao commented on FLINK-15509:
--

Should we verify whether the properties exist, or whether the properties are 
correct?

I think correctness is hard to verify at compile time (e.g. HDFS paths, JDBC 
URLs).

> Use sql clients create view occur Unexpected exception
> --
>
> Key: FLINK-15509
> URL: https://issues.apache.org/jira/browse/FLINK-15509
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: Xianxun Ye
>Priority: Major
> Fix For: 1.11.0
>
>
> Version: master.
> First I created a table successfully via the SQL Client, and then an 
> unexpected exception was thrown when creating a view.
> My steps:
> Flink SQL> create table myTable (id int);
> *[INFO] Table has been created.*
> Flink SQL> show tables ;
> myTable
> Flink SQL> describe myTable ;
> root
>  |-- id: INT
> Flink SQL> create view myView as select * from myTable;
>  
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>  at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
>  at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308)
>  at java.util.Optional.ifPresent(Optional.java:159)
>  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
>  at java.util.Optional.map(Optional.java:215)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>  at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>  at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> 

[jira] [Commented] (FLINK-15525) HBase connector should use new type system to support precision/scale

2020-01-08 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011346#comment-17011346
 ] 

Zhenghua Gao commented on FLINK-15525:
--

[~lzljs3620320] [~Leonard Xu] I closed this and copied the description to the 
older one.

We can discuss under the older ticket.

> HBase connector should use new type system to support precision/scale
> --
>
> Key: FLINK-15525
> URL: https://issues.apache.org/jira/browse/FLINK-15525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
> schema, which would cause precision/scale loss for several data types. 
> Meanwhile *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in 
> bytes, which would cause precision loss for TIMESTAMP types.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15363) Hbase connector does not support datatypes with precision like TIMESTAMP(9) and DECIMAL(10,4)

2020-01-08 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011343#comment-17011343
 ] 

Zhenghua Gao commented on FLINK-15363:
--

Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
schema, which causes precision/scale loss for several data types. Meanwhile 
*HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, which 
causes precision loss for TIMESTAMP types.

The HBase connector should use the new type system to fix this.
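A small Java sketch of the precision loss in question: java.sql.Timestamp's 
getTime() only keeps milliseconds, so a long-based encoding drops the 
sub-millisecond part of the nanoseconds.

{code:java}
import java.sql.Timestamp;

public class TimestampPrecisionLossDemo {
    public static void main(String[] args) {
        Timestamp original = Timestamp.valueOf("2020-01-08 12:00:00.123456789");

        // Encoding as a long of milliseconds is what the byte serialization boils down to.
        long millis = original.getTime();
        Timestamp decoded = new Timestamp(millis);

        System.out.println(original.getNanos()); // 123456789
        System.out.println(decoded.getNanos());  // 123000000 -> sub-ms digits are lost
    }
}
{code}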

> Hbase connector does not support datatypes with precision like TIMESTAMP(9) and 
> DECIMAL(10,4)
> ---
>
> Key: FLINK-15363
> URL: https://issues.apache.org/jira/browse/FLINK-15363
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.10.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> {code:java}
> // exception msg
> rowtype of new rel:rowtype of new rel:RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" order_id, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" item, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> DECIMAL(10, 4) amount, TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT 
> NULL proc_time, DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT 
> currency_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, 
> DECIMAL(38, 4) rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" rowkey, RecordType:peek_no_expand(INTEGER country_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name_cn, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(3) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(38, 18) gdp) f2) NOT 
> NULLrowtype of set:RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> order_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" item, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, DECIMAL(10, 4) amount, 
> TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT NULL proc_time, 
> DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT currency_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, DECIMAL(38, 4) 
> rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
> RecordType:peek_no_expand(INTEGER country_id, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" country_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> country_name_cn, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(9) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(10, 4) gdp) f2) NOT 
> NULL at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:84)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:223)
>  at 
> 

[jira] [Closed] (FLINK-15525) HBase connector should use new type system to support precision/scale

2020-01-08 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao closed FLINK-15525.

Resolution: Duplicate

> HBase connector should use new type system to support precision/scale
> --
>
> Key: FLINK-15525
> URL: https://issues.apache.org/jira/browse/FLINK-15525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
> schema, which would cause precision/scale loss for several data types. 
> Meanwhile *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in 
> bytes, which would cause precision loss for TIMESTAMP types.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15525) HBase connector should use new type system to support precision/scale

2020-01-08 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011328#comment-17011328
 ] 

Zhenghua Gao commented on FLINK-15525:
--

[~Leonard Xu] Added a link to old ticket.

> HBase connector should use new type system to support precision/scale
> --
>
> Key: FLINK-15525
> URL: https://issues.apache.org/jira/browse/FLINK-15525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
> schema, which would cause precision/scale loss for several data types. 
> Meanwhile *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in 
> bytes, which would cause precision loss for TIMESTAMP types.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13651) Blink planner should parse char(n)/varchar(n)/decimal(p, s) inside a string to corresponding datatype

2020-01-08 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011306#comment-17011306
 ] 

Zhenghua Gao commented on FLINK-13651:
--

[~liyu] [~jark] The root cause is that PlannerExpressionParserImpl in both the 
old planner and the Blink planner uses the old type system, and we must keep 
the two classes identical for packaging concerns; see 
https://issues.apache.org/jira/browse/FLINK-13267

The impact of using the new type system for both PlannerExpressionParserImpl 
classes still needs to be evaluated.

I prefer to postpone this to a later release since it only affects the Table API.

[~lzljs3620320] Is there any plan to refactor the Scala-style expression 
parser? 

> Blink planner should parse char(n)/varchar(n)/decimal(p, s) inside a string 
> to corresponding datatype
> -
>
> Key: FLINK-13651
> URL: https://issues.apache.org/jira/browse/FLINK-13651
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Could be reproduced in ScalarFunctionsTest:
> `testAllApis(
>    'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2),
>    "f31.cast(DECIMAL(38, 18)).truncate(2)",
>    "truncate(cast(f31 as decimal(38, 18)), 2)",
>    "-0.12")`
>  
> A possible reason is that LookupCallResolver treats decimal(38, 18) as a 
> function call.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15525) HBase connector should use new type system to support precision/scale

2020-01-08 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15525:


 Summary: HBase connector should use new type system to support 
precision/scale
 Key: FLINK-15525
 URL: https://issues.apache.org/jira/browse/FLINK-15525
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0


Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
schema, which would cause precision/scale loss for several data types. 
Meanwhile *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, 
which would cause precision loss for TIMESTAMP types.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14925) the return type of TO_TIMESTAMP should be Timestamp(9) instead of Timestamp(3)

2020-01-08 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010512#comment-17010512
 ] 

Zhenghua Gao commented on FLINK-14925:
--

[~jark] Makes sense. Is there any ticket to track supporting TIMESTAMP(9) as a 
rowtime attribute?

> the return type of TO_TIMESTAMP should be Timestamp(9) instead of Timestamp(3)
> --
>
> Key: FLINK-14925
> URL: https://issues.apache.org/jira/browse/FLINK-14925
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15509) Use sql clients create view occur Unexpected exception

2020-01-07 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010331#comment-17010331
 ] 

Zhenghua Gao commented on FLINK-15509:
--

Hi [~yesorno], the reason is that the table you created is not valid. A DDL 
statement should contain both the schema information and the *WITH properties* 
that describe the connector [1][2].

When you create a table via DDL, Flink just writes the table information into 
the catalog.

When you query a table, Flink checks whether the table is valid (try "SELECT * 
FROM myTable"; it will throw the same exception).

Likewise, when you create a view in the SQL Client, Flink checks whether the 
underlying table is valid. A sketch of a valid definition follows the links 
below.

[1] 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html]

[2] 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector]
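For example, a minimal sketch of a valid definition registered through the 
Table API (the property keys follow the legacy descriptor-based filesystem 
connector from [2] and are illustrative only; adjust them to your setup):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ValidTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Schema plus WITH properties, so that a table source can be found
        // when the table is queried or used in a view.
        tEnv.sqlUpdate("CREATE TABLE myTable (\n"
                + "  id INT\n"
                + ") WITH (\n"
                + "  'connector.type' = 'filesystem',\n"
                + "  'connector.path' = 'file:///tmp/myTable.csv',\n"
                + "  'format.type' = 'csv'\n"
                + ")");
    }
}
{code}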

> Use sql clients create view occur Unexpected exception
> --
>
> Key: FLINK-15509
> URL: https://issues.apache.org/jira/browse/FLINK-15509
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Xianxun Ye
>Priority: Major
>
> Version: master.
> First I created a table successfully via the SQL Client, and then an 
> unexpected exception was thrown when creating a view.
> My steps:
> Flink SQL> create table myTable (id int);
> *[INFO] Table has been created.*
> Flink SQL> show tables ;
> myTable
> Flink SQL> describe myTable ;
> root
>  |-- id: INT
> Flink SQL> create view myView as select * from myTable;
>  
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>  at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
>  at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308)
>  at java.util.Optional.ifPresent(Optional.java:159)
>  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
>  at java.util.Optional.map(Optional.java:215)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>  at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>  at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125)
>  at 
> 

[jira] [Commented] (FLINK-15508) support DataType with precision in ExpressionParser

2020-01-07 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010323#comment-17010323
 ] 

Zhenghua Gao commented on FLINK-15508:
--

This seems to be a duplicate of FLINK-13651. The root cause is that 
PlannerExpressionParserImpl in both the old planner and the Blink planner uses 
the old type system (and we must keep both PlannerExpressionParserImpl classes 
the same for packaging concerns; see FLINK-13267). 

> support DataType with precision in ExpressionParser
> ---
>
> Key: FLINK-15508
> URL: https://issues.apache.org/jira/browse/FLINK-15508
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> PlannerExpressionParser does not support DataTypes with precision like 
> `DECIMAL(38, 0)`; this causes the following API test to fail.
> {code:java}
> //success 
> testTableApi("123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)),
>   "123456789123456789123456789")
> //fail
> testTableApi(ExpressionParser.parseExpression(
>   "'123456789123456789123456789'.cast(DECIMAL(38, 0))"),
>   "123456789123456789123456789")
> {code}
> org.apache.flink.table.api.ValidationException: Undefined function: 
> DECIMALorg.apache.flink.table.api.ValidationException: Undefined function: 
> DECIMAL at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:47)
>  at java.util.Optional.orElseThrow(Optional.java:290) at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:47)
>  at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
>  at 
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
>  at 
> org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
>  at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:62)
>  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:63)
>  at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:52)
>  at 
> org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
>  at 
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
>  at 
> org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
>  at 
> org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123)
>  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
> org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124) at 
> org.apache.flink.table.planner.expressions.utils.ExpressionTestBase.addTableApiTestExpr(ExpressionTestBase.scala:242)
>  at 
> org.apache.flink.table.planner.expressions.utils.ExpressionTestBase.testTableApi(ExpressionTestBase.scala:232)
>  at 
> org.apache.flink.table.planner.expressions.DecimalTypeTest.testDecimal(DecimalTypeTest.scala:156)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> 

[jira] [Commented] (FLINK-15460) planner dependencies won't be necessary for JDBC connector

2020-01-05 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008583#comment-17008583
 ] 

Zhenghua Gao commented on FLINK-15460:
--

[~twalthr] Actually the JDBC connector code itself should not depend on the 
planner anymore; only the test code depends on it. So I will remove the 
planner dependencies from the JDBC connector by changing their scope to test. 
What do you think?

> planner dependencies won't be necessary for JDBC connector
> --
>
> Key: FLINK-15460
> URL: https://issues.apache.org/jira/browse/FLINK-15460
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Minor
> Fix For: 1.11.0
>
>
> remove planner dependencies from JDBC connector by changing the scope to test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15460) planner dependencies won't be necessary for JDBC connector

2020-01-05 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15460:
-
Description: remove planner dependencies from JDBC connector by changing 
the scope to test.

> planner dependencies won't be necessary for JDBC connector
> --
>
> Key: FLINK-15460
> URL: https://issues.apache.org/jira/browse/FLINK-15460
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Minor
> Fix For: 1.11.0
>
>
> remove planner dependencies from JDBC connector by changing the scope to test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15460) planner dependencies won't be necessary for JDBC connector

2020-01-05 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15460:
-
Component/s: (was: Connectors / HBase)

> planner dependencies won't be necessary for JDBC connector
> --
>
> Key: FLINK-15460
> URL: https://issues.apache.org/jira/browse/FLINK-15460
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Minor
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2020-01-05 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008529#comment-17008529
 ] 

Zhenghua Gao commented on FLINK-15445:
--

[~jark] [~twalthr] I have opened a PR to let the JDBC source support 
precision/scale, and a problem arises: do we need to check whether the 
underlying database supports the data types defined in our DDL, to avoid 
precision loss? Some scenarios are listed as follows:
 # the underlying DB supports DECIMAL(65, 30), which is out of the range of 
Flink's DECIMAL
 # the user defines a table with DECIMAL(10, 4) in the underlying DB, but a 
table with DECIMAL(5, 2) in Flink
 # the underlying DB supports TIMESTAMP(6), but the user defines a table with 
TIMESTAMP(9) in Flink
 # the precision or scale supported by the underlying DB varies between 
versions

What do you think about this? (A sketch of such a check is shown below.)
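
To make the question concrete, here is a minimal sketch of what such a check 
could look like. The {{DialectTypeRange}} class and its limit values are 
hypothetical, for illustration only, not an existing Flink API:
{code:java}
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.DecimalType;

/** Hypothetical per-dialect type limits, used only to illustrate the proposed check. */
public final class DialectTypeRange {

    // Illustrative limits for a MySQL-like dialect.
    private final int maxDecimalPrecision = 65;
    private final int maxDecimalScale = 30;
    private final int maxTimestampPrecision = 6;

    /** Rejects DECIMAL types the underlying database cannot store losslessly. */
    public void validateDecimal(DecimalType type) {
        if (type.getPrecision() > maxDecimalPrecision || type.getScale() > maxDecimalScale) {
            throw new ValidationException(
                "DECIMAL(" + type.getPrecision() + ", " + type.getScale()
                    + ") exceeds the range supported by the underlying database.");
        }
    }

    /** Rejects TIMESTAMP precisions the underlying database cannot represent. */
    public void validateTimestamp(int precision) {
        if (precision > maxTimestampPrecision) {
            throw new ValidationException(
                "TIMESTAMP(" + precision
                    + ") exceeds the precision supported by the underlying database.");
        }
    }
}
{code}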

> JDBC Table Source didn't work for Types with precision (or/and scale)
> -
>
> Key: FLINK-15445
> URL: https://issues.apache.org/jira/browse/FLINK-15445
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {code:java}
>  public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
> String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
> "   'connector.username' = 'root'," +
> "   'connector.table' = 'currency',\n" +
> "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
> "   'connector.lookup.cache.max-rows' = '500', \n" +
> "   'connector.lookup.cache.ttl' = '10s',\n" +
> "   'connector.lookup.max-retries' = '3'" +
> ")";
> tableEnvironment.sqlUpdate(mysqlCurrencyDDL);
> String querySQL = "select * from currency";
> tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
> Row.class).print();
> tableEnvironment.execute("JdbcExample");
> }
> }{code}
>  
> Throws Exception:
>  
> {code:java}
> org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table 
> field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of 
> the 'timestamp6_col' field of the TableSource return 
> type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of 
> table field 'timestamp6_col' does not match with the physical type 
> TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type.
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
>  at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132)
>  at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
>  at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at 
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at 

[jira] [Created] (FLINK-15469) UpsertStreamTableSink should support new type system

2020-01-03 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15469:


 Summary: UpsertStreamTableSink should support new type system
 Key: FLINK-15469
 URL: https://issues.apache.org/jira/browse/FLINK-15469
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0


Currently *UpsertStreamTableSink* can only return the TypeInformation of the 
requested record, which can't carry types with precision and scale, e.g. 
TIMESTAMP(p), DECIMAL(p, s).

A proposal is to deprecate the *getRecordType* API and add a 
*getRecordDataType* API instead, which returns the data type of the requested record.
{code:java}
/**
 * Returns the requested record type.
 *
 * @deprecated This method will be removed in future versions. It's recommended
 * to use {@link #getRecordDataType()} instead.
 */
@Deprecated
TypeInformation<T> getRecordType();

/**
 * Returns the requested record data type.
 */
DataType getRecordDataType();

{code}
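
For backward compatibility, the new method could get a default implementation 
that bridges from the legacy TypeInformation. A minimal sketch, assuming the 
existing {{TypeConversions}} utility; {{RecordDataTypeSink}} is an illustrative 
interface name, not the actual *UpsertStreamTableSink*:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

/** Illustrative interface showing how the proposed API could stay backward compatible. */
public interface RecordDataTypeSink<T> {

    /** Legacy API: cannot carry precision/scale, e.g. TIMESTAMP(p), DECIMAL(p, s). */
    @Deprecated
    TypeInformation<T> getRecordType();

    /**
     * New API. The default bridges from the legacy TypeInformation, so existing
     * sinks keep working; new sinks override it to report precision-aware types.
     */
    default DataType getRecordDataType() {
        return TypeConversions.fromLegacyInfoToDataType(getRecordType());
    }
}
{code}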
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15460) planner dependencies won't be necessary for JDBC connector

2020-01-02 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15460:


 Summary: planner dependencies won't be necessary for JDBC connector
 Key: FLINK-15460
 URL: https://issues.apache.org/jira/browse/FLINK-15460
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / HBase, Connectors / JDBC
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2020-01-02 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15445:
-
Description: 
{code:java}
 public class JDBCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);
String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";

tableEnvironment.sqlUpdate(mysqlCurrencyDDL);


String querySQL = "select * from currency";

tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print();

tableEnvironment.execute("JdbcExample");
}
}{code}
 

Throws Exception:

 
{code:java}
org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table 
field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of 
the 'timestamp6_col' field of the TableSource return 
type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table 
field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of 
the 'timestamp6_col' field of the TableSource return type.
 at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
 at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
 at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
 at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132)
 at 
org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
 at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
 at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
 at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
 at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at 
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) 
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
 at 
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
 at 
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:211)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at 

[jira] [Updated] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision

2019-12-31 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15379:
-
Description: 
A mysql table like:

 
{code:java}
// CREATE TABLE `currency` (
  `currency_id` bigint(20) NOT NULL,
  `currency_name` varchar(200) DEFAULT NULL,
  `rate` double DEFAULT NULL,
  `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `country` varchar(100) DEFAULT NULL,
  `timestamp6` timestamp(6) NULL DEFAULT NULL,
  `time6` time(6) DEFAULT NULL,
  `gdp` decimal(10,4) DEFAULT NULL,
  PRIMARY KEY (`currency_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
+-+---+--+-+-++-+--+
| currency_id | currency_name | rate | currency_time   | country | 
timestamp6 | time6   | gdp  |
+-+---+--+-+-++-+--+
|   1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | 
2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|   2 | Euro  |  114 | 2019-12-20 12:22:00 | Germany | 
2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|   3 | RMB   |   16 | 2019-12-20 12:22:00 | China   | 
2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|   4 | Yen   |1 | 2019-12-20 12:22:00 | Japan   | 
2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 |
+-+---+--+-+-++-+--+{code}
 

If user defined a jdbc table as  dimension table like:

 
{code:java}
// 
public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";
{code}
 

User will get wrong value in column `timestamp6`,`time6`,`gdp`:
{code:java}
// c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, 
c.timestamp6, c.time6, c.gdp 

1,US 
Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001
2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001
4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code}
 

 
{code:java}
public class JDBCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);
String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";

tableEnvironment.sqlUpdate(mysqlCurrencyDDL);


String querySQL = "select * from currency";

tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print();

tableEnvironment.execute("JdbcExample");
}
}
{code}
 

  was:
A mysql table like:

 
{code:java}
// CREATE TABLE `currency` (
  `currency_id` bigint(20) NOT NULL,
  `currency_name` varchar(200) DEFAULT NULL,
  `rate` double 

[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2019-12-31 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15445:
-
Description: 
{code:java}
 public class JDBCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);
String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";

tableEnvironment.sqlUpdate(mysqlCurrencyDDL);


String querySQL = "select * from currency";

tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print();

tableEnvironment.execute("JdbcExample");
}
}{code}
 

Throws Exception:

 

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
TIMESTAMP(6) of table field 'timestamp6' does not match with the physical type 
TIMESTAMP(3) of the 'timestamp9' field of the TableSource return type.

  was:
{code:java}
 public class JDBCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);
String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";

tableEnvironment.sqlUpdate(mysqlCurrencyDDL);


String querySQL = "select * from currency";

tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print();

tableEnvironment.execute("JdbcExample");
}
}{code}


> JDBC Table Source didn't work for Types with precision (or/and scale)
> -
>
> Key: FLINK-15445
> URL: https://issues.apache.org/jira/browse/FLINK-15445
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.10.0
>
>
> {code:java}
>  public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
>  

[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2019-12-31 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15445:
-
Description: 
{code:java}
 public class JDBCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);
String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
"  currency_id BIGINT,\n" +
"  currency_name STRING,\n" +
"  rate DOUBLE,\n" +
"  currency_time TIMESTAMP(3),\n" +
"  country STRING,\n" +
"  timestamp6 TIMESTAMP(6),\n" +
"  time6 TIME(6),\n" +
"  gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
"   'connector.type' = 'jdbc',\n" +
"   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
"   'connector.username' = 'root'," +
"   'connector.table' = 'currency',\n" +
"   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"   'connector.lookup.cache.max-rows' = '500', \n" +
"   'connector.lookup.cache.ttl' = '10s',\n" +
"   'connector.lookup.max-retries' = '3'" +
")";

tableEnvironment.sqlUpdate(mysqlCurrencyDDL);


String querySQL = "select * from currency";

tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print();

tableEnvironment.execute("JdbcExample");
}
}{code}

  was:
{code:java}
public class JDBCSourceExample { public static void main(String[] args) throws 
Exception { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); 
EnvironmentSettings envSettings = EnvironmentSettings.newInstance() 
.useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment 
tableEnvironment = StreamTableEnvironment.create(env, envSettings); String 
mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " 
currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" 
+ " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + 
" gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 
'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 
'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 
'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 
'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' 
= '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; 
tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from 
currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print(); tableEnvironment.execute("JdbcExample"); } }
{code}
 


> JDBC Table Source didn't work for Types with precision (or/and scale)
> -
>
> Key: FLINK-15445
> URL: https://issues.apache.org/jira/browse/FLINK-15445
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.10.0
>
>
> {code:java}
>  public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
> String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
> "   'connector.username' = 'root'," +
> "   

[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2019-12-31 Thread Zhenghua Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-15445:
-
Description: 
{code:java}
public class JDBCSourceExample { public static void main(String[] args) throws 
Exception { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); 
EnvironmentSettings envSettings = EnvironmentSettings.newInstance() 
.useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment 
tableEnvironment = StreamTableEnvironment.create(env, envSettings); String 
mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " 
currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" 
+ " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + 
" gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 
'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 
'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 
'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 
'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' 
= '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; 
tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from 
currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
Row.class).print(); tableEnvironment.execute("JdbcExample"); } }
{code}
 

> JDBC Table Source didn't work for Types with precision (or/and scale)
> -
>
> Key: FLINK-15445
> URL: https://issues.apache.org/jira/browse/FLINK-15445
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: Zhenghua Gao
>Priority: Major
> Fix For: 1.10.0
>
>
> {code:java}
> public class JDBCSourceExample { public static void main(String[] args) 
> throws Exception { StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); 
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance() 
> .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment 
> tableEnvironment = StreamTableEnvironment.create(env, envSettings); String 
> mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " 
> currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time 
> TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " 
> time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 
> 'connector.type' = 'jdbc',\n" + " 'connector.url' = 
> 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + 
> " 'connector.table' = 'currency',\n" + " 'connector.driver' = 
> 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', 
> \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 
> 'connector.lookup.max-retries' = '3'" + ")"; 
> tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * 
> from currency"; 
> tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), 
> Row.class).print(); tableEnvironment.execute("JdbcExample"); } }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision

2019-12-31 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006047#comment-17006047
 ] 

Zhenghua Gao commented on FLINK-15379:
--

[~jark] the issue can't be reproduced after FLINK-15168; instead, a 
ValidationException appears. I opened a ticket (FLINK-15445) to track the 
ValidationException issue.

> JDBC connector return wrong value if defined dataType contains precision
> 
>
> Key: FLINK-15379
> URL: https://issues.apache.org/jira/browse/FLINK-15379
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Leonard Xu
>Priority: Major
>
> A mysql table like:
>  
> {code:java}
> // CREATE TABLE `currency` (
>   `currency_id` bigint(20) NOT NULL,
>   `currency_name` varchar(200) DEFAULT NULL,
>   `rate` double DEFAULT NULL,
>   `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
>   `country` varchar(100) DEFAULT NULL,
>   `timestamp6` timestamp(6) NULL DEFAULT NULL,
>   `time6` time(6) DEFAULT NULL,
>   `gdp` decimal(10,4) DEFAULT NULL,
>   PRIMARY KEY (`currency_id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
> +-+---+--+-+-++-+--+
> | currency_id | currency_name | rate | currency_time   | country | 
> timestamp6 | time6   | gdp  |
> +-+---+--+-+-++-+--+
> |   1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   2 | Euro  |  114 | 2019-12-20 12:22:00 | Germany | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   3 | RMB   |   16 | 2019-12-20 12:22:00 | China   | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   4 | Yen   |1 | 2019-12-20 12:22:00 | Japan   | 
> 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 |
> +-+---+--+-+-++-+--+{code}
>  
> If user defined a jdbc table as  dimension table like:
>  
> {code:java}
> // 
> public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
> "   'connector.username' = 'root'," +
> "   'connector.table' = 'currency',\n" +
> "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
> "   'connector.lookup.cache.max-rows' = '500', \n" +
> "   'connector.lookup.cache.ttl' = '10s',\n" +
> "   'connector.lookup.max-retries' = '3'" +
> ")";
> {code}
>  
> User will get wrong value in column `timestamp6`,`time6`,`gdp`:
> {code:java}
> // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, 
> c.timestamp6, c.time6, c.gdp 
> 1,US 
> Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001
> 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001
> 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code}
>  
>  
> {code:java}
> public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
> String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
>

[jira] [Created] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)

2019-12-31 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15445:


 Summary: JDBC Table Source didn't work for Types with precision 
(or/and scale)
 Key: FLINK-15445
 URL: https://issues.apache.org/jira/browse/FLINK-15445
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.10.0
Reporter: Zhenghua Gao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision

2019-12-31 Thread Zhenghua Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005998#comment-17005998
 ] 

Zhenghua Gao commented on FLINK-15379:
--

Since FLINK-15168 changed the logic for computing physical indices, the query in 
the description now fails in the validation phase:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
TIMESTAMP(6) of table field 'timestamp6' does not match with the physical type 
TIMESTAMP(3) of the 'timestamp9' field of the TableSource return type.

The root cause is that the JDBC table source should implement 
*getProducedDataType* and return proper types with precision; a sketch of such 
an implementation follows.
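
A minimal sketch of the fix on the source side, assuming the source keeps a 
{{TableSchema}} built from the DDL ({{PrecisionAwareJdbcSource}} is an 
illustrative name, not the actual JDBCTableSource):
{code:java}
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;

/** Illustrative source that derives its produced type from the DDL schema. */
public class PrecisionAwareJdbcSource {

    private final TableSchema schema;

    public PrecisionAwareJdbcSource(TableSchema schema) {
        this.schema = schema;
    }

    /**
     * Returning the row type of the DDL schema preserves precision and scale
     * (e.g. TIMESTAMP(6), TIME(6), DECIMAL(10, 4)), so validation no longer
     * compares the table fields against a default TIMESTAMP(3) physical type.
     */
    public DataType getProducedDataType() {
        return schema.toRowDataType();
    }
}
{code}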

 
 

> JDBC connector return wrong value if defined dataType contains precision
> 
>
> Key: FLINK-15379
> URL: https://issues.apache.org/jira/browse/FLINK-15379
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> A mysql table like:
>  
> {code:java}
> // CREATE TABLE `currency` (
>   `currency_id` bigint(20) NOT NULL,
>   `currency_name` varchar(200) DEFAULT NULL,
>   `rate` double DEFAULT NULL,
>   `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
>   `country` varchar(100) DEFAULT NULL,
>   `timestamp6` timestamp(6) NULL DEFAULT NULL,
>   `time6` time(6) DEFAULT NULL,
>   `gdp` decimal(10,4) DEFAULT NULL,
>   PRIMARY KEY (`currency_id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
> +-+---+--+-+-++-+--+
> | currency_id | currency_name | rate | currency_time   | country | 
> timestamp6 | time6   | gdp  |
> +-+---+--+-+-++-+--+
> |   1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   2 | Euro  |  114 | 2019-12-20 12:22:00 | Germany | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   3 | RMB   |   16 | 2019-12-20 12:22:00 | China   | 
> 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
> |   4 | Yen   |1 | 2019-12-20 12:22:00 | Japan   | 
> 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 |
> +-+---+--+-+-++-+--+{code}
>  
> If user defined a jdbc table as  dimension table like:
>  
> {code:java}
> // 
> public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
> "  country STRING,\n" +
> "  timestamp6 TIMESTAMP(6),\n" +
> "  time6 TIME(6),\n" +
> "  gdp DECIMAL(10, 4)\n" +
> ") WITH (\n" +
> "   'connector.type' = 'jdbc',\n" +
> "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
> "   'connector.username' = 'root'," +
> "   'connector.table' = 'currency',\n" +
> "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
> "   'connector.lookup.cache.max-rows' = '500', \n" +
> "   'connector.lookup.cache.ttl' = '10s',\n" +
> "   'connector.lookup.max-retries' = '3'" +
> ")";
> {code}
>  
> User will get wrong value in column `timestamp6`,`time6`,`gdp`:
> {code:java}
> // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, 
> c.timestamp6, c.time6, c.gdp 
> 1,US 
> Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001
> 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001
> 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code}
>  
>  
> {code:java}
> public class JDBCSourceExample {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env, envSettings);
> String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
> "  currency_id BIGINT,\n" +
> "  currency_name STRING,\n" +
> "  rate DOUBLE,\n" +
> "  currency_time TIMESTAMP(3),\n" +
>
