[ 
https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fuxin Hao updated KAFKA-14665:
------------------------------
    Description: 
I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and 
{{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two 
PostgreSQL databases. I set {{time.precision.mode=adaptive}} in the 
[Debezium 
config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
which serializes PostgreSQL temporal types as {{Integer}} or {{Long}} values 
that are incompatible with {{JdbcSinkConnector}}. So I wrote an SMT to 
transform this data from numeric types to strings.

 

Say I have the following table:
{code:java}
-- Minimal reproduction table: its only column is also its primary key.
CREATE TABLE pk_created_at (
    -- Zone-less timestamp, never NULL, filled with CURRENT_TIMESTAMP when omitted.
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (created_at)
);
insert into pk_created_at values(current_timestamp); {code}
 

My source connector configuration:
{code:java}
{
    "name": "test-connector",
    "config": {
        "snapshot.mode": "always",
        "plugin.name": "pgoutput",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "source",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "test",
        "database.server.name": "test",
        "slot.name" : "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enabled": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enabled": true,
        "decimal.handling.mode": "string",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
} {code}
 

The messages in the Kafka topic {{test.public.pk_created_at}} would then be:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"int64", "optional":false, 
"name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } 
], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
"created_at":1669354751764130 } }{code}
 

After applying my SMT, the messages would be like this:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"string", "optional":true, "field":"created_at" } ], 
"optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
"created_at":"2022-11-25T05:39:11.764130Z" } }{code}
{{ }}

It worked great if {{created_at}} is not part of the primary key 
([SMT source|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]); 
no error occurred. But the primary keys on some of my tables are composed of 
{{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}. 
In that case {{JdbcSinkConnector}} raised the exception below:
{code:java}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql 
[io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 
06:57:01,459 INFO || Maximum table name length for database is 63 bytes 
[io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 
06:57:01,459 INFO || JdbcDbWriter Connected 
[io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || 
Checking PostgreSql dialect for existence of TABLE "pk_created_at" 
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present 
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE 
"pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to 
Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', 
isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} 
[io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN 
|| Write of 2 records failed, remainingRetries=0 
[io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: 
Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES 
(1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: 
column "created_at" is of type timestamp without time zone but expression is of 
type bigint Hint: You will need to rewrite or cast the expression. Position: 52 
Call getNextException to see other errors in the batch. at 
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165) 
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871) 
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910) at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
 at 
io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
 at 
io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186) 
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80) at 
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84) at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333) 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
org.postgresql.util.PSQLException: ERROR: column "created_at" is of type 
timestamp without time zone but expression is of type bigint Hint: You will 
need to rewrite or cast the expression. Position: 52 at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
 at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
 at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355) at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315) at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868) ... 
17 more{code}
 

The error suggests that the sink connector was still trying to insert 
{{created_at}} as the numeric value {{1669359291990398}}, but I verified 
that the messages in the Kafka topic have been transformed into strings. It 
worked if {{created_at}} is not a primary key.

 

[My 
SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]:
 
{code:java}
public class DebeziumTimestampConverter<R extends ConnectRecord<R>> implements 
Transformation<R> {
    private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumTimestampConverter.class);
    private Cache<Schema, Schema> schemaUpdateCache;
    private static final String PURPOSE = "convert 
io.debezium.time.MicroTimestamp into String";
    @Override
    public void configure(Map<String, ?> props) {
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, 
Schema>(16));
    }
    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }
    @Override
    public void close() {
    }
    protected Schema operatingSchema(R record) {
        return record.valueSchema();
    }
    protected Object operatingValue(R record) {
        return record.value();
    }
    private String formatDate(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        LocalDate d = LocalDate.ofEpochDay(epoch);
        return d.toString();
    }
    private String formatTime(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        java.util.Date date = new java.util.Date(epoch);
        return new SimpleDateFormat("HH:mm:ss.SSS").format(date);
    }
    private String formatMicroTime(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneId.from(ZoneOffset.UTC));
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
        Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
        return formatter.format(instant);
    }
    private String formatTimestamp(Long epochMilliSeconds) {
        if (epochMilliSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.from(ZoneOffset.UTC));
        Instant instant = Instant.ofEpochMilli( epochMilliSeconds );
        return formatter.format(instant);
    }
    private String formatMicroTimestamp(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
        Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
        return instant.toString();
    }
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
        for (Field field: schema.fields()) {
            if (field.schema().type() != Schema.Type.STRING && (
                    MicroTimestamp.SCHEMA_NAME.equals(field.schema().name()) ||
                    Date.SCHEMA_NAME.equals(field.schema().name()) ||
                    Time.SCHEMA_NAME.equals(field.schema().name()) ||
                    MicroTime.SCHEMA_NAME.equals(field.schema().name()) ||
                    Timestamp.SCHEMA_NAME.equals(field.schema().name()))) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp()
        );
    }
    private R applyWithSchema(R r) {
        final Struct struct = requireStruct(operatingValue(r), PURPOSE);
        Schema updatedSchema = schemaUpdateCache.get(struct.schema());
        if(updatedSchema == null) {
            updatedSchema = makeUpdatedSchema(struct.schema());
            schemaUpdateCache.put(struct.schema(), updatedSchema);
        }
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : struct.schema().fields()) {
            if (field.schema().type() != Schema.Type.STRING && 
field.schema().name() != null) {
                switch (field.schema().name()) {
                    case Date.SCHEMA_NAME:
                        Object value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), 
formatDate((Integer)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Time.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), 
formatTime((Integer)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTime.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatMicroTime((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Timestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatTimestamp((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTimestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatMicroTimestamp((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                }
            } else {
                updatedValue.put(field.name(), struct.get(field));
            }
        }
        return newRecord(r, updatedSchema, updatedValue);
    }
    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            return record;
        } else {
            return applyWithSchema(record);
        }
    }
}{code}
 

  was:
I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and 
{{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two 
PostgreSQL databases. I set {{time.precision.mode=adaptive}} in the 
[Debezium 
config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
which serializes PostgreSQL temporal types as {{Integer}} or {{Long}} values 
that are incompatible with {{JdbcSinkConnector}}. So I wrote an SMT to 
transform this data from numeric types to strings.

 

Say I have the following table:
{code:java}
-- Minimal reproduction table: its only column is also its primary key.
CREATE TABLE pk_created_at (
    -- Zone-less timestamp, never NULL, filled with CURRENT_TIMESTAMP when omitted.
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (created_at)
);
insert into pk_created_at values(current_timestamp); {code}
{{}}

{{}}

My source connector configuration:
{code:java}
{
    "name": "test-connector",
    "config": {
        "snapshot.mode": "always",
        "plugin.name": "pgoutput",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "source",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "test",
        "database.server.name": "test",
        "slot.name" : "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enabled": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enabled": true,
        "decimal.handling.mode": "string",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
} {code}
{{}}

{{}}

 

The messages in the Kafka topic {{test.public.pk_created_at}} would then be:

{{}}
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"int64", "optional":false, 
"name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } 
], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
"created_at":1669354751764130 } }{code}
 

{{}}

After applying my SMT, the messages would be like this:

{{}}
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"string", "optional":true, "field":"created_at" } ], 
"optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
"created_at":"2022-11-25T05:39:11.764130Z" } }{code}
{{ }}

It worked great if {{created_at}} is not part of the primary key 
([SMT source|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]); 
no error occurred. But the primary keys on some of my tables are composed of 
{{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}. 
In that case {{JdbcSinkConnector}} raised the exception below:

{{}}
{code:java}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql 
[io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 
06:57:01,459 INFO || Maximum table name length for database is 63 bytes 
[io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 
06:57:01,459 INFO || JdbcDbWriter Connected 
[io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || 
Checking PostgreSql dialect for existence of TABLE "pk_created_at" 
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present 
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE 
"pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to 
Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', 
isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} 
[io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN 
|| Write of 2 records failed, remainingRetries=0 
[io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: 
Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES 
(1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: 
column "created_at" is of type timestamp without time zone but expression is of 
type bigint Hint: You will need to rewrite or cast the expression. Position: 52 
Call getNextException to see other errors in the batch. at 
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165) 
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871) 
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910) at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
 at 
io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
 at 
io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186) 
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80) at 
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84) at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333) 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
org.postgresql.util.PSQLException: ERROR: column "created_at" is of type 
timestamp without time zone but expression is of type bigint Hint: You will 
need to rewrite or cast the expression. Position: 52 at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
 at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
 at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355) at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315) at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868) ... 
17 more{code}
 

The error suggests that the sink connector was still trying to insert 
{{created_at}} as the numeric value {{1669359291990398}}, but I verified 
that the messages in the Kafka topic have been transformed into strings. It 
worked if {{created_at}} is not a primary key.

 

[My 
SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]:
 
{code:java}
public class DebeziumTimestampConverter<R extends ConnectRecord<R>> implements 
Transformation<R> {
    private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumTimestampConverter.class);
    private Cache<Schema, Schema> schemaUpdateCache;
    private static final String PURPOSE = "convert 
io.debezium.time.MicroTimestamp into String";
    @Override
    public void configure(Map<String, ?> props) {
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, 
Schema>(16));
    }
    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }
    @Override
    public void close() {
    }
    protected Schema operatingSchema(R record) {
        return record.valueSchema();
    }
    protected Object operatingValue(R record) {
        return record.value();
    }
    private String formatDate(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        LocalDate d = LocalDate.ofEpochDay(epoch);
        return d.toString();
    }
    private String formatTime(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        java.util.Date date = new java.util.Date(epoch);
        return new SimpleDateFormat("HH:mm:ss.SSS").format(date);
    }
    private String formatMicroTime(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneId.from(ZoneOffset.UTC));
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
        Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
        return formatter.format(instant);
    }
    private String formatTimestamp(Long epochMilliSeconds) {
        if (epochMilliSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.from(ZoneOffset.UTC));
        Instant instant = Instant.ofEpochMilli( epochMilliSeconds );
        return formatter.format(instant);
    }
    private String formatMicroTimestamp(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
        Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
        return instant.toString();
    }
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
        for (Field field: schema.fields()) {
            if (field.schema().type() != Schema.Type.STRING && (
                    MicroTimestamp.SCHEMA_NAME.equals(field.schema().name()) ||
                    Date.SCHEMA_NAME.equals(field.schema().name()) ||
                    Time.SCHEMA_NAME.equals(field.schema().name()) ||
                    MicroTime.SCHEMA_NAME.equals(field.schema().name()) ||
                    Timestamp.SCHEMA_NAME.equals(field.schema().name()))) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp()
        );
    }
    private R applyWithSchema(R r) {
        final Struct struct = requireStruct(operatingValue(r), PURPOSE);
        Schema updatedSchema = schemaUpdateCache.get(struct.schema());
        if(updatedSchema == null) {
            updatedSchema = makeUpdatedSchema(struct.schema());
            schemaUpdateCache.put(struct.schema(), updatedSchema);
        }
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : struct.schema().fields()) {
            if (field.schema().type() != Schema.Type.STRING && 
field.schema().name() != null) {
                switch (field.schema().name()) {
                    case Date.SCHEMA_NAME:
                        Object value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), 
formatDate((Integer)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Time.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), 
formatTime((Integer)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTime.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatMicroTime((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Timestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatTimestamp((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTimestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), 
formatMicroTimestamp((Long)value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                }
            } else {
                updatedValue.put(field.name(), struct.get(field));
            }
        }
        return newRecord(r, updatedSchema, updatedValue);
    }
    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            return record;
        } else {
            return applyWithSchema(record);
        }
    }
}{code}




 


> my custom SMT that converts Int to String does not work for primary keys
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14665
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14665
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.1.0
>            Reporter: Fuxin Hao
>            Priority: Major
>
> I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and 
> {{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two 
> PostgreSQL databases. I set {{time.precision.mode=adaptive}} in the 
> [Debezium 
> config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
> which serializes PostgreSQL temporal types as {{Integer}} or {{Long}} values 
> that are incompatible with {{JdbcSinkConnector}}. So I wrote an SMT to 
> transform this data from numeric types to strings.
>  
> Say I have the following table:
> {code:java}
> CREATE TABLE pk_created_at (
>     created_at timestamp without time zone DEFAULT current_timestamp not null,
>     PRIMARY KEY (created_at)
> );
> insert into pk_created_at values(current_timestamp); {code}
>  
> My source connector configuration:
> {code:java}
> {
>     "name": "test-connector",
>     "config": {
>         "snapshot.mode": "always",
>         "plugin.name": "pgoutput",
>         "connector.class": 
> "io.debezium.connector.postgresql.PostgresConnector",
>         "tasks.max": "1",
>         "database.hostname": "source",
>         "database.port": "5432",
>         "database.user": "postgres",
>         "database.password": "postgres",
>         "database.dbname" : "test",
>         "database.server.name": "test",
>         "slot.name" : "test",
>         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>         "key.converter.schemas.enabled": true,
>         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>         "value.converter.schemas.enabled": true,
>         "decimal.handling.mode": "string",
>         "time.precision.mode": "adaptive",
>         "transforms": "unwrap",
>         "transforms.unwrap.type": 
> "io.debezium.transforms.ExtractNewRecordState"
>     }
> } {code}
>  
> And the messages in the Kafka topic {{test.public.pk_created_at}} would be:
> {code:java}
> # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
> test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
> "fields":[ { "type":"int64", "optional":false, 
> "name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } 
> ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
> "created_at":1669354751764130 } }{code}
>  
> After applying my SMT, the messages would be like this:
> {code:java}
> # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
> test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
> "fields":[ { "type":"string", "optional":true, "field":"created_at" } ], 
> "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
> "created_at":"2022-11-25T05:39:11.764130Z" } }{code}
> {{ }}
> It worked great if {{created_at}} is not a part of the primary key. No 
> error occurred. But the primary keys on some of my tables are composed of 
> {{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}. 
> Then it raised an exception in {{JdbcSinkConnector}} as below:
> {code:java}
> 2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to 
> PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 
> 2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 
> bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 
> 2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected 
> [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || 
> Checking PostgreSql dialect for existence of TABLE "pk_created_at" 
> [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
> 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present 
> [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 
> 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE 
> "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 
> 2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to 
> Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', 
> isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} 
> [io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 
> WARN || Write of 2 records failed, remainingRetries=0 
> [io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: 
> Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES 
> (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: 
> column "created_at" is of type timestamp without time zone but expression is 
> of type bigint Hint: You will need to rewrite or cast the expression. 
> Position: 52 Call getNextException to see other errors in the batch. at 
> org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
>  at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871) at 
> org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910) at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
>  at 
> io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
>  at 
> io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
>  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80) 
> at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84) at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333) 
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
> org.postgresql.util.PSQLException: ERROR: column "created_at" is of type 
> timestamp without time zone but expression is of type bigint Hint: You will 
> need to rewrite or cast the expression. Position: 52 at 
> org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
>  at 
> org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
>  at 
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355) 
> at 
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315) 
> at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868) 
> ... 17 more{code}
>  
> The error suggests that the sink connector was still trying to insert 
> {{created_at}} as the numeric value {{1669359291990398}}, but I 
> verified that the messages in the Kafka topic have been transformed into 
> strings. It worked if {{created_at}} is not a primary key.
>  
> [My 
> SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]:
>  
> {code:java}
> public class DebeziumTimestampConverter<R extends ConnectRecord<R>> 
> implements Transformation<R> {
>     private static final Logger LOG = 
> LoggerFactory.getLogger(DebeziumTimestampConverter.class);
>     private Cache<Schema, Schema> schemaUpdateCache;
>     private static final String PURPOSE = "convert 
> io.debezium.time.MicroTimestamp into String";
>     @Override
>     public void configure(Map<String, ?> props) {
>         schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, 
> Schema>(16));
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     @Override
>     public void close() {
>     }
>     protected Schema operatingSchema(R record) {
>         return record.valueSchema();
>     }
>     protected Object operatingValue(R record) {
>         return record.value();
>     }
>     private String formatDate(Integer epoch) {
>         if (epoch == null) {
>             return "";
>         }
>         LocalDate d = LocalDate.ofEpochDay(epoch);
>         return d.toString();
>     }
>     private String formatTime(Integer epoch) {
>         if (epoch == null) {
>             return "";
>         }
>         java.util.Date date = new java.util.Date(epoch);
>         return new SimpleDateFormat("HH:mm:ss.SSS").format(date);
>     }
>     private String formatMicroTime(Long epochMicroSeconds) {
>         if (epochMicroSeconds == null) {
>             return "";
>         }
>         DateTimeFormatter formatter = 
> DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneId.from(ZoneOffset.UTC));
>         long epochSeconds = epochMicroSeconds / 1000000L;
>         long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
>         Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
>         return formatter.format(instant);
>     }
>     private String formatTimestamp(Long epochMilliSeconds) {
>         if (epochMilliSeconds == null) {
>             return "";
>         }
>         DateTimeFormatter formatter = 
> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.from(ZoneOffset.UTC));
>         Instant instant = Instant.ofEpochMilli( epochMilliSeconds );
>         return formatter.format(instant);
>     }
>     private String formatMicroTimestamp(Long epochMicroSeconds) {
>         if (epochMicroSeconds == null) {
>             return "";
>         }
>         long epochSeconds = epochMicroSeconds / 1000000L;
>         long nanoOffset = ( epochMicroSeconds % 1000000L ) * 1000L ;
>         Instant instant = Instant.ofEpochSecond( epochSeconds, nanoOffset );
>         return instant.toString();
>     }
>     private Schema makeUpdatedSchema(Schema schema) {
>         final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
> SchemaBuilder.struct());
>         for (Field field: schema.fields()) {
>             if (field.schema().type() != Schema.Type.STRING && (
>                     MicroTimestamp.SCHEMA_NAME.equals(field.schema().name()) 
> ||
>                     Date.SCHEMA_NAME.equals(field.schema().name()) ||
>                     Time.SCHEMA_NAME.equals(field.schema().name()) ||
>                     MicroTime.SCHEMA_NAME.equals(field.schema().name()) ||
>                     Timestamp.SCHEMA_NAME.equals(field.schema().name()))) {
>                 builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
>             } else {
>                 builder.field(field.name(), field.schema());
>             }
>         }
>         return builder.build();
>     }
>     protected R newRecord(R record, Schema updatedSchema, Object 
> updatedValue) {
>         return record.newRecord(
>                 record.topic(),
>                 record.kafkaPartition(),
>                 record.keySchema(),
>                 record.key(),
>                 updatedSchema,
>                 updatedValue,
>                 record.timestamp()
>         );
>     }
>     private R applyWithSchema(R r) {
>         final Struct struct = requireStruct(operatingValue(r), PURPOSE);
>         Schema updatedSchema = schemaUpdateCache.get(struct.schema());
>         if(updatedSchema == null) {
>             updatedSchema = makeUpdatedSchema(struct.schema());
>             schemaUpdateCache.put(struct.schema(), updatedSchema);
>         }
>         final Struct updatedValue = new Struct(updatedSchema);
>         for (Field field : struct.schema().fields()) {
>             if (field.schema().type() != Schema.Type.STRING && 
> field.schema().name() != null) {
>                 switch (field.schema().name()) {
>                     case Date.SCHEMA_NAME:
>                         Object value = struct.get(field);
>                         if (value == null) {
>                             updatedValue.put(field.name(), null);
>                             continue;
>                         }
>                         if (value instanceof Integer) {
>                             updatedValue.put(field.name(), 
> formatDate((Integer)value));
>                         } else {
>                             updatedValue.put(field.name(), value);
>                         }
>                         break;
>                     case Time.SCHEMA_NAME:
>                         value = struct.get(field);
>                         if (value == null) {
>                             updatedValue.put(field.name(), null);
>                             continue;
>                         }
>                         if (value instanceof Integer) {
>                             updatedValue.put(field.name(), 
> formatTime((Integer)value));
>                         } else {
>                             updatedValue.put(field.name(), value);
>                         }
>                         break;
>                     case MicroTime.SCHEMA_NAME:
>                         value = struct.get(field);
>                         if (value == null) {
>                             updatedValue.put(field.name(), null);
>                             continue;
>                         }
>                         if (value instanceof Long) {
>                             updatedValue.put(field.name(), 
> formatMicroTime((Long)value));
>                         } else {
>                             updatedValue.put(field.name(), value);
>                         }
>                         break;
>                     case Timestamp.SCHEMA_NAME:
>                         value = struct.get(field);
>                         if (value == null) {
>                             updatedValue.put(field.name(), null);
>                             continue;
>                         }
>                         if (value instanceof Long) {
>                             updatedValue.put(field.name(), 
> formatTimestamp((Long)value));
>                         } else {
>                             updatedValue.put(field.name(), value);
>                         }
>                         break;
>                     case MicroTimestamp.SCHEMA_NAME:
>                         value = struct.get(field);
>                         if (value == null) {
>                             updatedValue.put(field.name(), null);
>                             continue;
>                         }
>                         if (value instanceof Long) {
>                             updatedValue.put(field.name(), 
> formatMicroTimestamp((Long)value));
>                         } else {
>                             updatedValue.put(field.name(), value);
>                         }
>                         break;
>                 }
>             } else {
>                 updatedValue.put(field.name(), struct.get(field));
>             }
>         }
>         return newRecord(r, updatedSchema, updatedValue);
>     }
>     @Override
>     public R apply(R record) {
>         if (operatingSchema(record) == null) {
>             return record;
>         } else {
>             return applyWithSchema(record);
>         }
>     }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to