[ 
https://issues.apache.org/jira/browse/KAFKA-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad Olevsky resolved KAFKA-8879.
---------------------------------
    Resolution: Invalid

> GlobalStateUpdateTask uses wrong javaType to deserialize value
> --------------------------------------------------------------
>
>                 Key: KAFKA-8879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8879
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Vlad Olevsky
>            Priority: Major
>
> We read messages from an input topic, transform them
> (ChannelConfigNew -> ChannelConfig), and send the result to another topic:
>  
> {code:java}
> builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
>         Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
>     .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig,
>         prometheusCounter, channelConfigPostDataHelper))
>     .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
>         Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));
> {code}
> where ChannelConfigProcessor is (only the essential parts are shown):
>  
> {code:java}
> public class ChannelConfigProcessor
>         implements Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {
>     @Override
>     public KeyValue<String, ChannelConfig> transform(String ccid,
>                                                      ChannelConfigNew channelConfigNew) {
>         return new KeyValue<>(ccid, convert(channelConfigNew));
>     }
>
>     private ChannelConfig convert(ChannelConfigNew channelConfigNew) {
>         ...
>     }
> }
>  
> {code}
> Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes end up
> in the headers of the message that is sent to the other topic. The input
> javaType (ChannelConfigNew) is already present in the headers when
> serialization is called in
> {code:java}
> // org.apache.kafka.streams.processor.internals.RecordCollectorImpl
> send(final String topic,
>      final K key,
>      final V value,
>      final Headers headers,
>      final Integer partition,
>      final Long timestamp,
>      final Serializer<K> keySerializer,
>      final Serializer<V> valueSerializer) {
>     ...
>     final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
>     ...
> }
>  
> {code}
> The output javaType (ChannelConfig) is added to the headers inside the
> valueSerializer.serialize() method of
> org.springframework.kafka.support.serializer.JsonSerializer:
>  
> {code:java}
> @Override
> public byte[] serialize(String topic, Headers headers, T data) {
>     if (data == null) {
>         return null;
>     }
>     if (this.addTypeInfo && headers != null) {
>         this.typeMapper.fromJavaType(
>             this.objectMapper.constructType(data.getClass()), headers);
>     }
>     return serialize(topic, data);
> }
>  
> {code}
>  
> On the other side we have a GlobalKTable whose processor retrieves the first
> javaType from the headers (ChannelConfigNew, the wrong one) and tries to
> deserialize the data using this type:
>  
> {code:java}
> // org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
> public void update(final ConsumerRecord<byte[], byte[]> record) {
>     ...
>     final ConsumerRecord<Object, Object> deserialized =
>         sourceNodeAndDeserializer.deserialize(processorContext, record);
>     ...
> }
> {code}
>  
>  
> sourceNodeAndDeserializer.deserialize() calls:
> {code:java}
> // org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
> {
>     ... sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(),
>         rawRecord.value()), rawRecord.headers());
> }
>
> // which eventually reaches
> // org.springframework.kafka.support.serializer.JsonDeserializer
> deserialize(String topic, Headers headers, byte[] data) {
>     if (data == null) {
>         return null;
>     }
>     JavaType javaType = this.typeMapper.toJavaType(headers);
>     if (javaType == null) {
>         Assert.state(this.targetType != null,
>             "No type information in headers and no default type provided");
>         return deserialize(topic, data);
>     }
>     else {
>         try {
>             return this.objectMapper.readerFor(javaType).readValue(data);
>         }
>         catch (IOException e) {
>             throw new SerializationException("Can't deserialize data ["
>                 + Arrays.toString(data) + "] from topic [" + topic + "]", e);
>         }
>     }
> }
>  
> {code}
>  
> *JavaType javaType = this.typeMapper.toJavaType(headers)* extracts the first
> javaType, which is not the one that should be used to deserialize the object
> (it gets ChannelConfigNew rather than ChannelConfig). As a result the object
> is not deserialized properly: all fields are null.
>  
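
The header behaviour described in the report can be sketched without Kafka on
the classpath. The class below is a simplified, hypothetical model: an ordered
list that allows duplicate keys stands in for the record headers, and the
names TypeHeaderDemo, Header, firstValue, lastValue and headersAfterTransform
are illustrative only and belong to neither Kafka nor spring-kafka. The
"__TypeId__" key is assumed to be the type-id header name used by the JSON
type mapper. Whether a real type mapper returns the first or the last match
depends on the library version; the report observes first-match behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for record headers: an ordered list that permits
// duplicate keys. This is a model only, not the Kafka Headers API.
final class TypeHeaderDemo {

    // Assumed name of the type-id header; treat it as an illustration.
    static final String TYPE_ID = "__TypeId__";

    // One header entry; the value is a String here for brevity.
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Returns the value of the first header with the given key, or null.
    static String firstValue(List<Header> headers, String key) {
        for (Header h : headers) {
            if (h.key.equals(key)) {
                return h.value;
            }
        }
        return null;
    }

    // Returns the value of the last header with the given key, or null.
    static String lastValue(List<Header> headers, String key) {
        String found = null;
        for (Header h : headers) {
            if (h.key.equals(key)) {
                found = h.value;
            }
        }
        return found;
    }

    // Builds the header list as the report describes it after the transform.
    static List<Header> headersAfterTransform() {
        List<Header> headers = new ArrayList<>();
        // Carried over from the input record.
        headers.add(new Header(TYPE_ID, "ChannelConfigNew"));
        // Appended (not replaced) when the output value is serialized.
        headers.add(new Header(TYPE_ID, "ChannelConfig"));
        return headers;
    }

    public static void main(String[] args) {
        List<Header> headers = headersAfterTransform();
        System.out.println("first: " + firstValue(headers, TYPE_ID)); // first: ChannelConfigNew
        System.out.println("last:  " + lastValue(headers, TYPE_ID));  // last:  ChannelConfig
    }
}
```

In this model a first-match lookup yields the stale input type, while a
last-match lookup (or removing the old header before appending the new one)
yields the type that was actually serialized.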



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
