mjsax commented on code in PR #21411:
URL: https://github.com/apache/kafka/pull/21411#discussion_r2776294179


##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
      * @param rawKey  the key as raw bytes
      * @return        the key as typed object

Review Comment:
   Add:
   ```
   @deprecated Since 4.3. Use {@link #keyFrom(byte[], Headers)} instead.
   ```
   Similar elsewhere.



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());

Review Comment:
   Should we mock the key serde and verify that the provided headers are passed 
in correctly?
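
A rough sketch of the kind of check meant here. It is written against hypothetical stand-in types (`HeaderAwareSerializer`, `SerdesUnderTest`, `RecordingSerializer`, and a plain `Map` in place of `Headers`) so that it runs on its own; none of these names exist in Kafka, and the real test would presumably use a mocked `Serde` with `StateSerdes` instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HeaderForwardingSketch {

    // Hypothetical stand-in for a header-aware serializer; NOT Kafka's Serializer interface.
    interface HeaderAwareSerializer<T> {
        byte[] serialize(String topic, Map<String, byte[]> headers, T data);
    }

    // Hypothetical stand-in for the class under test: it only forwards its arguments.
    static final class SerdesUnderTest<K> {
        private final String topic;
        private final HeaderAwareSerializer<K> keySerializer;

        SerdesUnderTest(final String topic, final HeaderAwareSerializer<K> keySerializer) {
            this.topic = topic;
            this.keySerializer = keySerializer;
        }

        byte[] rawKey(final K key, final Map<String, byte[]> headers) {
            return keySerializer.serialize(topic, headers, key);
        }
    }

    // Hand-rolled test double that records the headers it receives.
    static final class RecordingSerializer implements HeaderAwareSerializer<String> {
        final List<Map<String, byte[]>> seenHeaders = new ArrayList<>();

        @Override
        public byte[] serialize(final String topic, final Map<String, byte[]> headers, final String data) {
            seenHeaders.add(headers);
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Returns true if the exact headers instance passed to rawKey reached the serializer once.
    public static boolean headersWereForwarded() {
        final RecordingSerializer recorder = new RecordingSerializer();
        final SerdesUnderTest<String> serdes = new SerdesUnderTest<>("test-topic", recorder);
        final Map<String, byte[]> headers =
            Map.of("header-key", "header-value".getBytes(StandardCharsets.UTF_8));

        serdes.rawKey("test-key", headers);

        return recorder.seenHeaders.size() == 1 && recorder.seenHeaders.get(0) == headers;
    }

    public static void main(final String[] args) {
        System.out.println("headers forwarded: " + headersWereForwarded());
    }
}
```

The point is that a round-trip through a string serde passes whether or not the headers are forwarded, while a recording or mocked serde fails if they are dropped.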



##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -213,4 +260,34 @@ public byte[] rawValue(final V value) {
                     e);
         }
     }
+
+    /**
+     * Serialize the given value.
+     *
+     * @param value  the value to be serialized
+     * @return       the serialized value
+     */
+    @SuppressWarnings("rawtypes")
+    public byte[] rawValue(final V value, final Headers headers) {

Review Comment:
   Same question.



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());

Review Comment:
   Same question for value case.



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key);
+
+        final String deserializedWithNull = stateSerdes.keyFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.keyFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value);
+
+        final String deserializedWithNull = stateSerdes.valueFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.valueFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithEmptyHeaders() {

Review Comment:
   This case should be covered implicitly via the existing test for 
`rawKey(key)` (if we let `rawKey(key)` call `rawKey(key, new RecordHeaders())`), 
so this test might be redundant?



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {

Review Comment:
   same



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {

Review Comment:
   Same question for value case



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {

Review Comment:
   While we handle the `null` case, it's more of a guard. In the end, we 
should never call `rawKey` with `null` headers in prod code, so I'm wondering 
if we need this test?



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key);
+
+        final String deserializedWithNull = stateSerdes.keyFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.keyFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeValueWithNullHeaders() {

Review Comment:
   same



##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
      * @param rawKey  the key as raw bytes
      * @return        the key as typed object
      */
+    @Deprecated
     public K keyFrom(final byte[] rawKey) {
         return keySerde.deserializer().deserialize(topic, rawKey);
     }
 
+    /**
+     * Deserialize the key from raw bytes.
+     *
+     * @param rawKey  the key as raw bytes
+     * @return        the key as typed object
+     */
+    public K keyFrom(final byte[] rawKey, final Headers headers) {
+        return keySerde.deserializer().deserialize(topic, headers, rawKey);
+    }
+
     /**
      * Deserialize the value from raw bytes.
      *
      * @param rawValue  the value as raw bytes
      * @return          the value as typed object
      */
+    @Deprecated
     public V valueFrom(final byte[] rawValue) {
         return valueSerde.deserializer().deserialize(topic, rawValue);
     }
 
+    /**
+     * Deserialize the value from raw bytes.
+     *
+     * @param rawValue  the value as raw bytes
+     * @return          the value as typed object
+     */
+    public V valueFrom(final byte[] rawValue, Headers headers) {

Review Comment:
   Missing `final`



##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -184,12 +209,34 @@ public byte[] rawKey(final K key) {
         }
     }
 
+    /**
+     * Serialize the given key.
+     *
+     * @param key  the key to be serialized
+     * @return     the serialized key
+     */
+    public byte[] rawKey(final K key, final Headers headers) {
+        try {

Review Comment:
   This is a lot of code duplication. Should we update the existing 
`rawKey(...)` to just call this new one, passing in a `new RecordHeaders()` 
object instead?
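
A minimal sketch of the delegation idea, using hypothetical stand-ins so it compiles on its own: `SketchSerdes` is not `StateSerdes`, a `Map` replaces `Headers`, a `BiFunction` replaces the key serializer, and `IllegalStateException` replaces `StreamsException`. It only illustrates the shape; the error message and types are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class DelegatingOverloadSketch {

    // Hypothetical stand-in for a serde wrapper; not the actual StateSerdes code.
    static final class SketchSerdes<K> {
        private final BiFunction<K, Map<String, byte[]>, byte[]> keySerializer;

        SketchSerdes(final BiFunction<K, Map<String, byte[]>, byte[]> keySerializer) {
            this.keySerializer = keySerializer;
        }

        /** @deprecated use {@code rawKey(key, headers)} instead. */
        @Deprecated
        byte[] rawKey(final K key) {
            // The old overload delegates with empty headers, so nothing is duplicated.
            return rawKey(key, new HashMap<>());
        }

        byte[] rawKey(final K key, final Map<String, byte[]> headers) {
            try {
                return keySerializer.apply(key, headers);
            } catch (final ClassCastException e) {
                // The error handling lives in exactly one place.
                throw new IllegalStateException("serializer is not compatible with the key type", e);
            }
        }
    }

    // Returns true if both overloads produce identical bytes for the same key.
    public static boolean overloadsAgree() {
        final SketchSerdes<String> serdes =
            new SketchSerdes<>((key, headers) -> key.getBytes(StandardCharsets.UTF_8));
        final byte[] viaLegacy = serdes.rawKey("test-key");
        final byte[] viaHeaders = serdes.rawKey("test-key", new HashMap<>());
        return Arrays.equals(viaLegacy, viaHeaders);
    }

    public static void main(final String[] args) {
        System.out.println("overloads agree: " + overloadsAgree());
    }
}
```

With this shape, the existing tests for the single-argument overload also exercise the empty-headers path of the new one.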



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key);
+
+        final String deserializedWithNull = stateSerdes.keyFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.keyFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value);
+
+        final String deserializedWithNull = stateSerdes.valueFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.valueFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithEmptyHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers emptyHeaders = new RecordHeaders();
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+        assertNotNull(serialized);
+    }
+
+    @Test
+    public void shouldSerializeValueWithEmptyHeaders() {

Review Comment:
   Redundant as above?



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key);
+
+        final String deserializedWithNull = stateSerdes.keyFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.keyFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value);
+
+        final String deserializedWithNull = stateSerdes.valueFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.valueFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithEmptyHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers emptyHeaders = new RecordHeaders();
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+        assertNotNull(serialized);
+    }
+
+    @Test
+    public void shouldSerializeValueWithEmptyHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers emptyHeaders = new RecordHeaders();
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, emptyHeaders);
+
+        assertNotNull(serialized);
+    }
+
+    @Test
+    public void shouldThrowIfIncompatibleSerdeForKeyWithHeaders() throws 
ClassNotFoundException {

Review Comment:
   Same



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException
                     "Change the default Serdes in StreamConfig or provide 
correct Serdes via method parameters."));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeKeyWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, headers);
+        final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+        assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeValueWithHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers headers = new RecordHeaders()
+            .add("header-key", "header-value".getBytes());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, headers);
+        final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+        assertEquals(value, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+        final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+        assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeKeyWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key);
+
+        final String deserializedWithNull = stateSerdes.keyFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.keyFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldDeserializeValueWithNullHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value);
+
+        final String deserializedWithNull = stateSerdes.valueFrom(serialized, 
null);
+        final String deserializedWithoutHeaders = 
stateSerdes.valueFrom(serialized);
+
+        assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+    }
+
+    @Test
+    public void shouldSerializeKeyWithEmptyHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers emptyHeaders = new RecordHeaders();
+
+        final String key = "test-key";
+        final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+        assertNotNull(serialized);
+    }
+
+    @Test
+    public void shouldSerializeValueWithEmptyHeaders() {
+        final StateSerdes<String, String> stateSerdes =
+            new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+        final Headers emptyHeaders = new RecordHeaders();
+
+        final String value = "test-value";
+        final byte[] serialized = stateSerdes.rawValue(value, emptyHeaders);
+
+        assertNotNull(serialized);
+    }
+
+    @Test
+    public void shouldThrowIfIncompatibleSerdeForKeyWithHeaders() throws ClassNotFoundException {
+        final Class myClass = Class.forName("java.lang.String");
+        final StateSerdes<Object, Object> stateSerdes =
+            new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+        final Integer myInt = 123;
+        final Headers headers = new RecordHeaders().add("key", "value".getBytes());
+
+        final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawKey(myInt, headers));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+                    "is not compatible to the actual key type (key type: java.lang.Integer). " +
+                    "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
+    }
+
+    @Test
+    public void shouldThrowIfIncompatibleSerdeForValueWithHeaders() throws ClassNotFoundException {

Review Comment:
   Same



##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
      * @param rawKey  the key as raw bytes
      * @return        the key as typed object
      */
+    @Deprecated
     public K keyFrom(final byte[] rawKey) {
         return keySerde.deserializer().deserialize(topic, rawKey);
     }
 
+    /**
+     * Deserialize the key from raw bytes.
+     *
+     * @param rawKey  the key as raw bytes
+     * @return        the key as typed object
+     */
+    public K keyFrom(final byte[] rawKey, final Headers headers) {
+        return keySerde.deserializer().deserialize(topic, headers, rawKey);
+    }
+
     /**
      * Deserialize the value from raw bytes.
      *
      * @param rawValue  the value as raw bytes
      * @return          the value as typed object
      */
+    @Deprecated
     public V valueFrom(final byte[] rawValue) {
         return valueSerde.deserializer().deserialize(topic, rawValue);
     }
 
+    /**
+     * Deserialize the value from raw bytes.
+     *
+     * @param rawValue  the value as raw bytes
+     * @return          the value as typed object
+     */
+    public V valueFrom(final byte[] rawValue, Headers headers) {
+        return valueSerde.deserializer().deserialize(topic, headers, Utils.wrapNullable(rawValue));

Review Comment:
   Why do we need to add `Utils.wrapNullable`? We don't use it in the existing 
`valueFrom(...)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
