mjsax commented on code in PR #21756:
URL: https://github.com/apache/kafka/pull/21756#discussion_r2936122432


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java:
##########


Review Comment:
   I think we need to pass in a Headers parameter and use for both old and new 
value.
   
   The caller should make a decision what to pass. If <T> == 
HeaderValueTimestamp whatever header get's passed, would be ignored by 
innerSerializer which would be HeaderValueTimestampSserializer which take the 
header from data.oldValue / data.newValue which are both of type 
`HeaderValueTimestamp`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -16,21 +16,32 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
 import java.util.Objects;
 
 public class Change<T> {
 
     public final T newValue;
+    public final Headers newHeaders;
     public final T oldValue;
+    public final Headers oldHeaders;

Review Comment:
   I don't think we need to add `newHeader` or `oldHeader`. Cf  discussion on 
https://issues.apache.org/jira/browse/KAFKA-20311



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java:
##########


Review Comment:
   I think we need to pass in a `Headers` parameter and use for both old and 
new value.
   
   The caller should make a decision what to pass. For the header-case (ie, if 
the serialized `byte[]` arrays hold a ValueTimestampeHeader object) whatever 
header get's passed, would be ignored by `innerDeserializer` which would be 
`HeaderValueTimestampDeserializer` which take the header from the `byte[]` 
array.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to