Vlad Olevsky created KAFKA-8879:
-----------------------------------

             Summary: GlobalStateUpdateTask uses wrong javaType to deserialize value
                 Key: KAFKA-8879
                 URL: https://issues.apache.org/jira/browse/KAFKA-8879
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Vlad Olevsky


We read messages from an input topic, transform them 
(ChannelConfigNew -> ChannelConfig), and send the results to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
        Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
    .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig,
        prometheusCounter, channelConfigPostDataHelper))
    .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
        Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));

{code}
where ChannelConfigProcessor is (only the essential part is shown):

 
{code:java}
public class ChannelConfigProcessor
        implements Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {

    @Override
    public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew channelConfigNew) {
        return new KeyValue<>(ccid, convert(channelConfigNew));
    }

    private ChannelConfig convert(ChannelConfigNew channelConfigNew) {
        ...
    }
}
 
{code}
Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes end up 
in the headers of the message sent to the destination topic. The input type is 
already present in the headers when serialization is invoked in
{code:java}
// org.apache.kafka.streams.processor.internals.RecordCollectorImpl (abridged)
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Headers headers,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
    // The headers passed in here already carry the input type.
    final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
 
{code}
The output type is then added to the headers inside the 
valueSerializer.serialize() method, which in our case is implemented by 
org.springframework.kafka.support.serializer.JsonSerializer:

 
{code:java}
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data == null) {
        return null;
    }
    if (this.addTypeInfo && headers != null) {
        this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers);
    }
    return serialize(topic, data);
}
 
{code}
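
To make the accumulation concrete, here is a minimal, library-free sketch of what we believe happens to the headers. It assumes the type mapper appends a new type header instead of replacing an existing one, and that the header is named __TypeId__ (to our knowledge the Spring Kafka default). The class TypeHeaderAppendSketch and its methods are hypothetical and only model the header list; they are not part of Kafka or Spring Kafka.

{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of record headers as an ordered list of key/value pairs.
class TypeHeaderAppendSketch {

    // Assumed header name; not taken from the report itself.
    static final String TYPE_HEADER = "__TypeId__";

    // Models a serializer that appends type info without removing an existing entry.
    static void addTypeInfo(List<Map.Entry<String, String>> headers, String className) {
        headers.add(new SimpleEntry<>(TYPE_HEADER, className));
    }

    // Returns every value stored under the type header, in insertion order.
    static List<String> typeValues(List<Map.Entry<String, String>> headers) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> header : headers) {
            if (header.getKey().equals(TYPE_HEADER)) {
                values.add(header.getValue());
            }
        }
        return values;
    }

    // Headers as they would look once the record has passed through the topology.
    static List<String> headersAfterForwarding() {
        List<Map.Entry<String, String>> headers = new ArrayList<>();
        addTypeInfo(headers, "ChannelConfigNew"); // already present on the source record
        addTypeInfo(headers, "ChannelConfig");    // appended by the output serializer
        return typeValues(headers);
    }

    public static void main(String[] args) {
        System.out.println(headersAfterForwarding()); // prints [ChannelConfigNew, ChannelConfig]
    }
}
{code}
Under these assumptions the outgoing record carries two type entries, with the stale input type first.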
 

On the other side we have a GlobalKTable whose processor retrieves the first 
javaType from the headers (ChannelConfigNew, the wrong one) and tries to 
deserialize the data using this type:

 
{code:java}
// org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
    ...
    final ConsumerRecord<Object, Object> deserialized =
        sourceNodeAndDeserializer.deserialize(processorContext, record);
    ...
}
{code}
 

 

sourceNodeAndDeserializer.deserialize() in turn calls
{code:java}
// org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize() (abridged)
// The value is deserialized with the raw record's headers:
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value())

// which eventually reaches
// org.springframework.kafka.support.serializer.JsonDeserializer
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    if (data == null) {
        return null;
    }
    // Type information from the headers takes precedence over the configured target type.
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null,
                "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                    "] from topic [" + topic + "]", e);
        }
    }
}
 
{code}
 

The call JavaType javaType = this.typeMapper.toJavaType(headers) extracts the 
first javaType header, which is not the one that should be used to deserialize 
the object (it gets ChannelConfigNew rather than ChannelConfig). As a result the 
object is not deserialized properly: all fields are null.
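
The effect of the lookup order can be illustrated with a small, library-free sketch. It assumes two type-header values are present in insertion order and compares a first-match lookup (what we observe) with a last-match lookup; it also shows replacing the header instead of appending to it as one conceivable remedy. The class TypeHeaderLookupSketch and all of its methods are hypothetical and are not Kafka or Spring Kafka APIs.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: only the values of the type header, in insertion order.
class TypeHeaderLookupSketch {

    // Type-header values as assumed to be present on the forwarded record.
    static List<String> typeHeaders() {
        List<String> values = new ArrayList<>();
        values.add("ChannelConfigNew"); // stale input type
        values.add("ChannelConfig");    // actual type of the serialized payload
        return values;
    }

    // First-match lookup: picks the oldest entry (the observed behaviour).
    static String firstMatch(List<String> values) {
        return values.isEmpty() ? null : values.get(0);
    }

    // Last-match lookup: picks the most recently added entry.
    static String lastMatch(List<String> values) {
        return values.isEmpty() ? null : values.get(values.size() - 1);
    }

    // Possible remedy: drop any existing type entries before adding the new one.
    static List<String> replaceTypeHeader(List<String> values, String className) {
        List<String> replaced = new ArrayList<>();
        replaced.add(className);
        return replaced;
    }

    public static void main(String[] args) {
        List<String> headers = typeHeaders();
        System.out.println(firstMatch(headers)); // prints ChannelConfigNew
        System.out.println(lastMatch(headers));  // prints ChannelConfig
        System.out.println(firstMatch(replaceTypeHeader(headers, "ChannelConfig"))); // prints ChannelConfig
    }
}
{code}
With a first-match lookup the reader resolves the stale input type. Either reading the last entry or replacing the header on write would yield ChannelConfig in this model; which side should change is left open here.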


--
This message was sent by Atlassian Jira
(v8.3.2#803003)
