[ https://issues.apache.org/jira/browse/BEAM-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vishwas resolved BEAM-7073. --------------------------- Fix Version/s: 2.14.0 2.17.0 Resolution: Fixed I have tested this fix with 2.15 version and the issue is resolved. > AvroUtils converting generic record to Beam Row causes class cast exception > --------------------------------------------------------------------------- > > Key: BEAM-7073 > URL: https://issues.apache.org/jira/browse/BEAM-7073 > Project: Beam > Issue Type: Bug > Components: sdk-java-core > Affects Versions: 2.11.0 > Environment: Direct Runner > Reporter: Vishwas > Assignee: Ryan Skraba > Priority: Major > Fix For: 2.17.0, 2.14.0 > > Time Spent: 50m > Remaining Estimate: 0h > > Below is my pipeline: > KafkaSource (KafkaIo.read) -----------> Pardo -------> BeamSql------> > KafkaSink (KafkaIO.write) > Kafka Source IO reads from Kafka topic avro records and deserializes it to > generic record using below > KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, > GenericRecord>read() > .withBootstrapServers(bootstrapServerUrl) > .withTopic(topicName) > .withKeyDeserializer(StringDeserializer.class) > .withValueDeserializerAndCoder(GenericAvroDeserializer.class, > > AvroCoder.of(GenericRecord.class, avroSchema)) > > .updateConsumerProperties(ImmutableMap.of("schema.registry.url", > > schemaRegistryUrl)); > Avro schema of the topic has a logicaltype (timestamp-millis). This is > deserialized to > joda-time. > { > "name": "timeOfRelease", > "type": [ > "null", > { > "type": "long", > "logicalType": "timestamp-millis", > "connect.version": 1, > "connect.name": "org.apache.kafka.connect.data.Timestamp" > } > ], > "default": null, > } > Now in my Pardo transform, I am trying to use the AvroUtils class methods to > convert the generic record to Beam Row and getting below class cast exception > AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema) > Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be > cast to java.lang.Long > at > org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664) > at > org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217) > > This looks like a bug as joda time type created as part of deserialization is > being type casted to Long in below code. > else if (logicalType instanceof LogicalTypes.TimestampMillis) { > return convertDateTimeStrict((Long) value, fieldType); > } > PS: I also used the avro-tools 1.8.2 jar to get the classes for the mentioned > avro schema and I see that the attribute with timestamp-millis logical type > is being converted to joda-time. > -- This message was sent by Atlassian Jira (v8.3.4#803005)