Pietro created FLINK-35053:
------------------------------

             Summary: TIMESTAMP with TIME ZONE not supported by JDBC connector 
for Postgres
                 Key: FLINK-35053
                 URL: https://issues.apache.org/jira/browse/FLINK-35053
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: jdbc-3.1.2, 1.18.1, 1.19.0
            Reporter: Pietro


The JDBC sink for Postgres does not support {{TIMESTAMP WITH TIME ZONE}} 
or {{TIMESTAMP_LTZ}} types.

Related issues: FLINK-22199, FLINK-20869
h2. Problem Explanation

A Postgres {{target_table}} has a field {{tm_tz}} of type {{timestamptz}}.
{code:sql}
-- Postgres DDL
CREATE TABLE target_table (
    tm_tz TIMESTAMP WITH TIME ZONE
)
{code}
In Flink we have a table with a column of type {{TIMESTAMP_LTZ(6)}}, and 
our goal is to sink it to {{target_table}}.
{code:sql}
-- Flink DDL
CREATE TABLE sink (
    tm_tz TIMESTAMP_LTZ(6)
) WITH (
    'connector' = 'jdbc',
    'table-name' = 'target_table'
    ...
)
{code}
According to 
[AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109],
 {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while 
{{TIMESTAMP_WITH_TIME_ZONE}} is not.

However, when the converter is created via 
[AbstractJdbcRowConverter.createInternalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L132],
 it throws an {{UnsupportedOperationException}} since 
{{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the available types, while 
[{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168]
 is.
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
        at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
        at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
        at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
        at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
        at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
        at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
        at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
        at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
        at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
        at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
        at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
{code}
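The mismatch described above can be illustrated with a small, self-contained model. This is only a sketch: the class {{DialectConverterMismatchSketch}}, the enum {{TypeRoot}}, and both methods are hypothetical stand-ins for {{supportedTypes()}} and {{createInternalConverter()}}, not the connector's actual code, and only the two type roots discussed in this issue are modelled.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the mismatch; none of these names are Flink classes.
class DialectConverterMismatchSketch {

    // Only the two type roots discussed in this issue are modelled.
    enum TypeRoot { TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE }

    // Stand-in for the dialect's supportedTypes(): accepts the LOCAL variant only.
    static final Set<TypeRoot> DIALECT_SUPPORTED =
            EnumSet.of(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

    // Stand-in for createInternalConverter(): handles the non-LOCAL variant only.
    static String createConverter(TypeRoot type) {
        switch (type) {
            case TIMESTAMP_WITH_TIME_ZONE:
                return "timestamp-converter";
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    // Types that pass dialect validation but fail when the converter is created.
    static Set<TypeRoot> validatedButUnconvertible() {
        Set<TypeRoot> gap = EnumSet.noneOf(TypeRoot.class);
        for (TypeRoot type : DIALECT_SUPPORTED) {
            try {
                createConverter(type);
            } catch (UnsupportedOperationException e) {
                gap.add(type);
            }
        }
        return gap;
    }

    public static void main(String[] args) {
        // prints [TIMESTAMP_WITH_LOCAL_TIME_ZONE]
        System.out.println(validatedButUnconvertible());
    }
}
```

In this model, a type that passes validation still fails later, which matches the point in the stack trace where the exception is raised.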
h3. Using TIMESTAMP WITH TIME ZONE

Defining {{tm_tz}} in Flink as {{TIMESTAMP(6) WITH TIME ZONE}} instead of 
{{TIMESTAMP_LTZ(6)}} does not solve the issue, and returns the following error 
instead:
{code:java}
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "TIME" at line 1, column 66.
Was expecting:
    "LOCAL" ...

        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
{code}
h3. Using TIMESTAMP

Defining {{tm_tz}} in Flink as {{TIMESTAMP(6)}} can lead to incorrect 
time zone conversions, because Postgres interprets a timestamp without 
an explicit time zone as if it were in the time zone given by the system's 
{{TimeZone}} parameter (reference: [Postgres 
Documentation|https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-INPUT-TIME-STAMPS]).

For instance, assume that Postgres is in the GMT+2 time zone. If in Flink we 
have the timestamp {{'2024-04-08 00:00:00'}} (UTC), then, when inserted into 
Postgres, it is interpreted as GMT+2 local time and stored as 
{{'2024-04-07 22:00:00+00'}}.
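The arithmetic behind this example can be checked with {{java.time}} alone. The following is a minimal sketch, not connector code; the class and method names are hypothetical, and the fixed offset {{+02:00}} stands in for the Postgres time zone assumed above.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: shows which instant a zone-less timestamp denotes
// once a reader assumes a particular offset for it.
class TimestampInterpretationSketch {

    // Reads the zone-less value as local time at the given offset.
    static Instant interpretAt(LocalDateTime zoneless, ZoneOffset assumedOffset) {
        return zoneless.toInstant(assumedOffset);
    }

    public static void main(String[] args) {
        LocalDateTime flinkValue = LocalDateTime.parse("2024-04-08T00:00:00");

        // Interpreted as GMT+2 local time: prints 2024-04-07T22:00:00Z
        System.out.println(interpretAt(flinkValue, ZoneOffset.ofHours(2)));

        // Interpreted as UTC, which is what was intended: prints 2024-04-08T00:00:00Z
        System.out.println(interpretAt(flinkValue, ZoneOffset.UTC));
    }
}
```

The two results differ by exactly the two-hour offset, which is the shift that ends up stored in the {{timestamptz}} column.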

To ensure that Postgres interprets it correctly, Flink should pass it 
as {{'2024-04-08 00:00:00+00'}} (notice the trailing {{'+00'}}).
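As an illustration of the literal format above, here is a minimal sketch that renders a timestamp with an explicit offset using {{java.time}}. The class name, the formatter constant, and the pattern are assumptions made for this example; this is not how the connector currently writes values.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: renders a timestamp with an explicit offset suffix.
class ExplicitOffsetLiteralSketch {

    // Lowercase 'x' prints the offset as hours only (e.g. +00, +02) when the
    // minutes are zero, and never prints 'Z' for UTC.
    static final DateTimeFormatter WITH_OFFSET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssx");

    static String toLiteral(OffsetDateTime value) {
        return WITH_OFFSET.format(value);
    }

    public static void main(String[] args) {
        OffsetDateTime utcMidnight = OffsetDateTime.of(2024, 4, 8, 0, 0, 0, 0, ZoneOffset.UTC);
        // prints 2024-04-08 00:00:00+00
        System.out.println(toLiteral(utcMidnight));
    }
}
```

Because the offset travels with the value, the result no longer depends on the time zone configured on the Postgres side.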



