[
https://issues.apache.org/jira/browse/KAFKA-20173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Uladzislau Blok updated KAFKA-20173:
------------------------------------
Description:
Example of expected change:
{{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
data)}}
This is a wrapper that uses an underlying deserializer:
{{SessionKeySchema.from}} -> {{extractKey}} -> {{deserializer.deserialize}}
The idea of this ticket is to find all calls to serde methods (serialize /
deserialize) and propagate headers (or create them from bytes if needed).
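A minimal sketch of the kind of change meant here, not the actual Kafka Streams code. The {{Headers}} and {{Deserializer}} types below are simplified, hypothetical stand-ins for the Kafka client types (only the two {{deserialize}} overloads are mirrored) so that the example compiles on its own. {{TaggingDeserializer}} and {{WrappingDeserializer}} are made-up names for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    // Hypothetical, simplified stand-in for Kafka's Headers (assumption, not the real API).
    static final class Headers {
        private final Map<String, byte[]> values = new LinkedHashMap<>();

        Headers add(final String key, final byte[] value) {
            values.put(key, value);
            return this;
        }

        byte[] lastHeader(final String key) {
            return values.get(key);
        }
    }

    // Stand-in mirroring the two deserialize overloads: the headers-aware
    // overload falls back to the two-argument one unless it is overridden.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        default T deserialize(final String topic, final Headers headers, final byte[] data) {
            return deserialize(topic, data);
        }
    }

    // Hypothetical inner deserializer whose result depends on a "tag" header.
    static final class TaggingDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(final String topic, final byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(final String topic, final Headers headers, final byte[] data) {
            final String value = deserialize(topic, data);
            final byte[] tag = headers == null ? null : headers.lastHeader("tag");
            return tag == null ? value : new String(tag, StandardCharsets.UTF_8) + ":" + value;
        }
    }

    // Wrapper that forwards headers to the inner deserializer. Without the
    // second override, the default method would silently drop the headers.
    static final class WrappingDeserializer<T> implements Deserializer<T> {
        private final Deserializer<T> inner;

        WrappingDeserializer(final Deserializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(final String topic, final byte[] data) {
            return inner.deserialize(topic, data);
        }

        @Override
        public T deserialize(final String topic, final Headers headers, final byte[] data) {
            return inner.deserialize(topic, headers, data);
        }
    }

    public static void main(final String[] args) {
        final Headers headers = new Headers().add("tag", "x".getBytes(StandardCharsets.UTF_8));
        final Deserializer<String> wrapped = new WrappingDeserializer<>(new TaggingDeserializer());
        final byte[] data = "v".getBytes(StandardCharsets.UTF_8);

        // With headers: the inner deserializer sees the "tag" header.
        System.out.println(wrapped.deserialize("topic", headers, data));
        // Without headers: only the raw value is returned.
        System.out.println(wrapped.deserialize("topic", data));
    }
}
```

The wrapper overrides both overloads, so a caller that has headers available can pass them through to the inner deserializer.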
was:
List of all checked calls for deserialization (search by pattern {{
deserialize( }} ):
* {{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{TimeWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{ChangedDeserializer#deserialize(final String topic, final Headers headers,
final byte[] data)}} - make use of headers.
* {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]>
serialChange)}} - no headers, nothing to propagate
* {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to
propagate
* {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final
byte[] data)}} - no headers, nothing to propagate
* {{SubscriptionWrapperSerde#deserialize(final String ignored, final byte[]
data)}} - no headers, nothing to propagate
* {{ChangelogRecordDeserializationHelper#applyChecksAndUpdatePosition(final
ConsumerRecord<byte[], byte[]> record, final boolean consistencyEnabled, final
Position position)}} - make use of headers.
* {{GlobalStateManagerImpl#reprocessState(... many params)}} - does not
propagate headers to the deserializer. {*}Is this a case to be fixed{*}?
{{reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key())}}
* {{GlobalStateUpdateTask#update(final ConsumerRecord<byte[], byte[]>
record)}} - it uses a different type of deserializer
({{RecordDeserializer}}). From my understanding we don't need headers here
* {{ProcessorMetadata#deserialize(final byte[] metaDataBytes)}} - no headers,
nothing to propagate
* {{ProcessorRecordContext#deserialize(final ByteBuffer buffer)}} - different
type of deserializer. Not related
* {{RecordDeserializer#deserialize(final ProcessorContext<?, ?>
processorContext,final ConsumerRecord<byte[], byte[]> rawRecord)}} - make use
of headers.
* {{RecordQueue#updateHead}} - uses different type of deserializer
* {{SourceNode#deserializeKey}} - make use of headers
* {{SourceNode#deserializeValue}} - make use of headers
* {{TopicPartitionMetadata#decode(final String encryptedString)}} - uses
different type of deserializer
* {{StateSerdes#keyFrom(final byte[] rawKey, final Headers headers)}} - make
use of headers
* {{StateSerdes#valueFrom(final byte[] rawValue, final Headers headers)}} -
make use of headers
* {{BufferValue#deserialize(final ByteBuffer buffer)}} - different type of
deserializer
* {{ContextualRecord#deserialize(final ByteBuffer buffer)}} - different type
of deserializer
* HeadersDeserializer... header deserializer itself
* {{InMemoryTimeOrderedKeyValueChangeBuffer#evictWhile(final
Supplier<Boolean> predicate, final Consumer<Eviction<K, Change<V>>>
callback)}} - doesn't use headers, although they can be accessed from the
context. *Is this a case to be fixed?*
* {{InMemoryTimeOrderedKeyValueChangeBuffer#priorValueForBuffered(final
K key)}} - doesn't use headers, although they can be accessed from the
context. *Is this a case to be fixed?*
> Revisit KS serde code to check if headers are passed correctly
> --------------------------------------------------------------
>
> Key: KAFKA-20173
> URL: https://issues.apache.org/jira/browse/KAFKA-20173
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: TengYao Chi
> Assignee: Uladzislau Blok
> Priority: Major
> Fix For: 4.3.0
>
>
> Example of expected change:
> {{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
> data)}}
> This is a wrapper that uses an underlying deserializer:
> {{SessionKeySchema.from}} -> {{extractKey}} -> {{deserializer.deserialize}}
> The idea of this ticket is to find all calls to serde methods (serialize /
> deserialize) and propagate headers (or create them from bytes if needed).