[jira] [Commented] (FLINK-21542) Add documentation for supporting INSERT INTO specific columns
[ https://issues.apache.org/jira/browse/FLINK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294341#comment-17294341 ] Zhenghua Gao commented on FLINK-21542: -- Is it enough to update the [insert syntax|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/insert.html#syntax] section? > Add documentation for supporting INSERT INTO specific columns > - > > Key: FLINK-21542 > URL: https://issues.apache.org/jira/browse/FLINK-21542 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jark Wu >Priority: Major > Fix For: 1.13.0 > > > We have supported INSERT INTO specific columns since FLINK-18726, but have not added > documentation yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18726) Support INSERT INTO specific columns
[ https://issues.apache.org/jira/browse/FLINK-18726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17271091#comment-17271091 ] Zhenghua Gao commented on FLINK-18726: -- [~twalthr] [~Leonard Xu] [~jark] I have implemented this in our internal branch and found it a little complex. The column list specification overlaps with computed columns and partition columns in validation, which sometimes causes difficulties. I'd like to take it and contribute our proposal. > Support INSERT INTO specific columns > > > Key: FLINK-18726 > URL: https://issues.apache.org/jira/browse/FLINK-18726 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Caizhi Weng >Assignee: Atri Sharma >Priority: Major > Labels: sprint > Fix For: 1.13.0 > > > Currently Flink only supports inserting into a table without specifying columns, > but most database systems support inserting into specific columns with > {code:sql} > INSERT INTO table_name(column1, column2, ...) ... > {code} > The columns not specified will be filled with default values, or with {{NULL}} if > no default value is given when creating the table. > As Flink currently does not support default values when creating tables, we > can fill the unspecified columns with {{NULL}} and throw exceptions if there > are columns with {{NOT NULL}} constraints.
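The rule proposed in FLINK-18726 above (fill unspecified columns with NULL, reject the statement if an unspecified column is NOT NULL) can be sketched in plain Java. This is a minimal illustration only, not Flink planner code: the class `InsertColumnPadding`, its nested `Column` type, and the `pad` method are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Flink planner code): expands the values given for a
// partial INSERT column list into a full row in table-schema order.
final class InsertColumnPadding {

    // A table column: its name and whether it has a NOT NULL constraint.
    static final class Column {
        final String name;
        final boolean notNull;

        Column(String name, boolean notNull) {
            this.name = name;
            this.notNull = notNull;
        }
    }

    // Returns one value per schema column; unspecified columns become null.
    static List<Object> pad(List<Column> schema, List<String> targetColumns, List<Object> values) {
        if (targetColumns.size() != values.size()) {
            throw new IllegalArgumentException("Column list and value list differ in length");
        }
        List<String> schemaNames = new ArrayList<>();
        for (Column column : schema) {
            schemaNames.add(column.name);
        }
        for (String target : targetColumns) {
            if (!schemaNames.contains(target)) {
                throw new IllegalArgumentException("Unknown column '" + target + "'");
            }
        }
        List<Object> row = new ArrayList<>();
        for (Column column : schema) {
            int position = targetColumns.indexOf(column.name);
            if (position >= 0) {
                row.add(values.get(position));
            } else if (column.notNull) {
                // No default values are modelled, so a NOT NULL column must be specified.
                throw new IllegalArgumentException(
                        "Column '" + column.name + "' is NOT NULL but was not specified");
            } else {
                row.add(null);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        List<Column> schema = Arrays.asList(
                new Column("id", true), new Column("name", false), new Column("note", false));
        // Models: INSERT INTO t (name, id) VALUES ('flink', 1)
        System.out.println(pad(schema, Arrays.asList("name", "id"), Arrays.<Object>asList("flink", 1)));
    }
}
```

The sketch ignores computed columns and partition columns, which the comment above names as the parts that make the real validation more complex.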
[jira] [Updated] (FLINK-21026) Align column list specification with Hive in INSERT statement
[ https://issues.apache.org/jira/browse/FLINK-21026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-21026: - Description: HIVE-9481 allows column list specification in INSERT statement. The syntax is: {code:java} INSERT INTO TABLE table_name [PARTITION (partcol1[=val1], partcol2[=val2] ...)] [(column list)] select_statement FROM from_statement {code} Meanwhile, Flink introduced PARTITION syntax in which the PARTITION clause appears after the COLUMN LIST clause. It looks weird, and luckily we don't support the COLUMN LIST clause yet. I think it's a good chance to align this with Hive now. was: HIVE-9481 allows column list specification in INSERT statement. The syntax is: {code:java} INSERT INTO TABLE table_name [PARTITION (partcol1[=val1], partcol2[=val2] ...)] [(column list)] select_statement FROM from_statement {code} In the MeanWhile, flink introduces PARTITION syntax that the PARTITION clause appears after the COLUMN LIST clause. It looks weird and luckily we don't support COLUMN LIST clause nowFLINK-18726. I think it'a good chance to align this with Hive now. > Align column list specification with Hive in INSERT statement > - > > Key: FLINK-21026 > URL: https://issues.apache.org/jira/browse/FLINK-21026 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Zhenghua Gao >Priority: Major > > HIVE-9481 allows column list specification in INSERT statement. The syntax is: > {code:java} > INSERT INTO TABLE table_name > [PARTITION (partcol1[=val1], partcol2[=val2] ...)] > [(column list)] > select_statement FROM from_statement > {code} > Meanwhile, Flink introduced PARTITION syntax in which the PARTITION clause > appears after the COLUMN LIST clause. It looks weird, and luckily we don't > support the COLUMN LIST clause yet. I think it's a good chance to align this with > Hive now.
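To make the clause order in the HIVE-9481 grammar quoted above concrete, here is a small Java sketch that renders a statement with PARTITION placed before the column list. `HiveStyleInsert` and `render` are hypothetical names used only for illustration; the sketch does no quoting or validation and is not part of Flink or Hive.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders an INSERT statement using the clause order of
// the HIVE-9481 grammar (PARTITION clause first, then the column list).
final class HiveStyleInsert {

    static String render(String table, Map<String, String> partitionSpec,
                         List<String> columns, String query) {
        StringBuilder sql = new StringBuilder("INSERT INTO TABLE ").append(table);
        if (!partitionSpec.isEmpty()) {
            StringJoiner parts = new StringJoiner(", ", " PARTITION (", ")");
            for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
                // A null value stands for a partition column without "=val".
                parts.add(entry.getValue() == null
                        ? entry.getKey()
                        : entry.getKey() + "=" + entry.getValue());
            }
            sql.append(parts);
        }
        if (!columns.isEmpty()) {
            sql.append(" (").append(String.join(", ", columns)).append(")");
        }
        return sql.append(' ').append(query).toString();
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) keeps the partition columns in the order they were declared.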
[jira] [Updated] (FLINK-21026) Align column list specification with Hive in INSERT statement
[ https://issues.apache.org/jira/browse/FLINK-21026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-21026: - Description: HIVE-9481 allows column list specification in INSERT statement. The syntax is: {code:java} INSERT INTO TABLE table_name [PARTITION (partcol1[=val1], partcol2[=val2] ...)] [(column list)] select_statement FROM from_statement {code} Meanwhile, Flink introduced PARTITION syntax in which the PARTITION clause appears after the COLUMN LIST clause. It looks weird, and luckily we don't support the COLUMN LIST clause yet (FLINK-18726). I think it's a good chance to align this with Hive now. was: [HIVE-9481|https://issues.apache.org/jira/browse/HIVE-9481] allows column list specification in INSERT statement. The syntax is: {code:java} INSERT INTO TABLE table_name [PARTITION (partcol1[=val1], partcol2[=val2] ...)] [(column list)] select_statement FROM from_statement {code} In the MeanWhile, flink introduces PARTITION syntax that the PARTITION clause appears after the COLUMN LIST clause. It looks weird and luckily we don't support COLUMN LIST clause now[FLINK-18726|https://issues.apache.org/jira/browse/FLINK-18726]. I think it'a good change to align this with Hive now. > Align column list specification with Hive in INSERT statement > - > > Key: FLINK-21026 > URL: https://issues.apache.org/jira/browse/FLINK-21026 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Zhenghua Gao >Priority: Major > > HIVE-9481 allows column list specification in INSERT statement. The syntax is: > {code:java} > INSERT INTO TABLE table_name > [PARTITION (partcol1[=val1], partcol2[=val2] ...)] > [(column list)] > select_statement FROM from_statement > {code} > Meanwhile, Flink introduced PARTITION syntax in which the PARTITION clause > appears after the COLUMN LIST clause. It looks weird, and luckily we don't > support the COLUMN LIST clause yet (FLINK-18726). I think it's a good chance to align > this with Hive now.
[jira] [Created] (FLINK-21026) Align column list specification with Hive in INSERT statement
Zhenghua Gao created FLINK-21026: Summary: Align column list specification with Hive in INSERT statement Key: FLINK-21026 URL: https://issues.apache.org/jira/browse/FLINK-21026 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Zhenghua Gao [HIVE-9481|https://issues.apache.org/jira/browse/HIVE-9481] allows column list specification in INSERT statement. The syntax is: {code:java} INSERT INTO TABLE table_name [PARTITION (partcol1[=val1], partcol2[=val2] ...)] [(column list)] select_statement FROM from_statement {code} Meanwhile, Flink introduced PARTITION syntax in which the PARTITION clause appears after the COLUMN LIST clause. It looks weird, and luckily we don't support the COLUMN LIST clause yet ([FLINK-18726|https://issues.apache.org/jira/browse/FLINK-18726]). I think it's a good chance to align this with Hive now.
[jira] [Commented] (FLINK-16827) StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule
[ https://issues.apache.org/jira/browse/FLINK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159601#comment-17159601 ] Zhenghua Gao commented on FLINK-16827: -- [~jark] [~libenchao] I think this bugfix should be ported to 1.11; otherwise TemporalSort in 1.11 is not usable in production environments. > StreamExecTemporalSort should require a distribution trait in > StreamExecTemporalSortRule > > > Key: FLINK-16827 > URL: https://issues.apache.org/jira/browse/FLINK-16827 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Table SQL / Runtime >Affects Versions: 1.9.1 > Environment: flink on yarn > !image-2020-03-27-21-23-13-648.png! >Reporter: wuchangjun >Assignee: Benchao Li >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > Attachments: image-2020-03-27-21-22-21-122.png, > image-2020-03-27-21-22-44-191.png, image-2020-03-27-21-23-13-648.png > > Time Spent: 10m > Remaining Estimate: 0h > > Flink reads Kafka data and sorts by a time field. With a parallelism greater > than one, it throws the following null pointer exception. With a parallelism > of one, processing is normal. > !image-2020-03-27-21-22-21-122.png! > > !image-2020-03-27-21-22-44-191.png! > > >
[jira] [Closed] (FLINK-16117) Avoid register source in TableTestBase#addTableSource
[ https://issues.apache.org/jira/browse/FLINK-16117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao closed FLINK-16117. Resolution: Won't Fix > Avoid register source in TableTestBase#addTableSource > - > > Key: FLINK-16117 > URL: https://issues.apache.org/jira/browse/FLINK-16117 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Major > > This affects thousands of unit tests: > 1) explainSourceAsString of CatalogSourceTable changes > 2)JoinTest#testUDFInJoinCondition: SQL keywords must be escaped > 3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: > Reference to a rowtime or proctime window required > 4) SetOperatorsTest#testInWithProject: legacy type vs new type > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
[ https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092691#comment-17092691 ] Zhenghua Gao commented on FLINK-15591: -- I will open a PR soon for temporary tables. > make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL > --- > > Key: FLINK-15591 > URL: https://issues.apache.org/jira/browse/FLINK-15591 > Project: Flink > Issue Type: Task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Kurt Young >Priority: Major > > Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL, > corresponding to the API in the Table API. The Table API already supports such > operations explicitly in 1.10.
[jira] [Comment Edited] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
[ https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092654#comment-17092654 ] Zhenghua Gao edited comment on FLINK-15591 at 4/26/20, 11:34 AM: - Temporary view is supported in https://issues.apache.org/jira/browse/FLINK-17106 was (Author: docete): Supported in https://issues.apache.org/jira/browse/FLINK-17106 > make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL > --- > > Key: FLINK-15591 > URL: https://issues.apache.org/jira/browse/FLINK-15591 > Project: Flink > Issue Type: Task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Kurt Young >Priority: Major > > Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL, > corresponding to the API in the Table API. The Table API already supports such > operations explicitly in 1.10.
[jira] [Commented] (FLINK-15591) make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL
[ https://issues.apache.org/jira/browse/FLINK-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092654#comment-17092654 ] Zhenghua Gao commented on FLINK-15591: -- Supported in https://issues.apache.org/jira/browse/FLINK-17106 > make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL > --- > > Key: FLINK-15591 > URL: https://issues.apache.org/jira/browse/FLINK-15591 > Project: Flink > Issue Type: Task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Kurt Young >Priority: Major > > Make up for feature parity by supporting CREATE TEMPORARY TABLE/VIEW in DDL, > corresponding to the API in the Table API. The Table API already supports such > operations explicitly in 1.10.
[jira] [Updated] (FLINK-17112) Support DESCRIBE view_name in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17112: - Description: FLINK-14688 introduces DESCRIBE statement in sql parser, but doesn't implement it in planner side because the TableEnvironment.sqlUpdate returns nothing. Since FLINK-16366 introduces TableEnvironment.executeSql and returns TableResult, we can implement DESCRIBE statement in planner now. > Support DESCRIBE view_name in Flink SQL > --- > > Key: FLINK-17112 > URL: https://issues.apache.org/jira/browse/FLINK-17112 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > FLINK-14688 introduces DESCRIBE statement in sql parser, but doesn't > implement it in planner side because the TableEnvironment.sqlUpdate returns > nothing. Since FLINK-16366 introduces TableEnvironment.executeSql and returns > TableResult, we can implement DESCRIBE statement in planner now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17112) Support DESCRIBE view_name in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17112: - Summary: Support DESCRIBE view_name in Flink SQL (was: Support DESCRIBE VIEW view_name in Flink SQL) > Support DESCRIBE view_name in Flink SQL > --- > > Key: FLINK-17112 > URL: https://issues.apache.org/jira/browse/FLINK-17112 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path
[ https://issues.apache.org/jira/browse/FLINK-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088275#comment-17088275 ] Zhenghua Gao edited comment on FLINK-16160 at 4/21/20, 6:04 AM: The root causes are: * proctime()/rowtime() are used along with DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. The original code path stores the ConnectorCatalogTable object in the Catalog, and in the validation phase the RowType is derived from ConnectorCatalogTable.getSchema, which contains the time indicator. After FLINK-14490, we store a CatalogTableImpl object in the Catalog, and in the validation phase the RowType is derived from CatalogTableImpl.getSchema, which doesn't contain the time indicator. * In the SqlToRel phase, FlinkCalciteCatalogReader converts ConnectorCatalogTable to TableSourceTable and converts CatalogTable to CatalogSourceTable. The TableSourceTable is converted to a LogicalTableScan directly and contains the time indicator. In contrast, the CatalogSourceTable is converted to a LogicalTableScan whose time indicator is erased (by FLINK-16345). The solution is straightforward: * We should instantiate the TableSource in CatalogSchemaTable and check whether it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If so, rewrite the TableSchema to patch the time indicator (as is done in ConnectorCatalogTable#calculateSourceSchema). This will pass the validation. * Avoid erasing the time indicator in CatalogSourceTable if the TableSource is a DefinedRowtimeAttributes/DefinedProctimeAttribute instance was (Author: docete): The root causes are: * proctime()/rowtime() are used along with DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. The original code path stores the ConnectorCatalogTable object in Catalog and in validate phrase, the RowType is derived from ConnectorCatalogTable.getSchema which contains time indicator. 
After FLINK-14490, we store CatalogTableImpl object in Catalog and in validate phrase, the RowType is derived from CatalogTableImpl.getSchema which doesn't contain time indicator. * In SqlToRel phrase, FlinkCalciteCatalogReader converts ConnectorCatalogTable to TableSourceTable and converts CatalogTable to CatalogSourceTable. The TableSourceTable would be converted to LogicalTableScan directly and contains time indicator. Otherwise the CatalogSourceTable would be converted to a LogicalTableScan whose time indicator is erased(by FLINK-16345). The solution is straightforward: * We should instantiate the TableSource in CatalogSchemaTable and check if it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If so, rewrite the TableSchema to patch the time indicator(as it is in ConnectorCatalogTable#calculateSourceSchema). This will pass the validation. * Avoid erasing time indicator in CatalogSourceTable if the TableSource is a DefinedRowtimeAttributes/DefinedProctimeAttribute instance > Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect > code path > --- > > Key: FLINK-16160 > URL: https://issues.apache.org/jira/browse/FLINK-16160 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime > properties are ignored so the generated catalog table is not correct. We > should fix this to let TableEnvironment#connect() support watermark. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path
[ https://issues.apache.org/jira/browse/FLINK-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088275#comment-17088275 ] Zhenghua Gao commented on FLINK-16160: -- The root causes are: * proctime()/rowtime() are used along with DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. The original code path stores the ConnectorCatalogTable object in the Catalog, and in the validation phase the RowType is derived from ConnectorCatalogTable.getSchema, which contains the time indicator. After FLINK-14490, we store a CatalogTableImpl object in the Catalog, and in the validation phase the RowType is derived from CatalogTableImpl.getSchema, which doesn't contain the time indicator. * In the SqlToRel phase, FlinkCalciteCatalogReader converts ConnectorCatalogTable to TableSourceTable and converts CatalogTable to CatalogSourceTable. The TableSourceTable is converted to a LogicalTableScan directly and contains the time indicator. In contrast, the CatalogSourceTable is converted to a LogicalTableScan whose time indicator is erased (by FLINK-16345). The solution is straightforward: * We should instantiate the TableSource in CatalogSchemaTable and check whether it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If so, rewrite the TableSchema to patch the time indicator (as is done in ConnectorCatalogTable#calculateSourceSchema). This will pass the validation. 
* Avoid erasing time indicator in CatalogSourceTable if the TableSource is a DefinedRowtimeAttributes/DefinedProctimeAttribute instance > Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect > code path > --- > > Key: FLINK-16160 > URL: https://issues.apache.org/jira/browse/FLINK-16160 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime > properties are ignored so the generated catalog table is not correct. We > should fix this to let TableEnvironment#connect() support watermark. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17111) Support SHOW VIEWS in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17111: - Description: SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in a given database, and doesn't support SHOW VIEWS. Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A workaround is to query a system table which stores metadata of tables and views. Hive supports both SHOW TABLES and SHOW VIEWS. We follow the Hive style, which lists all tables and views with SHOW TABLES and lists only views with SHOW VIEWS. was: SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. MySQL supports SHOW TABLES which lists the non-TEMPORARY tables(and views) in a given database, and doesn't support SHOW VIEWS. Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A workaround is to query a system table which stores tables and views. Hive supports both SHOW TABLES and SHOW VIEWS. We follows the Hive style which lists all tables and views with SHOW TABLES and lists only views with SHOW VIEWS. > Support SHOW VIEWS in Flink SQL > > > Key: FLINK-17111 > URL: https://issues.apache.org/jira/browse/FLINK-17111 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. > MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in > a given database, and doesn't support SHOW VIEWS. > Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A > workaround is to query a system table which stores metadata of tables and > views. > Hive supports both SHOW TABLES and SHOW VIEWS. 
> We follow the Hive style, which lists all tables and views with SHOW TABLES > and lists only views with SHOW VIEWS.
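The Hive-style behaviour described in FLINK-17111 above can be modelled with a tiny in-memory catalog. This is a sketch under stated assumptions: `ShowCommandsModel`, `Kind`, `register`, `showTables`, and `showViews` are hypothetical names and do not correspond to Flink's Catalog API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory catalog (not Flink's Catalog API) that models the
// Hive-style listing: SHOW TABLES returns tables and views, SHOW VIEWS only views.
final class ShowCommandsModel {

    enum Kind { TABLE, VIEW }

    // Insertion-ordered so listings come back in registration order.
    private final Map<String, Kind> objects = new LinkedHashMap<>();

    void register(String name, Kind kind) {
        objects.put(name, kind);
    }

    // Models SHOW TABLES: every registered object, regardless of kind.
    List<String> showTables() {
        return new ArrayList<>(objects.keySet());
    }

    // Models SHOW VIEWS: only the objects registered as views.
    List<String> showViews() {
        List<String> views = new ArrayList<>();
        for (Map.Entry<String, Kind> entry : objects.entrySet()) {
            if (entry.getValue() == Kind.VIEW) {
                views.add(entry.getKey());
            }
        }
        return views;
    }
}
```

Under this model a view always appears in both listings, which is the difference from a design where SHOW TABLES would exclude views.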
[jira] [Updated] (FLINK-17111) Support SHOW VIEWS in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17111: - Description: SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in a given database, and doesn't support SHOW VIEWS. Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A workaround is to query a system table which stores tables and views. Hive supports both SHOW TABLES and SHOW VIEWS. We follow the Hive style, which lists all tables and views with SHOW TABLES and lists only views with SHOW VIEWS. > Support SHOW VIEWS in Flink SQL > > > Key: FLINK-17111 > URL: https://issues.apache.org/jira/browse/FLINK-17111 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > SHOW TABLES and SHOW VIEWS are not SQL standard-compliant commands. > MySQL supports SHOW TABLES, which lists the non-TEMPORARY tables (and views) in > a given database, and doesn't support SHOW VIEWS. > Oracle/SQL Server/PostgreSQL don't support SHOW TABLES and SHOW VIEWS. A > workaround is to query a system table which stores tables and views. > Hive supports both SHOW TABLES and SHOW VIEWS. > We follow the Hive style, which lists all tables and views with SHOW TABLES > and lists only views with SHOW VIEWS.
[jira] [Created] (FLINK-17113) Refactor view support in SQL Client
Zhenghua Gao created FLINK-17113: Summary: Refactor view support in SQL Client Key: FLINK-17113 URL: https://issues.apache.org/jira/browse/FLINK-17113 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17112) Support DESCRIBE VIEW view_name in Flink SQL
Zhenghua Gao created FLINK-17112: Summary: Support DESCRIBE VIEW view_name in Flink SQL Key: FLINK-17112 URL: https://issues.apache.org/jira/browse/FLINK-17112 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17111) Support SHOW VIEWS in Flink SQL
Zhenghua Gao created FLINK-17111: Summary: Support SHOW VIEWS in Flink SQL Key: FLINK-17111 URL: https://issues.apache.org/jira/browse/FLINK-17111 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17106) Support create/drop view in Flink SQL
Zhenghua Gao created FLINK-17106: Summary: Support create/drop view in Flink SQL Key: FLINK-17106 URL: https://issues.apache.org/jira/browse/FLINK-17106 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Legacy Planner, Table SQL / Planner Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17105: - Component/s: Table SQL / Planner Table SQL / Legacy Planner Table SQL / Client Table SQL / API > FLIP-71: E2E view support for Flink SQL > --- > > Key: FLINK-17105 > URL: https://issues.apache.org/jira/browse/FLINK-17105 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Client, Table SQL / Legacy > Planner, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17105: - Summary: FLIP-71: E2E view support for Flink SQL (was: FLIP-71: E2E viewsupport ) > FLIP-71: E2E view support for Flink SQL > --- > > Key: FLINK-17105 > URL: https://issues.apache.org/jira/browse/FLINK-17105 > Project: Flink > Issue Type: New Feature >Reporter: Zhenghua Gao >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17105: - Affects Version/s: 1.10.0 > FLIP-71: E2E view support for Flink SQL > --- > > Key: FLINK-17105 > URL: https://issues.apache.org/jira/browse/FLINK-17105 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17105) FLIP-71: E2E viewsupport
Zhenghua Gao created FLINK-17105: Summary: FLIP-71: E2E viewsupport Key: FLINK-17105 URL: https://issues.apache.org/jira/browse/FLINK-17105 Project: Flink Issue Type: New Feature Reporter: Zhenghua Gao -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17105) FLIP-71: E2E view support for Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17105: - Fix Version/s: 1.11.0 > FLIP-71: E2E view support for Flink SQL > --- > > Key: FLINK-17105 > URL: https://issues.apache.org/jira/browse/FLINK-17105 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17098) CatalogManager#dropTemporaryTable and dropTemporaryView should use ObjectIdentifier as its argument
Zhenghua Gao created FLINK-17098: Summary: CatalogManager#dropTemporaryTable and dropTemporaryView should use ObjectIdentifier as its argument Key: FLINK-17098 URL: https://issues.apache.org/jira/browse/FLINK-17098 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 Since CatalogManager#createTable, createTemporaryTable and dropTable use the given fully qualified ObjectIdentifier to create or drop tables/temporary tables, we should also use ObjectIdentifier (instead of UnresolvedIdentifier) in dropTemporaryTable and dropTemporaryView. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics
[ https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17079280#comment-17079280 ] Zhenghua Gao commented on FLINK-17067: -- It seems we haven't exposed [OR REPLACE] yet. Most invocations of CatalogManager#createTemporaryTable use "[OR REPLACE] = false", which is compatible with "[IF NOT EXISTS] = false". The only two exceptions are TableEnvironmentImpl#registerTableSourceInternal() and TableEnvironmentImpl#registerTableSinkInternal(), which are called in deprecated APIs (registerTableSource/Sink). > CatalogManager#createTable and createTemporaryTable should provide consistent > semantics > --- > > Key: FLINK-17067 > URL: https://issues.apache.org/jira/browse/FLINK-17067 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and > CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they > should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE]. > I prefer [IF NOT EXISTS] since we currently don't support [OR REPLACE] in table > DDL (and view DDL).
[jira] [Updated] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics
[ https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-17067: - Description: Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE]. I prefer [IF NOT EXISTS] since we currently don't support [OR REPLACE] in table DDL (and view DDL). was: Currently CatalogManager#createTable provides [IF NOT EXISTS] semantic and CatalogManager#createTemporaryTable provides [OR REPLACE] semantic. IMO they should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH. I prefer [IF NOT EXISTS] since we didn't support [OR REPLACE] in table DDL(and view DDL) currently. > CatalogManager#createTable and createTemporaryTable should provide consistent > semantics > --- > > Key: FLINK-17067 > URL: https://issues.apache.org/jira/browse/FLINK-17067 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and > CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they > should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE]. > I prefer [IF NOT EXISTS] since we currently don't support [OR REPLACE] in table > DDL (and view DDL).
[jira] [Commented] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics
[ https://issues.apache.org/jira/browse/FLINK-17067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17079091#comment-17079091 ] Zhenghua Gao commented on FLINK-17067: -- Otherwise, CREATE [TEMPORARY] TABLE|VIEW [IF NOT EXISTS] can't provide consistent semantics. cc [~dwysakowicz] [~jark] What do you think? > CatalogManager#createTable and createTemporaryTable should provide consistent > semantics > --- > > Key: FLINK-17067 > URL: https://issues.apache.org/jira/browse/FLINK-17067 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and > CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they > should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH. > I prefer [IF NOT EXISTS] since we don't currently support [OR REPLACE] in table > DDL (and view DDL).
[jira] [Created] (FLINK-17067) CatalogManager#createTable and createTemporaryTable should provide consistent semantics
Zhenghua Gao created FLINK-17067: Summary: CatalogManager#createTable and createTemporaryTable should provide consistent semantics Key: FLINK-17067 URL: https://issues.apache.org/jira/browse/FLINK-17067 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 Currently CatalogManager#createTable provides [IF NOT EXISTS] semantics and CatalogManager#createTemporaryTable provides [OR REPLACE] semantics. IMO they should provide consistent semantics: [IF NOT EXISTS] or [OR REPLACE] or BOTH. I prefer [IF NOT EXISTS] since we don't currently support [OR REPLACE] in table DDL (and view DDL).
[jira] [Commented] (FLINK-16632) SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result
[ https://issues.apache.org/jira/browse/FLINK-16632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077035#comment-17077035 ] Zhenghua Gao commented on FLINK-16632: -- SqlDateTimeUtils#toSqlTimestamp(String, String) may yield an incorrect result, which causes two problems: 1) Casting STRING to TIMESTAMP yields an incompatible result. The original result comes from DateTimeUtils#timestampStringToUnixDate, which supports special cases like '1999-9-10 05:20:10' or '1999-9-10'. 2) TO_TIMESTAMP yields an incorrect result. The original result comes from SqlDateTimeUtils#toTimestamp(String, String, TimeZone), which follows the completion rules of java.text.SimpleDateFormat. I will file a PR to fix this. > SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result > -- > > Key: FLINK-16632 > URL: https://issues.apache.org/jira/browse/FLINK-16632 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Zhenghua Gao >Priority: Critical > Fix For: 1.10.1, 1.11.0 > > Attachments: image-2020-04-03-15-39-35-702.png > > > The legacy planner supports the SQL: "CAST('1999-9-10' AS TIMESTAMP)". > The Blink planner lost this support after timestamp precision support was added.
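For readers unfamiliar with the "special cases" mentioned above, the following is a rough sketch of what accepting '1999-9-10' and '1999-9-10 05:20:10' can look like with plain java.time. It is an illustration only: LenientTimestampSketch and its parse method are hypothetical names, and this is neither DateTimeUtils#timestampStringToUnixDate nor the fix proposed for SqlDateTimeUtils.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Hypothetical helper for illustration; this is NOT Flink's SqlDateTimeUtils.
final class LenientTimestampSketch {

    // Accepts one- or two-digit month/day values and an optional time part.
    // Missing time fields default to midnight.
    private static final DateTimeFormatter LENIENT =
            new DateTimeFormatterBuilder()
                    .appendPattern("uuuu-M-d[ H:m:s]")
                    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
                    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
                    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
                    .toFormatter();

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text.trim(), LENIENT);
    }

    public static void main(String[] args) {
        System.out.println(parse("1999-9-10"));
        System.out.println(parse("1999-9-10 05:20:10"));
    }
}
```

A strict pattern such as "uuuu-MM-dd HH:mm:ss" would reject both inputs above, which is the kind of regression the issue describes for CAST('1999-9-10' AS TIMESTAMP).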
[jira] [Updated] (FLINK-16632) SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result
[ https://issues.apache.org/jira/browse/FLINK-16632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-16632: - Summary: SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result (was: Cast string to timestamp fail) > SqlDateTimeUtils#toSqlTimestamp(String, String) may yield incorrect result > -- > > Key: FLINK-16632 > URL: https://issues.apache.org/jira/browse/FLINK-16632 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Zhenghua Gao >Priority: Critical > Fix For: 1.10.1, 1.11.0 > > Attachments: image-2020-04-03-15-39-35-702.png > > > The legacy planner supports the SQL: "CAST('1999-9-10' AS TIMESTAMP)". > The Blink planner lost this support after timestamp precision support was added.
[jira] [Commented] (FLINK-16823) The functioin TIMESTAMPDIFF doesn't perform expected result
[ https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071511#comment-17071511 ] Zhenghua Gao commented on FLINK-16823: -- After talking with [~danny0405], we decided to introduce addMonths/subtractMonths on the Flink side as a temporary solution for this corner case until CALCITE-3881 is fixed. > The functioin TIMESTAMPDIFF doesn't perform expected result > --- > > Key: FLINK-16823 > URL: https://issues.apache.org/jira/browse/FLINK-16823 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.1, 1.10.0 >Reporter: Adam N D DENG >Assignee: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Attachments: image-2020-03-27-13-50-51-955.png > > Time Spent: 10m > Remaining Estimate: 0h > > For example, > in MySQL the SQL below returns 6, but in Flink the output is 5: > SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00',TIMESTAMP > '2020-03-01 00:00:00' ) > > !image-2020-03-27-13-50-51-955.png!
[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17070659#comment-17070659 ] Zhenghua Gao commented on FLINK-16379: -- [~dwysakowicz] You could merge your branch first(better to support Row and fromValues(DataType, Object...) if you have time). I will find another way to support plan test in testing utilities. > Introduce fromValues in TableEnvironment > > > Key: FLINK-16379 > URL: https://issues.apache.org/jira/browse/FLINK-16379 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Introduce a fromValues method to TableEnvironment similar to {{VALUES}} > clause in SQL > The suggested API could look like: > {code} > /** >* Creates a Table from a given row constructing expressions. >* >* Examples: >* >* You can use {@link Expressions#row(Object, Object...)} to create > a composite rows: >* {@code >* tEnv.fromValues( >* row(1, "ABC"), >* row(2L, "ABCDE") >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: BIGINT NOT NULL >* |-- f1: VARCHAR(5) NOT NULL >* } >* >* ROWs that are a result of e.g. a function call are not flattened >* {@code >* public class RowFunction extends ScalarFunction { >* @DataTypeHint("ROW") >* Row eval(); >* } >* >* tEnv.fromValues( >* call(new RowFunction()), >* call(new RowFunction()) >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)> >* } >* >* The row constructor can be dropped to create a table with a > single row: >* >* ROWs that are a result of e.g. 
a function call are not flattened >* {@code >* tEnv.fromValues( >* 1, >* 2L, >* 3 >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: BIGINT NOT NULL >* } >* >* @param expressions Expressions for constructing rows of the VALUES > table. >*/ > Table fromValues(Expression... expressions); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16823) The functioin TIMESTAMPDIFF doesn't perform expected result
[ https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068618#comment-17068618 ] Zhenghua Gao commented on FLINK-16823: -- [~danny0405] What is the plan for upgrading Calcite? Should we also upgrade calcite-avatica? > The functioin TIMESTAMPDIFF doesn't perform expected result > --- > > Key: FLINK-16823 > URL: https://issues.apache.org/jira/browse/FLINK-16823 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.1, 1.10.0 >Reporter: Adam N D DENG >Assignee: Zhenghua Gao >Priority: Major > Attachments: image-2020-03-27-13-50-51-955.png > > > For example, > in MySQL the SQL below returns 6, but in Flink the output is 5: > SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00',TIMESTAMP > '2020-03-01 00:00:00' ) > > !image-2020-03-27-13-50-51-955.png!
[jira] [Commented] (FLINK-16823) The functioin TIMESTAMPDIFF doesn't perform expected result
[ https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068610#comment-17068610 ] Zhenghua Gao commented on FLINK-16823: -- The root cause is CALCITE-3881; I will fix it on the Calcite side. > The functioin TIMESTAMPDIFF doesn't perform expected result > --- > > Key: FLINK-16823 > URL: https://issues.apache.org/jira/browse/FLINK-16823 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.1, 1.10.0 >Reporter: Adam N D DENG >Assignee: Zhenghua Gao >Priority: Major > Attachments: image-2020-03-27-13-50-51-955.png > > > For example, > in MySQL the SQL below returns 6, but in Flink the output is 5: > SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00',TIMESTAMP > '2020-03-01 00:00:00' ) > > !image-2020-03-27-13-50-51-955.png!
[jira] [Commented] (FLINK-16823) The functioin TIMESTAMPDIFF doesn't perform expected result
[ https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068426#comment-17068426 ] Zhenghua Gao commented on FLINK-16823: -- Flink uses Calcite's SqlFunctions#subtractMonths to find the number of months between two dates/timestamps. There may be a bug in the algorithm of SqlFunctions#subtractMonths. I will dig deeper to find the root cause and fix it. > The functioin TIMESTAMPDIFF doesn't perform expected result > --- > > Key: FLINK-16823 > URL: https://issues.apache.org/jira/browse/FLINK-16823 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.1, 1.10.0 >Reporter: Adam N D DENG >Assignee: Zhenghua Gao >Priority: Major > Attachments: image-2020-03-27-13-50-51-955.png > > > For example, > in MySQL the SQL below returns 6, but in Flink the output is 5: > SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00',TIMESTAMP > '2020-03-01 00:00:00' ) > > !image-2020-03-27-13-50-51-955.png!
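As a reference point for the expected value in the report, here is a small sketch that computes whole months between the two timestamps with java.time. It is only a cross-check: MonthDiffSketch is a hypothetical name, and the code is neither Calcite's SqlFunctions#subtractMonths nor the addMonths/subtractMonths workaround discussed for Flink.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical cross-check; NOT Calcite's SqlFunctions#subtractMonths.
final class MonthDiffSketch {

    // Number of complete months from start (inclusive) to end (exclusive).
    static long monthsBetween(LocalDateTime start, LocalDateTime end) {
        return ChronoUnit.MONTHS.between(start, end);
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2019, 9, 1, 0, 0, 0);
        LocalDateTime end = LocalDateTime.of(2020, 3, 1, 0, 0, 0);
        // Same inputs as the TIMESTAMPDIFF query in the issue description.
        System.out.println(monthsBetween(start, end));
    }
}
```

For the inputs in the issue this yields 6, matching the MySQL result quoted in the description; a start date later in the month (for example the 15th) would yield 5, because the last month is then incomplete.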
[jira] [Commented] (FLINK-16823) The functioin TIMESTAMPDIFF doesn't perform expected result
[ https://issues.apache.org/jira/browse/FLINK-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068396#comment-17068396 ] Zhenghua Gao commented on FLINK-16823: -- OK, I will take a look at it. > The functioin TIMESTAMPDIFF doesn't perform expected result > --- > > Key: FLINK-16823 > URL: https://issues.apache.org/jira/browse/FLINK-16823 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.1, 1.10.0 >Reporter: Adam N D DENG >Priority: Major > Attachments: image-2020-03-27-13-50-51-955.png > > > For example, > in MySQL the SQL below returns 6, but in Flink the output is 5: > SELECT timestampdiff (MONTH, TIMESTAMP '2019-09-01 00:00:00',TIMESTAMP > '2020-03-01 00:00:00' ) > > !image-2020-03-27-13-50-51-955.png!
[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types
[ https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-16800: - Description: the planner uses TypeMappingUtils#checkIfCompatible to validate logical schema and physical schema are compatible when translate CatalogSinkModifyOperation to Calcite relational expression. The validation didn't deal with nested types well, which could throw the following ValidationException: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY> of table field 'old' does not match with the physical type ARRAY> of the 'old' field of the TableSource return type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at
[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types
[ https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-16800: - Summary: TypeMappingUtils#checkIfCompatible didn't deal with nested types (was: TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types) > TypeMappingUtils#checkIfCompatible didn't deal with nested types > > > Key: FLINK-16800 > URL: https://issues.apache.org/jira/browse/FLINK-16800 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > the planner will use TypeMappingUtils#checkPhysicalLogicalTypeCompatible to > validate logical schema and physical schema are compatible when translate > CatalogSinkModifyOperation to Calcite relational expression. The validation > didn't deal with nested types well, which could expose the following > ValidationException: > {code:java} > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type ARRAY> of table field 'old' > does not match with the physical type ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return > type. 
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) > at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) > at > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) > at > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51) > at >
[jira] [Updated] (FLINK-16800) TypeMappingUtils#checkIfCompatible didn't deal with nested types
[ https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-16800: - Description: the planner will use TypeMappingUtils#checkIfCompatible to validate logical schema and physical schema are compatible when translate CatalogSinkModifyOperation to Calcite relational expression. The validation didn't deal with nested types well, which could expose the following ValidationException: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY> of table field 'old' does not match with the physical type ARRAY> of the 'old' field of the TableSource return type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at
[jira] [Created] (FLINK-16800) TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types
Zhenghua Gao created FLINK-16800: Summary: TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types Key: FLINK-16800 URL: https://issues.apache.org/jira/browse/FLINK-16800 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 the planner will use TypeMappingUtils#checkPhysicalLogicalTypeCompatible to validate logical schema and physical schema are compatible when translate CatalogSinkModifyOperation to Calcite relational expression. The validation didn't deal with nested types well, which could expose the following ValidationException: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY> of table field 'old' does not match with the physical type ARRAY> of the 'old' field of the TableSource return type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at
[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059150#comment-17059150 ] Zhenghua Gao commented on FLINK-16379: -- +1 to merging fromElements into fromValues. Constructing an empty table with Values may be useless because a plan with an empty LogicalValues could be optimized. > Introduce fromValues in TableEnvironment > > > Key: FLINK-16379 > URL: https://issues.apache.org/jira/browse/FLINK-16379 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Introduce a fromValues method to TableEnvironment similar to {{VALUES}} > clause in SQL > The suggested API could look like: > {code} > /** >* Creates a Table from a given row constructing expressions. >* >* Examples: >* >* You can use {@link Expressions#row(Object, Object...)} to create > a composite rows: >* {@code >* tEnv.fromValues( >* row(1, "ABC"), >* row(2L, "ABCDE") >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: BIGINT NOT NULL >* |-- f1: VARCHAR(5) NOT NULL >* } >* >* ROWs that are a result of e.g. a function call are not flattened >* {@code >* public class RowFunction extends ScalarFunction { >* @DataTypeHint("ROW") >* Row eval(); >* } >* >* tEnv.fromValues( >* call(new RowFunction()), >* call(new RowFunction()) >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)> >* } >* >* The row constructor can be dropped to create a table with a > single row: >* >* ROWs that are a result of e.g. 
a function call are not flattened >* {@code >* tEnv.fromValues( >* 1, >* 2L, >* 3 >* ) >* } >* will produce a Table with a schema as follows: >* {@code >* root >* |-- f0: BIGINT NOT NULL >* } >* >* @param expressions Expressions for constructing rows of the VALUES > table. >*/ > Table fromValues(Expression... expressions); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17057536#comment-17057536 ]

Zhenghua Gao commented on FLINK-16379:
--------------------------------------

It's true that we can improve `fromValues(Object...)`. But how would we implement the following requirements?
1) Construct an empty table.
2) Construct a table from POJOs.

> Introduce fromValues in TableEnvironment
> ----------------------------------------
>
>                 Key: FLINK-16379
>                 URL: https://issues.apache.org/jira/browse/FLINK-16379
[jira] [Commented] (FLINK-16379) Introduce fromValues in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17050732#comment-17050732 ]

Zhenghua Gao commented on FLINK-16379:
--------------------------------------

From the user's perspective, constructing a Table from raw objects is more convenient and direct. Expressions bring additional complexity and a steeper learning curve (especially for Java users).
Another concern is that the optimization of LogicalValues is very different from that of LogicalTableScan, so the plan tests would change substantially if we used fromValues.

> Introduce fromValues in TableEnvironment
> ----------------------------------------
>
>                 Key: FLINK-16379
>                 URL: https://issues.apache.org/jira/browse/FLINK-16379
[jira] [Updated] (FLINK-16029) Remove register source and sink in test cases of blink planner
[ https://issues.apache.org/jira/browse/FLINK-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenghua Gao updated FLINK-16029:
---------------------------------
    Summary: Remove register source and sink in test cases of blink planner  (was: Remove register source and sink in test cases of planner)

> Remove register source and sink in test cases of blink planner
> --------------------------------------------------------------
>
>                 Key: FLINK-16029
>                 URL: https://issues.apache.org/jira/browse/FLINK-16029
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API, Tests
>            Reporter: Zhenghua Gao
>            Priority: Major
>
> Many planner test cases use TableEnvironment.registerTableSource() and registerTableSink(), which should be avoided. We want to refactor these cases to use TableEnvironment.connect().
[jira] [Commented] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability
[ https://issues.apache.org/jira/browse/FLINK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17048740#comment-17048740 ]

Zhenghua Gao commented on FLINK-16327:
--------------------------------------

CC [~twalthr] [~dwysakowicz]

What do you think about this?

> Add TableEnvironment.fromElements interfaces for usability
> ----------------------------------------------------------
>
>                 Key: FLINK-16327
>                 URL: https://issues.apache.org/jira/browse/FLINK-16327
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: Zhenghua Gao
>            Priority: Major
[jira] [Created] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability
Zhenghua Gao created FLINK-16327:
------------------------------------

             Summary: Add TableEnvironment.fromElements interfaces for usability
                 Key: FLINK-16327
                 URL: https://issues.apache.org/jira/browse/FLINK-16327
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
    Affects Versions: 1.11.0
            Reporter: Zhenghua Gao


h1. Interface
{code:java}
/**
 * Creates a table from a group of objects (known as its elements). The schema of the table
 * would be inferred from the type of elements.
 *
 * @param data a group of objects.
 */
Table fromElements(Collection data);

/**
 * Creates a table from a group of objects (known as its elements). The schema of the table
 * would be inferred from the passed in data type.
 *
 * @param data a group of objects
 * @param dataType the data type of the data
 */
Table fromElements(Collection data, DataType dataType);
{code}
h1. Use Case
 * One potential use case for Table API

{code:java}
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  tEnv.fromElements(data.asJava)
    .as('first, 'id, 'score, 'last)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  tEnv.fromElements(data.asJava, dataType)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
 * One potential use case for SQL

{code:java}
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)

  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  val table = tEnv.fromElements(data.asJava, dataType)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
h1. The proposal
 * data type inference

We need to infer the data type from the data for the first interface. A potential tool is the DataTypeExtractor, but it doesn't support scala.tuple, Row, etc. For Row and scala.tuple, the types used most often in our test cases, we could enumerate them and use a recursive traversal to get the types of the underlying objects. This can solve most of the cases and improve usability.
 * proposed changes
 ** A CollectionQueryOperation which implements QueryOperation to describe the relational operation
 ** The logical and physical RelNode for the legacy planner. In the physical node, we can translate the data to a DataStream
 ** The logical and physical RelNode for the blink planner. In the physical node, we can translate the data to a Transformation
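
The recursive traversal mentioned under "data type inference" can be illustrated without any Flink dependency. The following is only a minimal sketch under stated assumptions: a row is modelled as a plain `Object[]`, and the class `TypeInferenceSketch`, its `infer` method and the type-name strings it returns are hypothetical. It is not the DataTypeExtractor and not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: derives a SQL-like type name by recursively walking a value. */
final class TypeInferenceSketch {

    /** Returns a type name for the value; an Object[] is treated as a nested row. */
    static String infer(Object value) {
        if (value instanceof String) {
            return "STRING";
        } else if (value instanceof Integer) {
            return "INT";
        } else if (value instanceof Long) {
            return "BIGINT";
        } else if (value instanceof Double) {
            return "DOUBLE";
        } else if (value instanceof Object[]) {
            // Recurse into every field of the (assumed) row representation.
            List<String> fields = new ArrayList<>();
            for (Object field : (Object[]) value) {
                fields.add(infer(field));
            }
            return "ROW<" + String.join(", ", fields) + ">";
        }
        // Anything not enumerated above (including null) is rejected.
        throw new IllegalArgumentException("Unsupported value: " + value);
    }

    public static void main(String[] args) {
        Object[] row = {"Mike", 5, 12.3, "Smith"};
        System.out.println(infer(row));
    }
}
```

Enumerating the supported classes keeps the sketch predictable: anything outside the list fails fast instead of being mapped to a generic type, which is one possible design choice among several.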
[jira] [Commented] (FLINK-13302) DateTimeUtils.unixDateCeil returns the same value as unixDateFloor does
[ https://issues.apache.org/jira/browse/FLINK-13302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17041568#comment-17041568 ]

Zhenghua Gao commented on FLINK-13302:
--------------------------------------

[~tison] The related issue is fixed in avatica-1.16.0; we can remove the code on the Flink side after we bump avatica-core to 1.16.0.

[~danny0405] What is the plan for avatica-1.16.0?

> DateTimeUtils.unixDateCeil returns the same value as unixDateFloor does
> -----------------------------------------------------------------------
>
>                 Key: FLINK-13302
>                 URL: https://issues.apache.org/jira/browse/FLINK-13302
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Zhenghua Gao
>            Assignee: Zhenghua Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Internally, unixDateCeil and unixDateFloor both call julianDateFloor and pass a boolean parameter to differentiate them. But unixDateCeil passes the wrong parameter value and therefore returns the same value as unixDateFloor.
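
The kind of bug described in this issue, a shared helper whose boolean flag selects floor or ceil, can be illustrated with `java.time`. This is a sketch only: `DateRoundingSketch`, `roundToMonth`, `floorToMonth` and `ceilToMonth` are hypothetical names, month granularity is an arbitrary choice, and the code does not reproduce the actual julianDateFloor implementation.

```java
import java.time.LocalDate;

/** Hypothetical sketch of a floor/ceil pair that shares one flag-driven helper. */
final class DateRoundingSketch {

    /** floor == true rounds down to the first day of the month; false rounds up. */
    static LocalDate roundToMonth(LocalDate date, boolean floor) {
        LocalDate firstOfMonth = date.withDayOfMonth(1);
        if (floor || date.equals(firstOfMonth)) {
            return firstOfMonth;
        }
        return firstOfMonth.plusMonths(1);
    }

    static LocalDate floorToMonth(LocalDate date) {
        return roundToMonth(date, true);
    }

    static LocalDate ceilToMonth(LocalDate date) {
        // Passing true here would make ceil return the same value as floor,
        // which is the class of mistake the issue describes.
        return roundToMonth(date, false);
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2016, 6, 15);
        System.out.println(floorToMonth(date)); // 2016-06-01
        System.out.println(ceilToMonth(date));  // 2016-07-01
    }
}
```

A test that asserts floor and ceil differ for a date that is not on a boundary is enough to catch a swapped flag.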
[jira] [Created] (FLINK-16160) Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path
Zhenghua Gao created FLINK-16160:
------------------------------------

             Summary: Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path
                 Key: FLINK-16160
                 URL: https://issues.apache.org/jira/browse/FLINK-16160
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zhenghua Gao


In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime properties are ignored, so the generated catalog table is not correct.
We should fix this so that TableEnvironment#connect() supports watermarks.
[jira] [Commented] (FLINK-16143) Turn on more date time functions of blink planner
[ https://issues.apache.org/jira/browse/FLINK-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039787#comment-17039787 ]

Zhenghua Gao commented on FLINK-16143:
--------------------------------------

[~jark] Actually, TIMESTAMPDIFF/TIMESTAMPADD already support arguments of DATE type:

timestampadd(DAY, 1, date '2016-06-15') gets date '2016-06-16'
timestampdiff(DAY, date '2016-06-15', date '2016-06-14') gets -1

> Turn on more date time functions of blink planner
> -------------------------------------------------
>
>                 Key: FLINK-16143
>                 URL: https://issues.apache.org/jira/browse/FLINK-16143
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: Zili Chen
>            Priority: Major
>             Fix For: 1.11.0
>
> Currently the blink planner has a series of built-in functions such as
> DATEDIFF
> DATE_ADD
> DATE_SUB
> which haven't been put to use so far. We could add the necessary registration, code generation and conversion code to make them available in production.
>
> What do you think [~jark]?
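
The two results quoted in the comment above can be cross-checked with plain `java.time` arithmetic. This is only an analogy, assuming that DAY-unit TIMESTAMPADD/TIMESTAMPDIFF on DATE values behave like calendar-day arithmetic; `DateArithmeticSketch`, `addDays` and `diffDays` are hypothetical helpers, not Flink functions.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

/** Hypothetical helpers mirroring the two SQL expressions with java.time. */
final class DateArithmeticSketch {

    /** Analogue of timestampadd(DAY, days, date). */
    static LocalDate addDays(int days, LocalDate date) {
        return date.plusDays(days);
    }

    /** Analogue of timestampdiff(DAY, start, end): whole days from start to end. */
    static long diffDays(LocalDate start, LocalDate end) {
        return ChronoUnit.DAYS.between(start, end);
    }

    public static void main(String[] args) {
        System.out.println(addDays(1, LocalDate.of(2016, 6, 15)));
        System.out.println(diffDays(LocalDate.of(2016, 6, 15), LocalDate.of(2016, 6, 14)));
    }
}
```

The argument order of `diffDays` follows the SQL function, so an end date earlier than the start date yields a negative result.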
[jira] [Commented] (FLINK-16117) Avoid register source in TableTestBase#addTableSource
[ https://issues.apache.org/jira/browse/FLINK-16117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038729#comment-17038729 ]

Zhenghua Gao commented on FLINK-16117:
--------------------------------------

Most of these tests are plan tests and have no input data.
My initial thought is to use tableEnv.connect to replace tableEnv.registerTableSource in TableTestBase#addTableSource.

> Avoid register source in TableTestBase#addTableSource
> -----------------------------------------------------
>
>                 Key: FLINK-16117
>                 URL: https://issues.apache.org/jira/browse/FLINK-16117
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Zhenghua Gao
>            Assignee: Zhenghua Gao
>            Priority: Major
>
> This affects thousands of unit tests:
> 1) explainSourceAsString of CatalogSourceTable changes
> 2) JoinTest#testUDFInJoinCondition: SQL keywords must be escaped
> 3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: Reference to a rowtime or proctime window required
> 4) SetOperatorsTest#testInWithProject: legacy type vs new type
[jira] [Created] (FLINK-16117) Avoid register source in TableTestBase#addTableSource
Zhenghua Gao created FLINK-16117:
------------------------------------

             Summary: Avoid register source in TableTestBase#addTableSource
                 Key: FLINK-16117
                 URL: https://issues.apache.org/jira/browse/FLINK-16117
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zhenghua Gao


This affects thousands of unit tests:
1) explainSourceAsString of CatalogSourceTable changes
2) JoinTest#testUDFInJoinCondition: SQL keywords must be escaped
3) GroupWindowTest#testTimestampEventTimeTumblingGroupWindowWithProperties: Reference to a rowtime or proctime window required
4) SetOperatorsTest#testInWithProject: legacy type vs new type
[jira] [Created] (FLINK-16029) Remove register source and sink in test cases of planner
Zhenghua Gao created FLINK-16029:
------------------------------------

             Summary: Remove register source and sink in test cases of planner
                 Key: FLINK-16029
                 URL: https://issues.apache.org/jira/browse/FLINK-16029
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zhenghua Gao


Many planner test cases use TableEnvironment.registerTableSource() and registerTableSink(), which should be avoided. We want to refactor these cases to use TableEnvironment.connect().
[jira] [Commented] (FLINK-15990) Remove register source and sink in ConnectTableDescriptor
[ https://issues.apache.org/jira/browse/FLINK-15990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034351#comment-17034351 ]

Zhenghua Gao commented on FLINK-15990:
--------------------------------------

The comments on the *TableEnvironment.connect()*-related code [1][2][3] should be updated too.

[1] [https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L489]
[2] [https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java#L342]
[3] [https://github.com/apache/flink/blob/master/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala#L304]

> Remove register source and sink in ConnectTableDescriptor
> ---------------------------------------------------------
>
>                 Key: FLINK-15990
>                 URL: https://issues.apache.org/jira/browse/FLINK-15990
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should always use {{ConnectTableDescriptor.createTemporaryTable}}.
[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
[ https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenghua Gao updated FLINK-15968:
---------------------------------
    Description: 
Currently LegacyTypeInfoDataTypeConverter only supports conversion between DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors to the new type system, we need to convert BINARY(p) or VARBINARY(p) to BYTE_PRIMITIVE_ARRAY_TYPE_INFO. Similar logic has already been implemented for the CHAR/VARCHAR types.

The Hive connector achieves this by depending on the blink planner's conversion logic, which is odd because connectors should not need a planner dependency.

  was:
Currently LegacyTypeInfoDataTypeConverter only supports conversion between DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors to the new type system, we need to convert BINARY(p) or VARBINARY(p) to BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion logic, which is odd because connectors should not need a planner dependency.


> LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15968
>                 URL: https://issues.apache.org/jira/browse/FLINK-15968
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner, Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: Zhenghua Gao
>            Priority: Major
[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
[ https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenghua Gao updated FLINK-15968:
---------------------------------
    Summary: LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO  (was: LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO)

> LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15968
>                 URL: https://issues.apache.org/jira/browse/FLINK-15968
[jira] [Commented] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
[ https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17033551#comment-17033551 ]

Zhenghua Gao commented on FLINK-15968:
--------------------------------------

[~jark] [~lzljs3620320] What do you think about this?

> LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15968
>                 URL: https://issues.apache.org/jira/browse/FLINK-15968
[jira] [Updated] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
[ https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenghua Gao updated FLINK-15968:
---------------------------------
    Description: 
Currently LegacyTypeInfoDataTypeConverter only supports conversion between DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors to the new type system, we need to convert BINARY(p) or VARBINARY(p) to BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion logic, which is odd because connectors should not need a planner dependency.

  was:
Currently LegacyTypeInfoDataTypeConverter only supports conversion between DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors to the new type system, we need to convert BINARY(n) or VARBINARY(n) to BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion logic, which is odd because connectors should not need a planner dependency.


> LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15968
>                 URL: https://issues.apache.org/jira/browse/FLINK-15968
[jira] [Created] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
Zhenghua Gao created FLINK-15968:
------------------------------------

             Summary: LegacyTypeInfoDataTypeConverter should support conversion between BINARY/VARBINARY and BYTE_PRIMITIVE_ARRAY_TYPE_INFO
                 Key: FLINK-15968
                 URL: https://issues.apache.org/jira/browse/FLINK-15968
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Legacy Planner, Table SQL / Planner
    Affects Versions: 1.11.0
            Reporter: Zhenghua Gao


Currently LegacyTypeInfoDataTypeConverter only supports conversion between DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors to the new type system, we need to convert BINARY(n) or VARBINARY(n) to BYTE_PRIMITIVE_ARRAY_TYPE_INFO.

The Hive connector achieves this by depending on the blink planner's conversion logic, which is odd because connectors should not need a planner dependency.
[jira] [Commented] (FLINK-15881) Stop overriding TableSource::getReturnType in jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-15881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030381#comment-17030381 ]

Zhenghua Gao commented on FLINK-15881:
--------------------------------------

This duplicates FLINK-15445.
The PR for FLINK-15445 would remove *getReturnType* in JDBCTableSource.

> Stop overriding TableSource::getReturnType in jdbc connector
> ------------------------------------------------------------
>
>                 Key: FLINK-15881
>                 URL: https://issues.apache.org/jira/browse/FLINK-15881
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / JDBC, Table SQL / API
>            Reporter: Kurt Young
>            Assignee: Zhenghua Gao
>            Priority: Major
>             Fix For: 1.11.0
[jira] [Commented] (FLINK-15574) DataType to LogicalType conversion issue
[ https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017874#comment-17017874 ]

Zhenghua Gao commented on FLINK-15574:
--------------------------------------

The root cause is the mixed use of the conversion logic of the blink planner and that of flink-table-common.
In your code snippet, the TableSchema is created with the flink-table-common logic, while in the translation logic of toDataStream the blink planner uses its own conversion logic:
{code:java}
private def computeIndexMapping()
  : Array[Int] = {
  TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    FlinkTypeFactory.toTableSchema(getRowType).getTableColumns, // <- planner conversion logic
    true,
    nameMapping
  )
}
{code}
Please try to use DataType directly to avoid this mixed use:
{code:java}
@Test
def testTableToDataStreamGenericTypeInfo(): Unit = {
  // create a simple Table with just one field of type array
  val tEnv = scalaStreamTestUtil().tableEnv
  //val tableTypeInfo = Types.ROW(
  //  Array("my_arr"),
  //  Array[TypeInformation[_]](BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
  //)
  //val tableSchema = TableSchema.fromTypeInfo(tableTypeInfo)

  val tableSchema = TableSchema.builder()
    .field("my_arr", DataTypes.ARRAY(DataTypes.STRING()))
    .build()
  tEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))

  val sqlQuery = "SELECT * FROM MyTable"
  val table = tEnv.sqlQuery(sqlQuery)
  tEnv.toAppendStream[Row](table)
}
{code}
CC [~dwysakowicz] [~lzljs3620320] Should we do something to handle this mixed use of conversion logic?

> DataType to LogicalType conversion issue
> ----------------------------------------
>
>                 Key: FLINK-15574
>                 URL: https://issues.apache.org/jira/browse/FLINK-15574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Benoit Hanotte
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> We seem to be encountering an issue with the conversion from DataType to LogicalType with the Blink planner (full stacktrace below):
> {code}
> org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match with type BasicArrayTypeInfo of the field 'my_array' of the TableSource return type.
> {code}
> It seems there exist two paths for the conversion from DataType to LogicalType:
> 1. TypeConversions.fromLegacyInfoToDataType():
> used for instance when calling TableSchema.fromTypeInformation().
> 2. LogicalTypeDataTypeConverter.fromDataTypeToLogicalType():
> Deprecated but still used in TableSourceUtil and many other places.
> These two code paths can return a different LogicalType for the same input, leading to issues when the LogicalTypes are compared to ensure they are compatible. For instance, PlannerTypeUtils.isAssignable() returns false for a DataType created from BasicArrayTypeInfo (leading to the ValidationException above).
> The full stacktrace is the following:
> {code}
> org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match with type BasicArrayTypeInfo of the field 'my_array' of the TableSource return type.
>       at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>       at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>       at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at
[jira] [Commented] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system
[ https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017755#comment-17017755 ] Zhenghua Gao commented on FLINK-15469: -- After an initial POC, getTypeClass turns out to be unnecessary because the Upsert/Retract stream table sinks always need the Java *Tuple2*. We also don't need any changes to *getOutputType*/*getConsumedDataType*, because the codegen can use *getRecordDataType* directly. > Update UpsertStreamTableSink and RetractStreamTableSink and related interface > to new type system > > > Key: FLINK-15469 > URL: https://issues.apache.org/jira/browse/FLINK-15469 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently *UpsertStreamTableSink* can only returns TypeInformation of the > requested record, which can't support types with precision and scale, e.g. > TIMESTAMP(p), DECIMAL(p,s). > A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. > {code:java} > /** > * Returns the requested record type. > * > * @Deprecated This method will be removed in future versions. It's > recommended to use {@link #getRecordDataType()} instead. > */ > @Deprecated > TypeInformation getRecordType(); > /* > * Returns the requested record data type. > */ > DataType getRecordDataType(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
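The motivation quoted above (a class-based record type cannot express precision and scale, while a data type can) can be illustrated with a minimal, self-contained sketch. It deliberately does not use any Flink classes: `LegacyTypeSketch`, `DecimalTypeSketch`, `UpsertSinkSketch` and `decimalSink` are hypothetical stand-ins invented for this illustration, and the real TypeInformation/DataType APIs are considerably richer.

```java
import java.math.BigDecimal;

final class RecordTypeSketch {

    /** Hypothetical stand-in for a class-based type descriptor: it only knows the Java class. */
    static final class LegacyTypeSketch {
        final Class<?> typeClass;

        LegacyTypeSketch(Class<?> typeClass) {
            this.typeClass = typeClass;
        }

        @Override
        public String toString() {
            return typeClass.getSimpleName();
        }
    }

    /** Hypothetical stand-in for a data type that also carries precision and scale. */
    static final class DecimalTypeSketch {
        final int precision;
        final int scale;

        DecimalTypeSketch(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }

        @Override
        public String toString() {
            return "DECIMAL(" + precision + ", " + scale + ")";
        }
    }

    /** Hypothetical sink interface with the two accessors named in the proposal. */
    interface UpsertSinkSketch {
        /** Old accessor: kept for compatibility, but cannot describe precision or scale. */
        @Deprecated
        LegacyTypeSketch getRecordType();

        /** New accessor: returns a type that keeps precision and scale. */
        DecimalTypeSketch getRecordDataType();
    }

    /** Builds a sink whose record is a single decimal value (assumption made for brevity). */
    static UpsertSinkSketch decimalSink(int precision, int scale) {
        return new UpsertSinkSketch() {
            @Override
            public LegacyTypeSketch getRecordType() {
                // Precision and scale are lost here: only the class is known.
                return new LegacyTypeSketch(BigDecimal.class);
            }

            @Override
            public DecimalTypeSketch getRecordDataType() {
                return new DecimalTypeSketch(precision, scale);
            }
        };
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = decimalSink(10, 4);
        System.out.println(sink.getRecordType());     // BigDecimal
        System.out.println(sink.getRecordDataType()); // DECIMAL(10, 4)
    }
}
```

In this simplified model, a planner that only sees the old accessor cannot tell DECIMAL(10, 4) from any other decimal, which is the limitation the issue describes.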
[jira] [Comment Edited] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system
[ https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828 ] Zhenghua Gao edited comment on FLINK-15469 at 1/17/20 7:12 AM: --- Hi [~lzljs3620320], after rethinking the whole thing, I think we should bring the physical data types of the sink and the type class (Java Tuple2 or Scala Tuple2) to the planner so that our blink planner can handle precision. So the proposal is as follows: # deprecate getRecordType and introduce getRecordDataType instead # -remove getOutputType and introduce getConsumedDataType, which returns ROW- # -introduce getTypeClass interface, which returns type class for codegen- What do you think? I will file a PR soon if this works. was (Author: docete): Hi [~lzljs3620320], after rethinking the whole thing, I think we should bring the physical data types of the sink and the type class (Java Tuple2 or Scala Tuple2) to the planner so that our blink planner can handle precision. So the proposal is as follows: # deprecate getRecordType and introduce getRecordDataType instead # remove getOutputType and introduce getConsumedDataType, which returns ROW # introduce getTypeClass interface, which returns type class for codegen What do you think? I will file a PR soon if this works. > Update UpsertStreamTableSink and RetractStreamTableSink and related interface > to new type system > > > Key: FLINK-15469 > URL: https://issues.apache.org/jira/browse/FLINK-15469 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently *UpsertStreamTableSink* can only returns TypeInformation of the > requested record, which can't support types with precision and scale, e.g. > TIMESTAMP(p), DECIMAL(p,s). 
> A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. > {code:java} > /** > * Returns the requested record type. > * > * @Deprecated This method will be removed in future versions. It's > recommended to use {@link #getRecordDataType()} instead. > */ > @Deprecated > TypeInformation getRecordType(); > /* > * Returns the requested record data type. > */ > DataType getRecordDataType(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15469) Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system
[ https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15469: - Summary: Update UpsertStreamTableSink and RetractStreamTableSink and related interface to new type system (was: UpsertStreamTableSink should support new type system) > Update UpsertStreamTableSink and RetractStreamTableSink and related interface > to new type system > > > Key: FLINK-15469 > URL: https://issues.apache.org/jira/browse/FLINK-15469 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently *UpsertStreamTableSink* can only returns TypeInformation of the > requested record, which can't support types with precision and scale, e.g. > TIMESTAMP(p), DECIMAL(p,s). > A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. > {code:java} > /** > * Returns the requested record type. > * > * @Deprecated This method will be removed in future versions. It's > recommended to use {@link #getRecordDataType()} instead. > */ > @Deprecated > TypeInformation getRecordType(); > /* > * Returns the requested record data type. > */ > DataType getRecordDataType(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar
[ https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017666#comment-17017666 ] Zhenghua Gao commented on FLINK-15602: -- [~twalthr] AFAIK we have padded DECIMAL type and intervals in Blink planner. > Blink planner does not respect the precision when casting timestamp to varchar > -- > > Key: FLINK-15602 > URL: https://issues.apache.org/jira/browse/FLINK-15602 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Assignee: Zhenghua Gao >Priority: Blocker > Fix For: 1.10.0 > > > According to SQL 2011 Part 2 Section 6.13 General Rules 11) d) > {quote} > If SD is a datetime data type or an interval data type then let Y be the > shortest character string that > conforms to the definition of in Subclause 5.3, “”, and > such that the interpreted value > of Y is SV and the interpreted precision of Y is the precision of SD. > {quote} > That means: > {code} > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(0)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(3)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.000 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(9)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.0 > {code} > One possible solution would be to propagate the precision in > {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}. > If I am not mistaken this problem was introduced in [FLINK-14599] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar
[ https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016745#comment-17016745 ] Zhenghua Gao commented on FLINK-15602: -- [~dwysakowicz] [~twalthr] [~jark] Let's pad the TIMESTAMP type in both planners. I will file a PR soon. > Blink planner does not respect the precision when casting timestamp to varchar > -- > > Key: FLINK-15602 > URL: https://issues.apache.org/jira/browse/FLINK-15602 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.10.0 > > > According to SQL 2011 Part 2 Section 6.13 General Rules 11) d) > {quote} > If SD is a datetime data type or an interval data type then let Y be the > shortest character string that > conforms to the definition of in Subclause 5.3, “”, and > such that the interpreted value > of Y is SV and the interpreted precision of Y is the precision of SD. > {quote} > That means: > {code} > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(0)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(3)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.000 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(9)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.0 > {code} > One possible solution would be to propagate the precision in > {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}. > If I am not mistaken this problem was introduced in [FLINK-14599] -- This message was sent by Atlassian Jira (v8.3.4#803005)
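As a rough illustration of the padding behavior discussed above, the following self-contained sketch formats a timestamp so that the fractional part always has exactly as many digits as the declared precision. It uses only `java.time` and is not the planner's code generation; the class name `TimestampPaddingSketch` and the method `castToString` are hypothetical names chosen for this example, and padding precision 9 to nine digits is my reading of the quoted rule.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

final class TimestampPaddingSketch {

    /**
     * Renders a timestamp as a string whose fractional seconds have exactly
     * {@code precision} digits (0 to 9), padding with '0' where needed.
     */
    static String castToString(LocalDateTime ts, int precision) {
        if (precision < 0 || precision > 9) {
            throw new IllegalArgumentException("precision must be between 0 and 9");
        }
        DateTimeFormatterBuilder builder =
                new DateTimeFormatterBuilder().appendPattern("uuuu-MM-dd HH:mm:ss");
        if (precision > 0) {
            // minWidth == maxWidth forces a fixed number of fraction digits.
            builder.appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true);
        }
        return builder.toFormatter().format(ts);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2014, 7, 2, 6, 14, 0);
        System.out.println(castToString(ts, 0)); // 2014-07-02 06:14:00
        System.out.println(castToString(ts, 3)); // 2014-07-02 06:14:00.000
        System.out.println(castToString(ts, 9)); // 2014-07-02 06:14:00.000000000
    }
}
```

With a fixed-width fraction the output length depends only on the declared precision, not on the value, which is the property the padding proposal is after.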
[jira] [Commented] (FLINK-15546) Obscure error message from ScalarOperatorGens::generateCast
[ https://issues.apache.org/jira/browse/FLINK-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016740#comment-17016740 ] Zhenghua Gao commented on FLINK-15546: -- [~lzljs3620320] OK, I will file a PR soon. [~jark] Please assign this to me. > Obscure error message from ScalarOperatorGens::generateCast > --- > > Key: FLINK-15546 > URL: https://issues.apache.org/jira/browse/FLINK-15546 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Rui Li >Priority: Minor > > Consider the following case: > {noformat} > Flink SQL> describe foo; > root > |-- x: ROW<`f1` DOUBLE, `f2` VARCHAR(10)> > Flink SQL> insert into foo select row(1.1,'abc'); > [INFO] Submitting SQL update statement to the cluster... > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast > from 'ROW' to 'ROW'. > {noformat} > Users are unlikely to figure out what goes wrong from the above error > message. Something like {{Unsupported cast from 'ROW' > to 'ROW'}} will be more helpful. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar
[ https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016512#comment-17016512 ] Zhenghua Gao edited comment on FLINK-15602 at 1/16/20 9:16 AM: --- Hi [~dwysakowicz] [~twalthr] I investigated the behavior of popular DBMSs and found that PostgreSQL does NOT pad with '0', while Oracle and MS SQL do pad with '0' (MySQL pads with '0' for the TIMESTAMP type but not for the DATETIME type). Hive and Spark do not pad with '0' either. What's your opinion about the padding behavior? was (Author: docete): Hi [~dwysakowicz] [~tiwalter] I investigated the behavior of popular DBMSs and found that PostgreSQL does NOT pad with '0', while Oracle and MS SQL do pad with '0' (MySQL pads with '0' for the TIMESTAMP type but not for the DATETIME type). Hive and Spark do not pad with '0' either. What's your opinion about the padding behavior? > Blink planner does not respect the precision when casting timestamp to varchar > -- > > Key: FLINK-15602 > URL: https://issues.apache.org/jira/browse/FLINK-15602 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.10.0 > > > According to SQL 2011 Part 2 Section 6.13 General Rules 11) d) > {quote} > If SD is a datetime data type or an interval data type then let Y be the > shortest character string that > conforms to the definition of in Subclause 5.3, “”, and > such that the interpreted value > of Y is SV and the interpreted precision of Y is the precision of SD. 
> {quote} > That means: > {code} > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(0)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(3)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.000 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(9)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.0 > {code} > One possible solution would be to propagate the precision in > {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}. > If I am not mistaken this problem was introduced in [FLINK-14599] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar
[ https://issues.apache.org/jira/browse/FLINK-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016512#comment-17016512 ] Zhenghua Gao commented on FLINK-15602: -- Hi [~dwysakowicz] [~tiwalter] I investigated the behavior of popular DBMSs and found that PostgreSQL does NOT pad with '0', while Oracle and MS SQL do pad with '0' (MySQL pads with '0' for the TIMESTAMP type but not for the DATETIME type). Hive and Spark do not pad with '0' either. What's your opinion about the padding behavior? > Blink planner does not respect the precision when casting timestamp to varchar > -- > > Key: FLINK-15602 > URL: https://issues.apache.org/jira/browse/FLINK-15602 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.10.0 > > > According to SQL 2011 Part 2 Section 6.13 General Rules 11) d) > {quote} > If SD is a datetime data type or an interval data type then let Y be the > shortest character string that > conforms to the definition of in Subclause 5.3, “”, and > such that the interpreted value > of Y is SV and the interpreted precision of Y is the precision of SD. > {quote} > That means: > {code} > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(0)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(3)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.000 > select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') > as TIMESTAMP(9)) as VARCHAR(256)) from ...; > // should produce > // 2014-07-02 06:14:00.0 > {code} > One possible solution would be to propagate the precision in > {{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}. > If I am not mistaken this problem was introduced in [FLINK-14599] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15469) UpsertStreamTableSink should support new type system
[ https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828 ] Zhenghua Gao edited comment on FLINK-15469 at 1/14/20 3:15 AM: --- Hi [~lzljs3620320], after rethinking the whole thing, I think we should bring the physical data types of the sink and the type class (Java Tuple2 or Scala Tuple2) to the planner so that our blink planner can handle precision. So the proposal is as follows: # deprecate getRecordType and introduce getRecordDataType instead # remove getOutputType and introduce getConsumedDataType, which returns ROW # introduce getTypeClass interface, which returns type class for codegen What do you think? I will file a PR soon if this works. was (Author: docete): Hi [~lzljs3620320], after rethinking the whole thing, I think we should bring the physical data types of the sink and the type class (Java Tuple2 or Scala Tuple2) to the planner so that our blink planner can handle precision. So the proposal is as follows: # remove getRecordType and introduce getRecordDataType instead # remove getOutputType and introduce getConsumedDataType, which returns ROW # introduce getTypeClass interface, which returns type class for codegen What do you think? I will file a PR soon if this works. > UpsertStreamTableSink should support new type system > > > Key: FLINK-15469 > URL: https://issues.apache.org/jira/browse/FLINK-15469 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently *UpsertStreamTableSink* can only returns TypeInformation of the > requested record, which can't support types with precision and scale, e.g. > TIMESTAMP(p), DECIMAL(p,s). > A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. > {code:java} > /** > * Returns the requested record type. 
> * > * @Deprecated This method will be removed in future versions. It's > recommended to use {@link #getRecordDataType()} instead. > */ > @Deprecated > TypeInformation getRecordType(); > /* > * Returns the requested record data type. > */ > DataType getRecordDataType(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15469) UpsertStreamTableSink should support new type system
[ https://issues.apache.org/jira/browse/FLINK-15469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014828#comment-17014828 ] Zhenghua Gao commented on FLINK-15469: -- Hi [~lzljs3620320], after rethinking the whole thing, I think we should bring the physical data types of the sink and the type class (Java Tuple2 or Scala Tuple2) to the planner so that our blink planner can handle precision. So the proposal is as follows: # remove getRecordType and introduce getRecordDataType instead # remove getOutputType and introduce getConsumedDataType, which returns ROW # introduce getTypeClass interface, which returns type class for codegen What do you think? I will file a PR soon if this works. > UpsertStreamTableSink should support new type system > > > Key: FLINK-15469 > URL: https://issues.apache.org/jira/browse/FLINK-15469 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently *UpsertStreamTableSink* can only returns TypeInformation of the > requested record, which can't support types with precision and scale, e.g. > TIMESTAMP(p), DECIMAL(p,s). > A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. > {code:java} > /** > * Returns the requested record type. > * > * @Deprecated This method will be removed in future versions. It's > recommended to use {@link #getRecordDataType()} instead. > */ > @Deprecated > TypeInformation getRecordType(); > /* > * Returns the requested record data type. > */ > DataType getRecordDataType(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15546) Obscure error message from ScalarOperatorGens::generateCast
[ https://issues.apache.org/jira/browse/FLINK-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014818#comment-17014818 ] Zhenghua Gao commented on FLINK-15546: -- {code:java} case (from, to) => throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.") {code} I think we should use *operand.resultType* and *targetType* instead in the error message since from and to are *LogicalTypeRoot* > Obscure error message from ScalarOperatorGens::generateCast > --- > > Key: FLINK-15546 > URL: https://issues.apache.org/jira/browse/FLINK-15546 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Rui Li >Priority: Minor > > Consider the following case: > {noformat} > Flink SQL> describe foo; > root > |-- x: ROW<`f1` DOUBLE, `f2` VARCHAR(10)> > Flink SQL> insert into foo select row(1.1,'abc'); > [INFO] Submitting SQL update statement to the cluster... > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast > from 'ROW' to 'ROW'. > {noformat} > Users are unlikely to figure out what goes wrong from the above error > message. Something like {{Unsupported cast from 'ROW' > to 'ROW'}} will be more helpful. -- This message was sent by Atlassian Jira (v8.3.4#803005)
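The difference between reporting only the type root and reporting the full type can be sketched as follows. This is a simplified, self-contained model rather than the planner's code: `TypeSketch` and both message helpers are hypothetical, the target type string is taken from the `describe foo` output above, and the source type string is purely illustrative.

```java
final class CastErrorMessageSketch {

    /** Hypothetical stand-in for a logical type: a coarse root plus a full description. */
    static final class TypeSketch {
        final String root;    // coarse category only, e.g. "ROW"
        final String summary; // full description including field names and types

        TypeSketch(String root, String summary) {
            this.root = root;
            this.summary = summary;
        }
    }

    /** Mirrors the obscure message: both sides collapse to the same root. */
    static String messageFromRoots(TypeSketch from, TypeSketch to) {
        return "Unsupported cast from '" + from.root + "' to '" + to.root + "'.";
    }

    /** Uses the full types, so the reader can see which fields differ. */
    static String messageFromFullTypes(TypeSketch from, TypeSketch to) {
        return "Unsupported cast from '" + from.summary + "' to '" + to.summary + "'.";
    }

    public static void main(String[] args) {
        // Source type is an illustrative assumption, not copied from the issue.
        TypeSketch from = new TypeSketch("ROW", "ROW<`EXPR$0` DECIMAL(2, 1), `EXPR$1` CHAR(3)>");
        TypeSketch to = new TypeSketch("ROW", "ROW<`f1` DOUBLE, `f2` VARCHAR(10)>");
        System.out.println(messageFromRoots(from, to));
        System.out.println(messageFromFullTypes(from, to));
    }
}
```

The first message is identical on both sides of the cast, which is what makes the reported error hard to act on; the second exposes the field-level mismatch.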
[jira] [Commented] (FLINK-15574) DataType to LogicalType conversion issue
[ https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014804#comment-17014804 ] Zhenghua Gao commented on FLINK-15574: -- Hi [~b.hanotte], one reason to keep this deprecated conversion logic is to bypass the conversion logic in the flink-table-common module. In blink we introduced some runtime type information (e.g. DecimalTypeInfo, LegacyTimestampTypeInfo, etc.) to support things like precision. Could you update your test to a meaningful scenario so that we can check whether there is a potential bug in blink's conversion logic? > DataType to LogicalType conversion issue > > > Key: FLINK-15574 > URL: https://issues.apache.org/jira/browse/FLINK-15574 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Benoit Hanotte >Priority: Major > Labels: pull-request-available > Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch > > Time Spent: 10m > Remaining Estimate: 0h > > We seem to be encountering an issue with the conversion from DataType to > LogicalType with the Blink planner (full stacktrace below): > {code} > org.apache.flink.table.api.ValidationException: Type > LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match > with type BasicArrayTypeInfo of the field 'my_array' of the > TableSource return type. > {code} > It seems there exists 2 paths to do the conversion from DataType to > LogicalType: > 1. TypeConversions.fromLegacyInfoToDataType(): > used for instance when calling TableSchema.fromTypeInformation(). > 2. LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(): > Deprecated but still used in TableSourceUtil and many other places. > These 2 code paths can return a different LogicalType for the same input, > leading to issues when the LogicalTypes are compared to ensure they are > compatible. 
For instance, PlannerTypeUtils.isAssignable() returns false for > a DataType created from BasicArrayTypeInfo (leading to the > ValidationException above). > The full stacktrace is the following: > {code} > org.apache.flink.table.api.ValidationException: Type > LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match > with type BasicArrayTypeInfo of the field 'my_array' of the > TableSource return type. > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) > at > org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >
[jira] [Commented] (FLINK-15509) Use sql cilents create view occur Unexpected exception
[ https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012596#comment-17012596 ] Zhenghua Gao commented on FLINK-15509: -- What about also validating data type mismatches in this phase? [~lzljs3620320] > Use sql cilents create view occur Unexpected exception > -- > > Key: FLINK-15509 > URL: https://issues.apache.org/jira/browse/FLINK-15509 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Affects Versions: 1.10.0 >Reporter: Xianxun Ye >Priority: Major > Fix For: 1.11.0 > > > version:master. > Firstly I created a table sucessful by sql clients, and then throw an > unexcepetd exp when created a view. > My steps: > Flink SQL> create table myTable (id int); > *[INFO] Table has been created.* > Flink SQL> show tables ; > myTable > Flink SQL> describe myTable ; > root > |-- id: INT > Flink SQL> create view myView as select * from myTable; > > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. findAndCreateTableSource failed. 
> at > org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) > at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300) > at > org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > Caused by: org.apache.flink.table.api.TableException: > findAndCreateTableSource failed. 
> at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76) > at > org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) > at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) > at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) > at > org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) > at > org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105) > at > org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > 
at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at >
[jira] [Commented] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
[ https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012429#comment-17012429 ] Zhenghua Gao commented on FLINK-15445: -- We come to an agreement that the user should be informed if the connector does not support the desired precision. And I will put this discussion on ML to let more people get involved. Hopefully we come to something like a connector development guide such that all connectors behave the same for such cases. [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html] > JDBC Table Source didn't work for Types with precision (or/and scale) > - > > Key: FLINK-15445 > URL: https://issues.apache.org/jira/browse/FLINK-15445 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env, envSettings); > String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 
'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > tableEnvironment.sqlUpdate(mysqlCurrencyDDL); > String querySQL = "select * from currency"; > tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), > Row.class).print(); > tableEnvironment.execute("JdbcExample"); > } > }{code} > > Throws Exception: > > {code:java} > org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table > field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of > the 'timestamp6_col' field of the TableSource return > type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of > table field 'timestamp6_col' does not match with the physical type > TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type. > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at
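The agreement in the comment above, that the user should be informed when a connector does not support the desired precision, can be sketched as a simple up-front check. This is only an illustration under stated assumptions: the class `PrecisionCheckSketch`, the method `checkPrecision`, and the "maximum supported precision" parameter are hypothetical and are not part of any Flink API.

```java
/** Hypothetical sketch: reject a declared precision that a connector cannot honor. */
class PrecisionCheckSketch {

    /**
     * Throws if the precision declared in the DDL exceeds what the connector supports.
     * The method name and parameters are illustrative only, not a Flink API.
     */
    public static void checkPrecision(String field, String typeName, int declared, int maxSupported) {
        if (declared > maxSupported) {
            throw new IllegalArgumentException(String.format(
                    "Field '%s' is declared as %s(%d), but the connector supports at most %s(%d)",
                    field, typeName, declared, typeName, maxSupported));
        }
    }

    public static void main(String[] args) {
        // TIMESTAMP(3) against a connector assumed to be limited to precision 3: accepted.
        checkPrecision("currency_time", "TIMESTAMP", 3, 3);
        try {
            // TIMESTAMP(6) against the same connector: the user is informed up front.
            checkPrecision("timestamp6", "TIMESTAMP", 6, 3);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing early with a message that names the field and both precisions is one possible way to make all connectors behave the same in such cases; the actual mechanism was left to the mailing-list discussion.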
[jira] [Commented] (FLINK-15509) Use sql cilents create view occur Unexpected exception
[ https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011659#comment-17011659 ] Zhenghua Gao commented on FLINK-15509: -- Should we verify only that the properties exist, or also that they are correct? I think correctness is hard to verify at compile time (e.g. HDFS paths, JDBC URLs). > Use sql cilents create view occur Unexpected exception > -- > > Key: FLINK-15509 > URL: https://issues.apache.org/jira/browse/FLINK-15509 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Affects Versions: 1.10.0 >Reporter: Xianxun Ye >Priority: Major > Fix For: 1.11.0 > > > version:master. > First I created a table successfully with the SQL client, and then an > unexpected exception was thrown when I created a view. > My steps: > Flink SQL> create table myTable (id int); > *[INFO] Table has been created.* > Flink SQL> show tables ; > myTable > Flink SQL> describe myTable ; > root > |-- id: INT > Flink SQL> create view myView as select * from myTable; > > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. findAndCreateTableSource failed. 
> at > org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) > at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300) > at > org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > Caused by: org.apache.flink.table.api.TableException: > findAndCreateTableSource failed. 
> at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76) > at > org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) > at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) > at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) > at > org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) > at > org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105) > at > org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > 
at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at >
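The distinction raised in the comment above, between checking that properties exist and checking that they are correct, can be illustrated with a small existence check. This is a hedged sketch only: the class `RequiredPropertiesSketch` and the method `missingKeys` are hypothetical, and the only key used, `connector.type`, is taken from the JDBC DDL quoted elsewhere in this thread. It does not attempt to validate values such as HDFS paths or JDBC URLs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: report required table properties that are absent. */
class RequiredPropertiesSketch {

    /** Returns the required keys that are not present in the given properties. */
    public static List<String> missingKeys(Map<String, String> properties, List<String> requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            if (!properties.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // A table created as "create table myTable (id int)" carries no properties at all.
        Map<String, String> noProperties = Collections.emptyMap();
        List<String> required = Collections.singletonList("connector.type");
        System.out.println("Missing properties: " + missingKeys(noProperties, required));
    }
}
```

An existence check like this could run when the table is created, whereas the correctness of a value can usually only be established when the external system is actually contacted.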
[jira] [Commented] (FLINK-15525) HBase connector should use new type system to suppport precision/scale
[ https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011346#comment-17011346 ] Zhenghua Gao commented on FLINK-15525: -- [~lzljs3620320] [~Leonard Xu] I am closing this ticket and copying the description to the older one. We can continue the discussion under the older ticket. > HBase connector should use new type system to suppport precision/scale > -- > > Key: FLINK-15525 > URL: https://issues.apache.org/jira/browse/FLINK-15525 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's > schema, which causes precision/scale loss for several data types. > Meanwhile, *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, > which causes precision loss for TIMESTAMP types. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
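The precision loss described in the ticket can be reproduced with the JDK alone. The sketch below is not the code of *HBaseTypeUtils*; it assumes that the long in question is the epoch-millisecond value returned by `Timestamp.getTime()`, and the class name `TimestampPrecisionDemo` is made up for illustration.

```java
import java.sql.Timestamp;

/** Illustration only: a java.sql.Timestamp loses sub-millisecond digits when stored as epoch millis. */
class TimestampPrecisionDemo {

    /** Round-trips a timestamp through a single long of epoch milliseconds (assumed encoding). */
    public static Timestamp roundTripViaMillis(Timestamp ts) {
        long millis = ts.getTime();   // drops everything below one millisecond
        return new Timestamp(millis); // nanos are rebuilt from the millisecond part only
    }

    public static void main(String[] args) {
        Timestamp original = Timestamp.valueOf("2020-01-08 12:34:56.123456789");
        Timestamp restored = roundTripViaMillis(original);
        System.out.println("original nanos: " + original.getNanos());
        System.out.println("restored nanos: " + restored.getNanos());
    }
}
```

A type such as TIMESTAMP(9) therefore cannot survive an encoding of this kind, which is the motivation given for moving the connector to the new type system.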
[jira] [Commented] (FLINK-15363) Hbase connector do not support datatypes with precision like TIMESTAMP(9) and DECIMAL(10,4)
[ https://issues.apache.org/jira/browse/FLINK-15363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011343#comment-17011343 ] Zhenghua Gao commented on FLINK-15363: -- Currently *HBaseTableSchema* use TypeInformation to specify an HBase table's schema, which would cause precision/scale loss for several data types. Meanwhile *HBaseTypeUtils* serialize a java.sql.Timestamp to long in bytes, which would cause precision loss for TIMESTAMP types. the hbase connector should use new type system to fix this. > Hbase connector do not support datatypes with precision like TIMESTAMP(9) and > DECIMAL(10,4) > --- > > Key: FLINK-15363 > URL: https://issues.apache.org/jira/browse/FLINK-15363 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.10.0 >Reporter: Leonard Xu >Priority: Major > Fix For: 1.10.0 > > > {code:java} > // exception msg > rowtype of new rel:rowtype of new rel:RecordType(VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" order_id, VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" item, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, > DECIMAL(10, 4) amount, TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT > NULL proc_time, DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT > currency_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, > DECIMAL(38, 4) rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" rowkey, RecordType:peek_no_expand(INTEGER country_id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name_cn, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" region_name) f1, > RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(3) > record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(38, 18) gdp) f2) NOT > NULLrowtype of 
set:RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > order_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" item, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, DECIMAL(10, 4) amount, > TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT NULL proc_time, > DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT currency_id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, DECIMAL(38, 4) > rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, > RecordType:peek_no_expand(INTEGER country_id, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" country_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > country_name_cn, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" region_name) f1, > RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(9) > record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(10, 4) gdp) f2) NOT > NULL at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:84) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at > 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:223) > at >
[jira] [Closed] (FLINK-15525) HBase connector should use new type system to suppport precision/scale
[ https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao closed FLINK-15525. Resolution: Duplicate > HBase connector should use new type system to suppport precision/scale > -- > > Key: FLINK-15525 > URL: https://issues.apache.org/jira/browse/FLINK-15525 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's > schema, which causes precision/scale loss for several data types. > Meanwhile, *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, > which causes precision loss for TIMESTAMP types. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15525) HBase connector should use new type system to suppport precision/scale
[ https://issues.apache.org/jira/browse/FLINK-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011328#comment-17011328 ] Zhenghua Gao commented on FLINK-15525: -- [~Leonard Xu] Added a link to the old ticket. > HBase connector should use new type system to suppport precision/scale > -- > > Key: FLINK-15525 > URL: https://issues.apache.org/jira/browse/FLINK-15525 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.11.0 > > > Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's > schema, which causes precision/scale loss for several data types. > Meanwhile, *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, > which causes precision loss for TIMESTAMP types. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13651) Blink planner should parse char(n)/varchar(n)/decimal(p, s) inside a string to corresponding datatype
[ https://issues.apache.org/jira/browse/FLINK-13651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011306#comment-17011306 ] Zhenghua Gao commented on FLINK-13651: -- [~liyu] [~jark] The root cause is that PlannerExpressionParserImpl in both the old planner and the Blink planner uses the old type system, and we must keep the two classes identical for packaging reasons (see https://issues.apache.org/jira/browse/FLINK-13267). The impact of switching both PlannerExpressionParserImpl classes to the new type system still needs to be evaluated. I would prefer to postpone this to a later release, since it only affects the Table API. [~lzljs3620320] Is there any plan to refactor the Scala-style expression parser? > Blink planner should parse char(n)/varchar(n)/decimal(p, s) inside a string > to corresponding datatype > - > > Key: FLINK-13651 > URL: https://issues.apache.org/jira/browse/FLINK-13651 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0, 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > This can be reproduced in ScalarFunctionsTest: > `testAllApis( > 'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2), > "f31.cast(DECIMAL(38, 18)).truncate(2)", > "truncate(cast(f31 as decimal(38, 18)), 2)", > "-0.12")` > > A possible reason is that LookupCallResolver treats decimal(38, 18) as a function > call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
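What the ticket asks for, recognizing `char(n)`, `varchar(n)` and `decimal(p, s)` inside a string as a parameterized type rather than as a function call, can be sketched with a small standalone parser. Everything here is hypothetical: the class `TypeStringSketch`, the method `normalize`, and the regular expression are illustrative and are unrelated to PlannerExpressionParserImpl or to Flink's actual parser.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: recognize a type name with optional precision and scale in a string. */
class TypeStringSketch {

    // Matches e.g. "decimal(38, 18)", "varchar(10)" or a bare "INT".
    private static final Pattern TYPE = Pattern.compile(
            "\\s*([A-Za-z_]+)\\s*(?:\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\))?\\s*");

    /** Returns a canonical spelling such as "DECIMAL(38, 18)", or throws if the input is not a type string. */
    public static String normalize(String typeString) {
        Matcher m = TYPE.matcher(typeString);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a type string: " + typeString);
        }
        StringBuilder sb = new StringBuilder(m.group(1).toUpperCase(Locale.ROOT));
        if (m.group(2) != null) {           // first parameter: length or precision
            sb.append('(').append(m.group(2));
            if (m.group(3) != null) {       // second parameter: scale
                sb.append(", ").append(m.group(3));
            }
            sb.append(')');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(normalize("decimal(38, 18)"));
        System.out.println(normalize("varchar( 10 )"));
        System.out.println(normalize("INT"));
    }
}
```

The point of the sketch is only that the parenthesized numbers belong to the type itself; a resolver that sees `DECIMAL(38, 18)` as a call to a function named DECIMAL would need a step like this before function lookup.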
[jira] [Created] (FLINK-15525) HBase connector should use new type system to suppport precision/scale
Zhenghua Gao created FLINK-15525: Summary: HBase connector should use new type system to suppport precision/scale Key: FLINK-15525 URL: https://issues.apache.org/jira/browse/FLINK-15525 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's schema, which causes precision/scale loss for several data types. Meanwhile, *HBaseTypeUtils* serializes a java.sql.Timestamp to a long in bytes, which causes precision loss for TIMESTAMP types. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14925) the return type of TO_TIMESTAMP should be Timestamp(9) instead of Timestamp(3)
[ https://issues.apache.org/jira/browse/FLINK-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010512#comment-17010512 ] Zhenghua Gao commented on FLINK-14925: -- [~jark] Makes sense. Is there a ticket that tracks support for TIMESTAMP(9) as a rowtime attribute? > the return type of TO_TIMESTAMP should be Timestamp(9) instead of Timestamp(3) > -- > > Key: FLINK-14925 > URL: https://issues.apache.org/jira/browse/FLINK-14925 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15509) Use sql cilents create view occur Unexpected exception
[ https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010331#comment-17010331 ] Zhenghua Gao commented on FLINK-15509: -- Hi [~yesorno], the reason is that the table you created is not valid. A DDL statement should contain both the schema information and the *WITH properties* that describe the connector [1][2]. When you create a table via DDL, Flink only writes the table information to the catalog. When you query the table, Flink checks whether it is valid (try "SELECT * FROM myTable"; it throws the same exception). Likewise, when you create a view in the SQL client, Flink checks whether the underlying table is valid. [1] [https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html] [2] [https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector] > Use sql cilents create view occur Unexpected exception > -- > > Key: FLINK-15509 > URL: https://issues.apache.org/jira/browse/FLINK-15509 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Reporter: Xianxun Ye >Priority: Major > > version:master. > First I created a table successfully with the SQL client, and then an > unexpected exception was thrown when I created a view. > My steps: > Flink SQL> create table myTable (id int); > *[INFO] Table has been created.* > Flink SQL> show tables ; > myTable > Flink SQL> describe myTable ; > root > |-- id: INT > Flink SQL> create view myView as select * from myTable; > > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. findAndCreateTableSource failed. 
> at > org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) > at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300) > at > org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > Caused by: org.apache.flink.table.api.TableException: > findAndCreateTableSource failed. 
> at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76) > at > org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) > at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) > at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) > at > org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) > at > org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105) > at > org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125) > at >
[jira] [Commented] (FLINK-15508) support DataType with precision in ExpressionParser
[ https://issues.apache.org/jira/browse/FLINK-15508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010323#comment-17010323 ] Zhenghua Gao commented on FLINK-15508: -- This seems to be a duplicate of FLINK-13651. The root cause is that PlannerExpressionParserImpl in both the old planner and the Blink planner uses the old type system (and we must keep both PlannerExpressionParserImpl classes the same for packaging reasons, see FLINK-13267). > support DataType with precision in ExpressionParser > --- > > Key: FLINK-15508 > URL: https://issues.apache.org/jira/browse/FLINK-15508 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Leonard Xu >Priority: Major > Fix For: 1.11.0 > > > PlannerExpressionParser does not support DataTypes with precision like > `DECIMAL(38, 0)`; this causes the following API test to fail. > {code:java} > //success > testTableApi("123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)), > "123456789123456789123456789") > //fail > testTableApi(ExpressionParser.parseExpression( > "'123456789123456789123456789'.cast(DECIMAL(38, 0))"), > "123456789123456789123456789") > {code} > org.apache.flink.table.api.ValidationException: Undefined function: > DECIMAL at > org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:47) > at java.util.Optional.orElseThrow(Optional.java:290) at > org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:47) > at > org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66) > at > org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:62) > at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at > org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:63) > at > org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:52) > at > org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66) > at > org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at > org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124) at > org.apache.flink.table.planner.expressions.utils.ExpressionTestBase.addTableApiTestExpr(ExpressionTestBase.scala:242) > at > 
org.apache.flink.table.planner.expressions.utils.ExpressionTestBase.testTableApi(ExpressionTestBase.scala:232) > at > org.apache.flink.table.planner.expressions.DecimalTypeTest.testDecimal(DecimalTypeTest.scala:156) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at >
[jira] [Commented] (FLINK-15460) planner dependencies won't be necessary for JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008583#comment-17008583 ] Zhenghua Gao commented on FLINK-15460: -- [~twalthr] Actually, the JDBC connector's main code no longer needs to depend on the planner; only the test code depends on it. So I will remove the planner dependencies from the JDBC connector by changing their scope to test. What do you think? > planner dependencies won't be necessary for JDBC connector > -- > > Key: FLINK-15460 > URL: https://issues.apache.org/jira/browse/FLINK-15460 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Minor > Fix For: 1.11.0 > > > remove planner dependencies from JDBC connector by changing the scope to test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15460) planner dependencies won't be necessary for JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15460: - Description: remove planner dependencies from JDBC connector by changing the scope to test. > planner dependencies won't be necessary for JDBC connector > -- > > Key: FLINK-15460 > URL: https://issues.apache.org/jira/browse/FLINK-15460 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Minor > Fix For: 1.11.0 > > > remove planner dependencies from JDBC connector by changing the scope to test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15460) planner dependencies won't be necessary for JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15460: - Component/s: (was: Connectors / HBase) > planner dependencies won't be necessary for JDBC connector > -- > > Key: FLINK-15460 > URL: https://issues.apache.org/jira/browse/FLINK-15460 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Minor > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
[ https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008529#comment-17008529 ] Zhenghua Gao commented on FLINK-15445: -- [~jark] [~twalthr] I have opened a PR to let JDBC source support precision/scale, and a problem arises: Do we need to check whether the underlying database supports the data types defined in our DDL to avoid precision loss? Some scenarios are listed as following: # the underlying DB supports DECIMAL(65, 30), which is out of range of Flink's decimal # User defines a table with DECIMAL(10, 4) in underlying db, and defines a table with DECIMAL(5, 2) in Flink # the underlying DB supports TIMESTAMP(6), and user defines a table with TIMESTAMP(9) in Flink # the precision or scale of the underlying DB varies between different versions What do you think about this? > JDBC Table Source didn't work for Types with precision (or/and scale) > - > > Key: FLINK-15445 > URL: https://issues.apache.org/jira/browse/FLINK-15445 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env, envSettings); > String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + 
> ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > tableEnvironment.sqlUpdate(mysqlCurrencyDDL); > String querySQL = "select * from currency"; > tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), > Row.class).print(); > tableEnvironment.execute("JdbcExample"); > } > }{code} > > Throws Exception: > > {code:java} > org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table > field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of > the 'timestamp6_col' field of the TableSource return > type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of > table field 'timestamp6_col' does not match with the physical type > TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type. 
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at
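The scenarios listed in the FLINK-15445 comment above come down to comparing the precision and scale declared in the Flink DDL with what the underlying database column can hold. A minimal sketch of such a comparison in plain Java follows. The class and method names are hypothetical and are not part of Flink or the JDBC connector, and whether the connector should perform this validation at all is exactly the open question in the comment. The only outside facts assumed are that Flink's DECIMAL allows a precision of at most 38 and TIMESTAMP a precision of at most 9.

```java
/** Hypothetical helper; none of these names exist in Flink or its JDBC connector. */
final class PrecisionCheckSketch {

    // Upper bounds of Flink's DECIMAL and TIMESTAMP types.
    static final int FLINK_MAX_DECIMAL_PRECISION = 38;
    static final int FLINK_MAX_TIMESTAMP_PRECISION = 9;

    /** Scenario 1: can a database DECIMAL precision be declared in Flink at all? */
    static boolean representableInFlink(int dbDecimalPrecision) {
        return dbDecimalPrecision <= FLINK_MAX_DECIMAL_PRECISION;
    }

    /**
     * Scenario 2: true if every DECIMAL(dbP, dbS) value can be read into
     * DECIMAL(declP, declS) without losing integer or fractional digits.
     */
    static boolean decimalFits(int dbP, int dbS, int declP, int declS) {
        int dbIntegerDigits = dbP - dbS;
        int declIntegerDigits = declP - declS;
        return declS >= dbS && declIntegerDigits >= dbIntegerDigits;
    }

    /** Scenario 3: reading is lossless when the declared precision is at least the DB's. */
    static boolean timestampFits(int dbPrecision, int declPrecision) {
        return declPrecision <= FLINK_MAX_TIMESTAMP_PRECISION && declPrecision >= dbPrecision;
    }

    public static void main(String[] args) {
        // DECIMAL(65, 30) in the database versus Flink's maximum precision.
        System.out.println("scenario 1 representable: " + representableInFlink(65));
        // DECIMAL(10, 4) in the database, DECIMAL(5, 2) in the Flink DDL.
        System.out.println("scenario 2 lossless read: " + decimalFits(10, 4, 5, 2));
        // TIMESTAMP(6) in the database, TIMESTAMP(9) in the Flink DDL.
        System.out.println("scenario 3 lossless read: " + timestampFits(6, 9));
    }
}
```

The sketch only covers the read path. For a sink the comparison would run in the opposite direction, and scenario 4 (limits that differ between database versions) would need the limits to be looked up at runtime instead of hard-coded.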
[jira] [Created] (FLINK-15469) UpsertStreamTableSink should support new type system
Zhenghua Gao created FLINK-15469:

Summary: UpsertStreamTableSink should support new type system
Key: FLINK-15469
URL: https://issues.apache.org/jira/browse/FLINK-15469
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Zhenghua Gao
Fix For: 1.11.0

Currently *UpsertStreamTableSink* can only return the TypeInformation of the requested record, which cannot represent types with precision and scale, e.g. TIMESTAMP(p), DECIMAL(p,s). The proposal is to deprecate the *getRecordType* API and add a *getRecordDataType* API that returns the data type of the requested record.

{code:java}
/**
 * Returns the requested record type.
 *
 * @deprecated This method will be removed in future versions. It's recommended to use {@link #getRecordDataType()} instead.
 */
@Deprecated
TypeInformation getRecordType();

/**
 * Returns the requested record data type.
 */
DataType getRecordDataType();
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
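To illustrate the deprecation proposed in FLINK-15469, here is a minimal, self-contained sketch. It deliberately uses hypothetical stand-in classes (LegacyTypeSketch, DataTypeSketch, UpsertSinkSketch) instead of Flink's TypeInformation, DataType and UpsertStreamTableSink, so it runs without Flink on the classpath. The default implementation that falls back to the legacy type is an assumption added for illustration and is not part of the ticket. The fallback precisions (TIMESTAMP(3), DECIMAL(38, 18)) reflect how the legacy types are commonly mapped, but treat them as assumptions too.

```java
/** Hypothetical stand-in for a legacy type descriptor that carries no precision. */
final class LegacyTypeSketch {
    final String name;

    LegacyTypeSketch(String name) {
        this.name = name;
    }
}

/** Hypothetical stand-in for a data type that carries precision and scale. */
final class DataTypeSketch {
    final String sql;

    DataTypeSketch(String sql) {
        this.sql = sql;
    }

    /** Assumed fallback: legacy types can only map to fixed default precisions. */
    static DataTypeSketch fromLegacy(LegacyTypeSketch legacy) {
        switch (legacy.name) {
            case "SQL_TIMESTAMP":
                return new DataTypeSketch("TIMESTAMP(3)");
            case "BIG_DEC":
                return new DataTypeSketch("DECIMAL(38, 18)");
            default:
                return new DataTypeSketch(legacy.name);
        }
    }
}

/** Hypothetical sink interface mirroring the shape of the proposal. */
interface UpsertSinkSketch {

    /** @deprecated Use {@link #getRecordDataType()} instead. */
    @Deprecated
    LegacyTypeSketch getRecordType();

    /** Assumed default so that existing sinks keep working unchanged. */
    default DataTypeSketch getRecordDataType() {
        return DataTypeSketch.fromLegacy(getRecordType());
    }
}

final class UpsertSinkDemo {
    public static void main(String[] args) {
        // An existing sink that only knows the legacy method.
        UpsertSinkSketch oldSink = () -> new LegacyTypeSketch("SQL_TIMESTAMP");

        // A migrated sink that reports the precision it actually supports.
        UpsertSinkSketch newSink = new UpsertSinkSketch() {
            @Override
            public LegacyTypeSketch getRecordType() {
                return new LegacyTypeSketch("SQL_TIMESTAMP");
            }

            @Override
            public DataTypeSketch getRecordDataType() {
                return new DataTypeSketch("TIMESTAMP(9)");
            }
        };

        System.out.println(oldSink.getRecordDataType().sql);
        System.out.println(newSink.getRecordDataType().sql);
    }
}
```

The point of the sketch is that a legacy descriptor has no place to store p or s, so only a sink that overrides the new method can report TIMESTAMP(9) or a specific DECIMAL(p, s).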
[jira] [Created] (FLINK-15460) planner dependencies won't be necessary for JDBC connector
Zhenghua Gao created FLINK-15460: Summary: planner dependencies won't be necessary for JDBC connector Key: FLINK-15460 URL: https://issues.apache.org/jira/browse/FLINK-15460 Project: Flink Issue Type: Improvement Components: Connectors / HBase, Connectors / JDBC Affects Versions: 1.10.0 Reporter: Zhenghua Gao Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
[ https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15445: - Description: {code:java} public class JDBCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings); String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), Row.class).print(); tableEnvironment.execute("JdbcExample"); } }{code} Throws Exception: {code:java} org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type.org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'timestamp6_col' does not match with the physical type TIMESTAMP(3) of the 'timestamp6_col' field of the TableSource return type. 
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) at org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:211) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) at
[jira] [Updated] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision
[ https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenghua Gao updated FLINK-15379:
-
Description:
A MySQL table like:

{code:sql}
CREATE TABLE `currency` (
  `currency_id` bigint(20) NOT NULL,
  `currency_name` varchar(200) DEFAULT NULL,
  `rate` double DEFAULT NULL,
  `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `country` varchar(100) DEFAULT NULL,
  `timestamp6` timestamp(6) NULL DEFAULT NULL,
  `time6` time(6) DEFAULT NULL,
  `gdp` decimal(10,4) DEFAULT NULL,
  PRIMARY KEY (`currency_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

+-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+
| currency_id | currency_name | rate | currency_time       | country | timestamp6                 | time6           | gdp      |
+-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+
|           1 | US Dollar     | 1020 | 2019-12-20 17:23:00 | America | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|           2 | Euro          |  114 | 2019-12-20 12:22:00 | Germany | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|           3 | RMB           |   16 | 2019-12-20 12:22:00 | China   | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
|           4 | Yen           |    1 | 2019-12-20 12:22:00 | Japan   | 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 |
+-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+
{code}

If a user defines a JDBC table as a dimension table like:

{code:java}
public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
    " currency_id BIGINT,\n" +
    " currency_name STRING,\n" +
    " rate DOUBLE,\n" +
    " currency_time TIMESTAMP(3),\n" +
    " country STRING,\n" +
    " timestamp6 TIMESTAMP(6),\n" +
    " time6 TIME(6),\n" +
    " gdp DECIMAL(10, 4)\n" +
    ") WITH (\n" +
    " 'connector.type' = 'jdbc',\n" +
    " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
    " 'connector.username' = 'root'," +
    " 'connector.table' = 'currency',\n" +
    " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
    " 'connector.lookup.cache.max-rows' = '500', \n" +
    " 'connector.lookup.cache.ttl' = '10s',\n" +
    " 'connector.lookup.max-retries' = '3'" +
    ")";
{code}

The user will get wrong values in columns
`timestamp6`,`time6`,`gdp`: {code:java} // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, c.timestamp6, c.time6, c.gdp 1,US Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code} {code:java} public class JDBCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings); String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), Row.class).print(); tableEnvironment.execute("JdbcExample"); } } {code} was: A mysql table like: {code:java} // CREATE TABLE `currency` ( `currency_id` bigint(20) NOT NULL, `currency_name` varchar(200) DEFAULT NULL, `rate` double
[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
[ https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15445: - Description: {code:java} public class JDBCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings); String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), Row.class).print(); tableEnvironment.execute("JdbcExample"); } }{code} Throws Exception: Exception in thread "main" org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'timestamp6' does not match with the physical type TIMESTAMP(3) of the 'timestamp9' field of the TableSource return type. 
was: {code:java} public class JDBCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings); String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), Row.class).print(); tableEnvironment.execute("JdbcExample"); } }{code} > JDBC Table Source didn't work for Types with precision (or/and scale) > - > > Key: FLINK-15445 > URL: https://issues.apache.org/jira/browse/FLINK-15445 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.10.0 > > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment 
tableEnvironment = > StreamTableEnvironment.create(env, envSettings); >
[jira] [Updated] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
[ https://issues.apache.org/jira/browse/FLINK-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-15445: - Description: {code:java} public class JDBCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings); String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + " 'connector.table' = 'currency',\n" + " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " 'connector.lookup.max-retries' = '3'" + ")"; tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * from currency"; tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), Row.class).print(); tableEnvironment.execute("JdbcExample"); } } {code} > JDBC Table Source didn't work for Types with precision (or/and scale) > - > > Key: FLINK-15445 > URL: https://issues.apache.org/jira/browse/FLINK-15445 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: Zhenghua Gao >Priority: Major > Fix For: 1.10.0 > > > {code:java} > public class JDBCSourceExample { public static void main(String[] args) > throws Exception { StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment > tableEnvironment = StreamTableEnvironment.create(env, envSettings); String > mysqlCurrencyDDL = "CREATE TABLE currency (\n" + " currency_id BIGINT,\n" + " > currency_name STRING,\n" + " rate DOUBLE,\n" + " currency_time > TIMESTAMP(3),\n" + " country STRING,\n" + " timestamp6 TIMESTAMP(6),\n" + " > time6 TIME(6),\n" + " gdp DECIMAL(10, 4)\n" + ") WITH (\n" + " > 'connector.type' = 'jdbc',\n" + " 'connector.url' = > 'jdbc:mysql://localhost:3306/test',\n" + " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + " 'connector.driver' = > 'com.mysql.jdbc.Driver',\n" + " 'connector.lookup.cache.max-rows' = '500', > \n" + " 'connector.lookup.cache.ttl' = '10s',\n" + " > 'connector.lookup.max-retries' = '3'" + ")"; > tableEnvironment.sqlUpdate(mysqlCurrencyDDL); String querySQL = "select * > from currency"; > tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), > Row.class).print(); tableEnvironment.execute("JdbcExample"); } } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision
[ https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006047#comment-17006047 ]

Zhenghua Gao commented on FLINK-15379:
--
[~jark] The issue can no longer be reproduced after FLINK-15168; instead, a ValidationException is thrown. I have opened a ticket (FLINK-15445) to track the ValidationException.

> JDBC connector return wrong value if defined dataType contains precision > > > Key: FLINK-15379 > URL: https://issues.apache.org/jira/browse/FLINK-15379 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Leonard Xu >Priority: Major > > A mysql table like: > > {code:java} > // CREATE TABLE `currency` ( > `currency_id` bigint(20) NOT NULL, > `currency_name` varchar(200) DEFAULT NULL, > `rate` double DEFAULT NULL, > `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, > `country` varchar(100) DEFAULT NULL, > `timestamp6` timestamp(6) NULL DEFAULT NULL, > `time6` time(6) DEFAULT NULL, > `gdp` decimal(10,4) DEFAULT NULL, > PRIMARY KEY (`currency_id`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > +-+---+--+-+-++-+--+ > | currency_id | currency_name | rate | currency_time | country | > timestamp6 | time6 | gdp | > +-+---+--+-+-++-+--+ > | 1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 2 | Euro | 114 | 2019-12-20 12:22:00 | Germany | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 3 | RMB | 16 | 2019-12-20 12:22:00 | China | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 4 | Yen |1 | 2019-12-20 12:22:00 | Japan | > 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 | > +-+---+--+-+-++-+--+{code} > > If user defined a jdbc table as dimension table like: > > {code:java} > // > public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " 
currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > {code} > > User will get wrong value in column `timestamp6`,`time6`,`gdp`: > {code:java} > // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, > c.timestamp6, c.time6, c.gdp > 1,US > Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001 > 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001 > 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code} > > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env, envSettings); > String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + >
[jira] [Created] (FLINK-15445) JDBC Table Source didn't work for Types with precision (or/and scale)
Zhenghua Gao created FLINK-15445: Summary: JDBC Table Source didn't work for Types with precision (or/and scale) Key: FLINK-15445 URL: https://issues.apache.org/jira/browse/FLINK-15445 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.10.0 Reporter: Zhenghua Gao -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15379) JDBC connector return wrong value if defined dataType contains precision
[ https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005998#comment-17005998 ]

Zhenghua Gao commented on FLINK-15379:
--
Since FLINK-15168 modified the logic for computing physical indices, the query in the description now fails in the validation phase:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'timestamp6' does not match with the physical type TIMESTAMP(3) of the 'timestamp9' field of the TableSource return type.

The root cause is that the JDBC table source should implement *getProducedDataType* and return proper types with precision, which it currently does not.

> JDBC connector return wrong value if defined dataType contains precision > > > Key: FLINK-15379 > URL: https://issues.apache.org/jira/browse/FLINK-15379 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Leonard Xu >Priority: Major > Fix For: 1.10.0 > > > A mysql table like: > > {code:java} > // CREATE TABLE `currency` ( > `currency_id` bigint(20) NOT NULL, > `currency_name` varchar(200) DEFAULT NULL, > `rate` double DEFAULT NULL, > `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, > `country` varchar(100) DEFAULT NULL, > `timestamp6` timestamp(6) NULL DEFAULT NULL, > `time6` time(6) DEFAULT NULL, > `gdp` decimal(10,4) DEFAULT NULL, > PRIMARY KEY (`currency_id`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > +-+---+--+-+-++-+--+ > | currency_id | currency_name | rate | currency_time | country | > timestamp6 | time6 | gdp | > +-+---+--+-+-++-+--+ > | 1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 2 | Euro | 114 | 2019-12-20 12:22:00 | Germany | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 3 | RMB | 16 | 2019-12-20 12:22:00 | China | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 4 | Yen |1 | 2019-12-20 12:22:00 | Japan | > 
2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 | > +-+---+--+-+-++-+--+{code} > > If user defined a jdbc table as dimension table like: > > {code:java} > // > public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > {code} > > User will get wrong value in column `timestamp6`,`time6`,`gdp`: > {code:java} > // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, > c.timestamp6, c.time6, c.gdp > 1,US > Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001 > 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001 > 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code} > > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env, envSettings); > String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + >
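The root cause named in the FLINK-15379 comment above can be pictured as a field-by-field comparison between the types declared in the DDL and the types the source reports. The sketch below is a simplified stand-in written in plain Java: the class name, the string-based type representation and the strict equality rule are all assumptions made for illustration, and Flink's real check in TypeMappingUtils is more permissive than plain equality.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the planner's logical/physical type check. */
final class ProducedTypeCheckSketch {

    /** Returns a message for the first field whose declared and produced types differ. */
    static Optional<String> firstMismatch(
            Map<String, String> declared, Map<String, String> produced) {
        for (Map.Entry<String, String> field : declared.entrySet()) {
            String physical = produced.get(field.getKey());
            if (!field.getValue().equals(physical)) {
                return Optional.of(String.format(
                        "Type %s of table field '%s' does not match with the physical type %s",
                        field.getValue(), field.getKey(), physical));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Types as declared in the CREATE TABLE statement of the issue.
        Map<String, String> declared = new LinkedHashMap<>();
        declared.put("timestamp6", "TIMESTAMP(6)");
        declared.put("gdp", "DECIMAL(10, 4)");

        // A source that only reports legacy type information loses the declared precision.
        Map<String, String> legacySource = new LinkedHashMap<>();
        legacySource.put("timestamp6", "TIMESTAMP(3)");
        legacySource.put("gdp", "DECIMAL(38, 18)");

        // A source that reports the declared data types, as the comment suggests.
        Map<String, String> preciseSource = new LinkedHashMap<>(declared);

        System.out.println(firstMismatch(declared, legacySource).orElse("compatible"));
        System.out.println(firstMismatch(declared, preciseSource).orElse("compatible"));
    }
}
```

Under these assumptions the first call reports the TIMESTAMP(6) versus TIMESTAMP(3) mismatch seen in the exception, and the second reports no mismatch because the source hands back the precision the DDL declared.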