Hi there!

I have run into a problem where Kafka Connect's Cast transformation loses schema
information (specifically, the schema name) while casting types. I have
reproduced it with the following test in
org.apache.kafka.connect.transforms.CastTest on the current trunk branch:
```
@Test
public void castWholeRecordValueWithSchemaBooleanAndTimestampField() {
    final Cast<SourceRecord> xform = new Cast.Value<>();
    xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64:boolean"));

    SchemaBuilder builder = SchemaBuilder.struct();
    builder.field("int64", Schema.INT64_SCHEMA);
    builder.field("timestamp", Timestamp.SCHEMA);
    Schema supportedTypesSchema = builder.build();

    Struct recordValue = new Struct(supportedTypesSchema);
    recordValue.put("int64", (long) 64);
    recordValue.put("timestamp", new java.sql.Timestamp(0L));

    // Currently fails here with the DataException shown below
    SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
            supportedTypesSchema, recordValue));

    // The input record has a schema, so the output is a Struct with a schema too
    Struct transformedValue = (Struct) transformed.value();
    assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().field("int64").schema().type());
    assertEquals(true, transformedValue.get("int64"));
    // The field that is not cast must keep its logical type name
    assertEquals(Timestamp.LOGICAL_NAME, transformed.valueSchema().field("timestamp").schema().name());
    assertEquals(new java.sql.Timestamp(0L), transformedValue.get("timestamp"));
}
```
And this fails with the following exception:
```
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:240)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
    at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
    at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
    at org.apache.kafka.connect.transforms.CastTest.castWholeRecordValueWithSchemaBooleanAndTimestampField(CastTest.java:380)
```
This happens because Timestamp.SCHEMA has type INT64 and name
"org.apache.kafka.connect.data.Timestamp", but the
org.apache.kafka.connect.transforms.Cast#getOrBuildSchema method copies only the
field's schema type and leaves the name null. The rebuilt "timestamp" field is
therefore a plain INT64 field, and Struct.put rejects the java.sql.Timestamp
value for it.
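To make the mechanism concrete, here is a simplified model in plain Java. It does not use Kafka's real Schema classes: FieldSchema, rebuildFromTypeOnly and rebuildPreservingUncast are hypothetical stand-ins. It assumes, as described above, that getOrBuildSchema rebuilds every field from its type alone, and contrasts that with one possible fix that reuses the original schema for fields that are not being cast.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CastSchemaSketch {
    // Hypothetical stand-in for a Connect field schema: a physical type plus an optional logical name
    record FieldSchema(String type, String name) {}

    // Models the behavior described above: every field is rebuilt from its type alone,
    // so a logical name such as the Timestamp one is lost
    static Map<String, FieldSchema> rebuildFromTypeOnly(Map<String, FieldSchema> fields,
                                                        Map<String, String> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        fields.forEach((fieldName, schema) ->
                out.put(fieldName, new FieldSchema(casts.getOrDefault(fieldName, schema.type()), null)));
        return out;
    }

    // Possible fix (an assumption, not the actual Kafka code): only fields that are cast get a
    // fresh schema; all other fields keep their original schema, name included
    static Map<String, FieldSchema> rebuildPreservingUncast(Map<String, FieldSchema> fields,
                                                            Map<String, String> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        fields.forEach((fieldName, schema) ->
                out.put(fieldName, casts.containsKey(fieldName)
                        ? new FieldSchema(casts.get(fieldName), null)
                        : schema));
        return out;
    }

    public static void main(String[] args) {
        // Same shape as the test: a cast int64 field and an untouched Timestamp field
        Map<String, FieldSchema> fields = new LinkedHashMap<>();
        fields.put("int64", new FieldSchema("INT64", null));
        fields.put("timestamp", new FieldSchema("INT64", "org.apache.kafka.connect.data.Timestamp"));
        Map<String, String> casts = Map.of("int64", "BOOLEAN");

        // Type-only rebuild: the timestamp field loses its logical name
        System.out.println(rebuildFromTypeOnly(fields, casts).get("timestamp"));
        // Preserving rebuild: the timestamp field keeps its logical name
        System.out.println(rebuildPreservingUncast(fields, casts).get("timestamp"));
    }
}
```

In the real transformation the same idea would mean passing field.schema() through unchanged for fields that are not listed in the cast spec, instead of building a new schema from field.schema().type().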

For example, this behavior makes a connector fail when exporting data from a
database table that has a timestamp field plus another field we cast (I have
connector settings for PostgreSQL that trigger this problem).

Should I report a bug, or is there something I am misunderstanding? I have
patched the source code locally to fix this problem for my particular case, and
I am ready to prepare a more general patch if necessary.

I have also attached a git patch file with the failing test case.

Thanks,
Artem
