[ 
https://issues.apache.org/jira/browse/KAFKA-20173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uladzislau Blok updated KAFKA-20173:
------------------------------------
    Description: 
Example of expected change:

{{SessionWindowedDeserializer#deserialize(final String topic, final byte[] 
data)}}

This is a wrapper that uses an underlying deserializer: 
{{SessionKeySchema.from}} -> {{extractKey}} -> {{deserializer.deserialize}}

The idea of this ticket is to find all calls to serde methods (serialize / 
deserialize) and propagate headers (or create them from bytes if needed).
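
A minimal sketch of the kind of change described above. The types below are simplified stand-ins written for illustration, not the real Kafka classes: {{Headers}}, {{TenantAwareString}}, {{SuffixStrippingDeserializer}}, the {{tenant}} header and the 16-byte suffix are all assumptions. The only detail taken from Kafka itself is the shape of {{Deserializer}}: a header-less {{deserialize(topic, data)}} plus a header-aware overload whose default implementation falls back to the header-less one. Because of that default, a wrapper that does not override the header-aware overload silently drops headers before they reach the inner deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    // Hypothetical stand-in for record headers: a plain name -> value map.
    static final class Headers {
        private final Map<String, byte[]> values = new HashMap<>();

        Headers add(String key, byte[] value) {
            values.put(key, value);
            return this;
        }

        byte[] get(String key) {
            return values.get(key);
        }
    }

    // Simplified stand-in with the same two overloads as a deserializer:
    // the header-aware overload defaults to the header-less one.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        default T deserialize(String topic, Headers headers, byte[] data) {
            return deserialize(topic, data);
        }
    }

    // Hypothetical inner deserializer whose result depends on a "tenant" header.
    static final class TenantAwareString implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, Headers headers, byte[] data) {
            String value = deserialize(topic, data);
            byte[] tenant = headers.get("tenant");
            return tenant == null ? value : new String(tenant, StandardCharsets.UTF_8) + ":" + value;
        }
    }

    // Hypothetical wrapper: strips a fixed-size suffix, then delegates to the inner deserializer.
    static final class SuffixStrippingDeserializer<T> implements Deserializer<T> {
        static final int SUFFIX_BYTES = 16; // illustrative size only

        private final Deserializer<T> inner;

        SuffixStrippingDeserializer(Deserializer<T> inner) {
            this.inner = inner;
        }

        private static byte[] extractKey(byte[] data) {
            return Arrays.copyOfRange(data, 0, data.length - SUFFIX_BYTES);
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            // No headers are available on this path, so there is nothing to propagate.
            return inner.deserialize(topic, extractKey(data));
        }

        @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            // The expected change: pass the headers through to the inner deserializer.
            return inner.deserialize(topic, headers, extractKey(data));
        }
    }

    static String demo(boolean withHeaders) {
        Deserializer<String> wrapped = new SuffixStrippingDeserializer<>(new TenantAwareString());
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        // Zero-padded suffix stands in for the extra bytes the wrapper removes.
        byte[] data = Arrays.copyOf(key, key.length + SuffixStrippingDeserializer.SUFFIX_BYTES);
        if (!withHeaders) {
            return wrapped.deserialize("topic", data);
        }
        Headers headers = new Headers().add("tenant", "acme".getBytes(StandardCharsets.UTF_8));
        return wrapped.deserialize("topic", headers, data);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints "k1"
        System.out.println(demo(true));  // prints "acme:k1"
    }
}
```

If the second override in the wrapper is removed, {{demo(true)}} would return {{k1}} as well, because the call would fall through the default method and the inner deserializer would never see the headers.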

  was:
List of all checked calls for deserialization (search by pattern {{ 
deserialize( }} ):
 * {{SessionWindowedDeserializer#deserialize(final String topic, final byte[] 
data)}} - there are no headers, nothing to propagate;
 * {{TimeWindowedDeserializer#deserialize(final String topic, final byte[] 
data)}} - there are no headers, nothing to propagate;
 * {{ChangedDeserializer#deserialize(final String topic, final Headers headers, 
final byte[] data)}} - makes use of headers.
 * {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]> 
serialChange)}} - no headers, nothing to propagate
 * {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to 
propagate
 * {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final 
byte[] data)}} - no headers, nothing to propagate
 * {{SubscriptionWrapperSerde#deserialize(final String ignored, final byte[] 
data)}} - no headers, nothing to propagate
 * {{ChangelogRecordDeserializationHelper#applyChecksAndUpdatePosition(final 
ConsumerRecord<byte[], byte[]> record, final boolean consistencyEnabled, final 
Position position)}} - makes use of headers.
 * {{GlobalStateManagerImpl#reprocessState(... many params)}} - does not 
propagate headers to the deserializer. *Is this a case to be fixed?* 
{{reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key())}}
 * {{GlobalStateUpdateTask#update(final ConsumerRecord<byte[], byte[]> 
record)}} - uses a different type of deserializer 
({{RecordDeserializer}}). From my understanding, we don't need headers here.
 * {{ProcessorMetadata#deserialize(final byte[] metaDataBytes)}} - no headers, 
nothing to propagate
 * {{ProcessorRecordContext#deserialize(final ByteBuffer buffer)}} - different 
type of deserializer, not related
 * {{RecordDeserializer#deserialize(final ProcessorContext<?, ?> 
processorContext, final ConsumerRecord<byte[], byte[]> rawRecord)}} - makes use 
of headers
 * {{RecordQueue#updateHead}} - uses a different type of deserializer
 * {{SourceNode#deserializeKey}} - makes use of headers
 * {{SourceNode#deserializeValue}} - makes use of headers
 * {{TopicPartitionMetadata#decode(final String encryptedString)}} - uses 
a different type of deserializer
 * {{StateSerdes#keyFrom(final byte[] rawKey, final Headers headers)}} - makes 
use of headers
 * {{StateSerdes#valueFrom(final byte[] rawValue, final Headers headers)}} - 
makes use of headers
 * {{BufferValue#deserialize(final ByteBuffer buffer)}} - different type of 
deserializer
 * {{ContextualRecord#deserialize(final ByteBuffer buffer)}} - different type 
of deserializer
 * HeadersDeserializer - the header deserializer itself
 * {{InMemoryTimeOrderedKeyValueChangeBuffer#evictWhile(final 
Supplier<Boolean> predicate, final Consumer<Eviction<K, Change<V>>> 
callback)}} - doesn't use headers, although they can be accessed from the 
context. *Is this a case to be fixed?*
 * {{InMemoryTimeOrderedKeyValueChangeBuffer#priorValueForBuffered(final 
K key)}} - doesn't use headers, although they can be accessed from the 
context. *Is this a case to be fixed?*


> Revisit KS serde code to check if headers are passed correctly
> --------------------------------------------------------------
>
>                 Key: KAFKA-20173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20173
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: TengYao Chi
>            Assignee: Uladzislau Blok
>            Priority: Major
>             Fix For: 4.3.0
>
>
> Example of expected change:
> {{SessionWindowedDeserializer#deserialize(final String topic, final byte[] 
> data)}}
> This is a wrapper that uses an underlying deserializer: 
> {{SessionKeySchema.from}} -> {{extractKey}} -> {{deserializer.deserialize}}
> The idea of this ticket is to find all calls to serde methods (serialize / 
> deserialize) and propagate headers (or create them from bytes if needed).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
