[ https://issues.apache.org/jira/browse/KAFKA-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vlad Olevsky updated KAFKA-8879:
--------------------------------
    Description: 
We read messages from an input topic, transform them 
(ChannelConfigNew -> ChannelConfig), and send them to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
        Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
    .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig,
        prometheusCounter, channelConfigPostDataHelper))
    .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
        Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));
{code}
where ChannelConfigProcessor is (only the essential parts are shown):

 
{code:java}
public class ChannelConfigProcessor
        implements Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {

    public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew channelConfigNew) {
        return new KeyValue<>(ccid, convert(channelConfigNew));
    }

    private ChannelConfig convert(ChannelConfigNew channelConfigNew) {
        ...
    }
}
{code}
Both the input (ChannelConfigNew) and the output (ChannelConfig) javaTypes are 
stored in the headers of the message that is sent to the other topic. The input 
javaType (ChannelConfigNew) is already present in the headers when serialization 
is called in
{code:java}
org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final String topic,
        final K key,
        final V value,
        final Headers headers,
        final Integer partition,
        final Long timestamp,
        final Serializer<K> keySerializer,
        final Serializer<V> valueSerializer) {
    final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
{code}
The output javaType (ChannelConfig) is added to the headers inside the 
valueSerializer.serialize() method of 
org.springframework.kafka.support.serializer.JsonSerializer:

 
{code:java}
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data == null) {
        return null;
    }
    if (this.addTypeInfo && headers != null) {
        this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers);
    }
    return serialize(topic, data);
}
{code}
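
To make the header accumulation described above concrete, here is a minimal, self-contained sketch. It is a simplified model only: the `Header` class, the `addTypeInfo` and `valuesFor` helpers, and the `__TypeId__` key are stand-ins chosen for illustration, not the real Kafka or Spring Kafka classes. It assumes the type information is appended to the existing headers without removing older entries, which is the behaviour this report describes.

```java
import java.util.ArrayList;
import java.util.List;

final class TypeHeaderAppendSketch {
    // Assumed header key, used here for illustration only.
    static final String TYPE_KEY = "__TypeId__";

    /** Minimal stand-in for a record header (not Kafka's Header interface). */
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Models a serializer that appends type info and leaves older entries in place. */
    static void addTypeInfo(List<Header> headers, String typeName) {
        headers.add(new Header(TYPE_KEY, typeName));
    }

    /** Returns every value stored under the given key, in insertion order. */
    static List<String> valuesFor(List<Header> headers, String key) {
        List<String> values = new ArrayList<>();
        for (Header h : headers) {
            if (h.key.equals(key)) {
                values.add(h.value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        addTypeInfo(headers, "ChannelConfigNew"); // carried over from the input record
        addTypeInfo(headers, "ChannelConfig");    // appended when the output is serialized
        System.out.println(valuesFor(headers, TYPE_KEY)); // prints [ChannelConfigNew, ChannelConfig]
    }
}
```

Under these assumptions the outgoing record carries two type entries under the same key, with the stale input type first.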
 

On the other side we have a GlobalKTable with a processor that retrieves the 
first javaType from the headers (ChannelConfigNew, the wrong one) and tries to 
deserialize the data using this type:

 
{code:java}
// org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
    ...
    final ConsumerRecord<Object, Object> deserialized =
        sourceNodeAndDeserializer.deserialize(processorContext, record);
    ...
}
{code}
 

 

sourceNodeAndDeserializer.deserialize calls
{code:java}
org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
{
    sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers());
}

// eventually reaches
org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String topic, Headers headers, byte[] data) {
    if (data == null) {
        return null;
    }
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null, "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                    "] from topic [" + topic + "]", e);
        }
    }
}
{code}
 

*JavaType javaType = this.typeMapper.toJavaType(headers)* extracts the first 
javaType, which is not the one that should be used to deserialize the object (it 
gets ChannelConfigNew rather than ChannelConfig). As a result the object is not 
deserialized properly: all fields are null.
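
The lookup problem can be illustrated with a small stand-alone sketch. Again this is a simplified model, not the real Kafka or Spring Kafka code: `Header`, `firstValue`, `lastValue`, `replaceTypeInfo`, and the `__TypeId__` key are hypothetical names used only for illustration. It shows that a "first match wins" lookup returns the stale input type, while a "last match wins" lookup, or replacing the entry instead of appending to it, would yield the output type.

```java
import java.util.ArrayList;
import java.util.List;

final class TypeHeaderLookupSketch {
    // Assumed header key, used here for illustration only.
    static final String TYPE_KEY = "__TypeId__";

    /** Minimal stand-in for a record header (not Kafka's Header interface). */
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** First match wins: models the lookup behaviour described in this report. */
    static String firstValue(List<Header> headers, String key) {
        for (Header h : headers) {
            if (h.key.equals(key)) {
                return h.value;
            }
        }
        return null;
    }

    /** Last match wins: returns the most recently added value for the key. */
    static String lastValue(List<Header> headers, String key) {
        String found = null;
        for (Header h : headers) {
            if (h.key.equals(key)) {
                found = h.value;
            }
        }
        return found;
    }

    /** Possible workaround in this model: drop stale type entries before adding the new one. */
    static void replaceTypeInfo(List<Header> headers, String typeName) {
        headers.removeIf(h -> h.key.equals(TYPE_KEY));
        headers.add(new Header(TYPE_KEY, typeName));
    }

    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        headers.add(new Header(TYPE_KEY, "ChannelConfigNew")); // input type, already present
        headers.add(new Header(TYPE_KEY, "ChannelConfig"));    // output type, appended later

        System.out.println(firstValue(headers, TYPE_KEY)); // prints ChannelConfigNew
        System.out.println(lastValue(headers, TYPE_KEY));  // prints ChannelConfig

        replaceTypeInfo(headers, "ChannelConfig");
        System.out.println(firstValue(headers, TYPE_KEY)); // prints ChannelConfig
    }
}
```

In a real topology, the analogous workaround might be to remove the stale type header before the record is forwarded, or to configure the deserializer to ignore type headers; whether either is appropriate depends on the library versions in use.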

 

 

 

 

 

  was:
We read messages from input topic, transform messages 
(ChannelConfigNew->ChannelConfig) and send it to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
 Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
 .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig, 
prometheusCounter, channelConfigPostDataHelper))
 
.to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
 Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));

{code}
where ChannelConfigProcessor (only essential part is shown)

 
{code:java}
public class ChannelConfigProcessor implements Transformer<String, 
ChannelConfigNew, KeyValue<String, ChannelConfig>> {
 public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew 
channelConfigNew) {
 return new KeyValue<>(ccid, convert(channelConfigNew));
 }
 
 private ChannelConfig convert(ChannelConfigNew channelConfigNew){
 ...
 }

}
 
{code}
Both input (ChannelConfigNew ) and output(ChannelConfig) javaTypes are stored 
in the headers of the message sent to another topic. Input type already 
presented in headers when serialization is called in
{code:java}
org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final 
String topic,
 final K key,
 final V value,
 final Headers headers,
 final Integer partition,
 final Long timestamp,
 final Serializer<K> keySerializer,
 final Serializer<V> valueSerializer){
   final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
 
{code}
output type is added to headers inside valueSerializer.serialize() method:

org.springframework.kafka.support.serializer.JsonSerializer

 
{code:java}
@Override
 public byte[] serialize(String topic, Headers headers, T data) {
 if (data == null) {
 return null;
 }
 if (this.addTypeInfo && headers != null) {
 this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), 
headers);
 }
 return serialize(topic, data);
 }
 
{code}
 

On other side we have GlobalKTable with processor that retrieves first 
(ChannelConfigNew - wrong one) javaType from header  and try to deserialize  
data using this type:

 
{code:java}
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
...
 final ConsumerRecord<Object, Object> deserialized = 
sourceNodeAndDeserializer.deserialize(processorContext, record);
..
}
{code}
 

 

sourceNodeAndDeserializer.deserialize calls
{code:java}
org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
{
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), 
rawRecord.value()), rawRecord.headers());
}
 
eventually reaches
org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String
 topic, Headers headers, byte[] data){
 if (data == null) {
 return null;
 }
 JavaType javaType = this.typeMapper.toJavaType(headers);
 if (javaType == null) {
 Assert.state(this.targetType != null, "No type information in headers and no 
default type provided");
 return deserialize(topic, data);
 }
 else {
 try {
 return this.objectMapper.readerFor(javaType).readValue(data);
 }
 catch (IOException e) {
 throw new SerializationException("Can't deserialize data [" + 
Arrays.toString(data) +
 "] from topic [" + topic + "]", e);
 }
 }
 }
 
{code}
 

JavaType javaType = this.typeMapper.toJavaType(headers) calls extract first 
javaType -which is not one that should be used to deserialize the object (it 
gets ChannelConfigNew  rather than ChannelConfig). As result the object is not 
retrieved properly - all fields are null.

 

 

 

 

 


> GlobalStateUpdateTask uses wrong javaType to deserialize value
> --------------------------------------------------------------
>
>                 Key: KAFKA-8879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8879
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Vlad Olevsky
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
