[ https://issues.apache.org/jira/browse/KAFKA-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vlad Olevsky updated KAFKA-8879:
--------------------------------
    Description:
We read messages from an input topic, transform them (ChannelConfigNew -> ChannelConfig) and send them to another topic:

{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
        Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
    .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig,
        prometheusCounter, channelConfigPostDataHelper))
    .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
        Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));
{code}

where ChannelConfigProcessor is (only the essential parts are shown):

{code:java}
public class ChannelConfigProcessor implements Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {

    public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew channelConfigNew) {
        return new KeyValue<>(ccid, convert(channelConfigNew));
    }

    private ChannelConfig convert(ChannelConfigNew channelConfigNew) {
        ...
    }
}
{code}

Both the input (ChannelConfigNew) and the output (ChannelConfig) javaTypes end up in the headers of the message that is sent to the other topic.

The input javaType (ChannelConfigNew) is already present in the headers when serialization is called in

{code:java}
org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final String topic,
        final K key,
        final V value,
        final Headers headers,
        final Integer partition,
        final Long timestamp,
        final Serializer<K> keySerializer,
        final Serializer<V> valueSerializer) {
    final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
{code}

The output javaType (ChannelConfig) is added to the headers inside the valueSerializer.serialize() method of org.springframework.kafka.support.serializer.JsonSerializer:

{code:java}
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data == null) {
        return null;
    }
    if (this.addTypeInfo && headers != null) {
        // Adds the output type to headers that already carry the input type
        this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers);
    }
    return serialize(topic, data);
}
{code}

On the other side we have a GlobalKTable with a processor that retrieves the first javaType from the headers (ChannelConfigNew, the wrong one) and tries to deserialize the data using this type:

{code:java}
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
    ...
    final ConsumerRecord<Object, Object> deserialized =
        sourceNodeAndDeserializer.deserialize(processorContext, record);
    ..
}
{code}

sourceNodeAndDeserializer.deserialize calls

{code:java}
org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize() {
    // excerpt: the trailing arguments of the call that builds the deserialized record
    ... sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
        rawRecord.headers());
}

// which eventually reaches
org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String topic, Headers headers, byte[] data) {
    if (data == null) {
        return null;
    }
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null, "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                    "] from topic [" + topic + "]", e);
        }
    }
}
{code}

*JavaType javaType = this.typeMapper.toJavaType(headers)* extracts the first javaType, which is not the one that should be used to deserialize the object (it gets ChannelConfigNew rather than ChannelConfig). As a result the object is not retrieved properly: all of its fields are null.
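The header behaviour described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: TypeHeaderSketch and all of its methods are hypothetical and are not Kafka or Spring Kafka classes. The sketch assumes that record headers behave like an append-only list that permits duplicate keys, and that the type information lives under a header key named __TypeId__. It shows why a "first match" lookup returns the stale input type while a "last match" lookup returns the type written by the output serializer, and that removing the stale header before forwarding would be one possible mitigation.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for record headers: an ordered list that allows duplicate keys.
final class TypeHeaderSketch {

    // Assumed name of the header that carries the type id.
    static final String TYPE_ID = "__TypeId__";

    private static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Header> headers = new ArrayList<>();

    // Appends a header; an existing header with the same key is NOT replaced.
    void add(String key, String value) {
        headers.add(new Header(key, value));
    }

    // Returns the value of the first header with the given key, or null if absent.
    String firstValue(String key) {
        for (Header h : headers) {
            if (h.key.equals(key)) {
                return h.value;
            }
        }
        return null;
    }

    // Returns the value of the last header with the given key, or null if absent.
    String lastValue(String key) {
        String found = null;
        for (Header h : headers) {
            if (h.key.equals(key)) {
                found = h.value;
            }
        }
        return found;
    }

    // Removes every header with the given key.
    void remove(String key) {
        headers.removeIf(h -> h.key.equals(key));
    }

    // Models the headers of the forwarded record as described in this issue.
    static TypeHeaderSketch afterTransform() {
        TypeHeaderSketch h = new TypeHeaderSketch();
        h.add(TYPE_ID, "ChannelConfigNew"); // carried over from the input record
        h.add(TYPE_ID, "ChannelConfig");    // appended by the output serializer
        return h;
    }

    public static void main(String[] args) {
        TypeHeaderSketch h = afterTransform();
        System.out.println(h.firstValue(TYPE_ID)); // prints "ChannelConfigNew"
        System.out.println(h.lastValue(TYPE_ID));  // prints "ChannelConfig"
    }
}
{code}

In this model, calling remove(TYPE_ID) before the output type is added would leave a single, correct type header, so a first-match lookup would no longer pick up ChannelConfigNew.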
> GlobalStateUpdateTask uses wrong javaType to deserialize value
> --------------------------------------------------------------
>
>                 Key: KAFKA-8879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8879
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Vlad Olevsky
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.2#803003)