mjsax commented on code in PR #21411:
URL: https://github.com/apache/kafka/pull/21411#discussion_r2776294179
##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
* @param rawKey the key as raw bytes
* @return the key as typed object
Review Comment:
Add:
```
@deprecated Since 4.3. Use {@link #keyFrom(byte[], Headers} instead.
```
Similar elsewhere.
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
Review Comment:
Should we mock the key serde and verify that the provided headers are passed
in correctly?
##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -213,4 +260,34 @@ public byte[] rawValue(final V value) {
e);
}
}
+
+ /**
+ * Serialize the given value.
+ *
+ * @param value the value to be serialized
+ * @return the serialized value
+ */
+ @SuppressWarnings("rawtypes")
+ public byte[] rawValue(final V value, final Headers headers) {
Review Comment:
Same question.
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
Review Comment:
Same question for value case.
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key);
+
+ final String deserializedWithNull = stateSerdes.keyFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.keyFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value);
+
+ final String deserializedWithNull = stateSerdes.valueFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.valueFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithEmptyHeaders() {
Review Comment:
This case should be covered implicitly (if we let `rawKey(key)` call
`rawKey(key, new RecordHeaders())`) via the existing test for `rawKey(key)`. So
might be redundant?
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
Review Comment:
same
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
Review Comment:
Same question for value case
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
Review Comment:
While we handle the `null` case, it's more of an guard -- in the end, we
should never call `rawKey` with `null` in prod code --- so wondering if we need
this test?
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key);
+
+ final String deserializedWithNull = stateSerdes.keyFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.keyFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeValueWithNullHeaders() {
Review Comment:
same
##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
* @param rawKey the key as raw bytes
* @return the key as typed object
*/
+ @Deprecated
public K keyFrom(final byte[] rawKey) {
return keySerde.deserializer().deserialize(topic, rawKey);
}
+ /**
+ * Deserialize the key from raw bytes.
+ *
+ * @param rawKey the key as raw bytes
+ * @return the key as typed object
+ */
+ public K keyFrom(final byte[] rawKey, final Headers headers) {
+ return keySerde.deserializer().deserialize(topic, headers, rawKey);
+ }
+
/**
* Deserialize the value from raw bytes.
*
* @param rawValue the value as raw bytes
* @return the value as typed object
*/
+ @Deprecated
public V valueFrom(final byte[] rawValue) {
return valueSerde.deserializer().deserialize(topic, rawValue);
}
+ /**
+ * Deserialize the value from raw bytes.
+ *
+ * @param rawValue the value as raw bytes
+ * @return the value as typed object
+ */
+ public V valueFrom(final byte[] rawValue, Headers headers) {
Review Comment:
Missing `final`
##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -184,12 +209,34 @@ public byte[] rawKey(final K key) {
}
}
+ /**
+ * Serialize the given key.
+ *
+ * @param key the key to be serialized
+ * @return the serialized key
+ */
+ public byte[] rawKey(final K key, final Headers headers) {
+ try {
Review Comment:
This is a lot of code duplication -- should we update existing `rawKey(...)`
to just call this new one, passing in `new RecordHeaders()` object instead?
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key);
+
+ final String deserializedWithNull = stateSerdes.keyFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.keyFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value);
+
+ final String deserializedWithNull = stateSerdes.valueFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.valueFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithEmptyHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers emptyHeaders = new RecordHeaders();
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+ assertNotNull(serialized);
+ }
+
+ @Test
+ public void shouldSerializeValueWithEmptyHeaders() {
Review Comment:
Redundant as above?
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key);
+
+ final String deserializedWithNull = stateSerdes.keyFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.keyFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value);
+
+ final String deserializedWithNull = stateSerdes.valueFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.valueFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithEmptyHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers emptyHeaders = new RecordHeaders();
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+ assertNotNull(serialized);
+ }
+
+ @Test
+ public void shouldSerializeValueWithEmptyHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers emptyHeaders = new RecordHeaders();
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, emptyHeaders);
+
+ assertNotNull(serialized);
+ }
+
+ @Test
+ public void shouldThrowIfIncompatibleSerdeForKeyWithHeaders() throws
ClassNotFoundException {
Review Comment:
Same
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -137,4 +141,142 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException
"Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
}
+ @Test
+ public void shouldSerializeAndDeserializeKeyWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, headers);
+ final String deserialized = stateSerdes.keyFrom(serialized, headers);
+
+ assertEquals(key, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeValueWithHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers headers = new RecordHeaders()
+ .add("header-key", "header-value".getBytes());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, headers);
+ final String deserialized = stateSerdes.valueFrom(serialized, headers);
+
+ assertEquals(value, deserialized);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serializedWithNull = stateSerdes.rawKey(key, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawKey(key);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serializedWithNull = stateSerdes.rawValue(value, null);
+ final byte[] serializedWithoutHeaders = stateSerdes.rawValue(value);
+
+ assertArrayEquals(serializedWithoutHeaders, serializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeKeyWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key);
+
+ final String deserializedWithNull = stateSerdes.keyFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.keyFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldDeserializeValueWithNullHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value);
+
+ final String deserializedWithNull = stateSerdes.valueFrom(serialized,
null);
+ final String deserializedWithoutHeaders =
stateSerdes.valueFrom(serialized);
+
+ assertEquals(deserializedWithoutHeaders, deserializedWithNull);
+ }
+
+ @Test
+ public void shouldSerializeKeyWithEmptyHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers emptyHeaders = new RecordHeaders();
+
+ final String key = "test-key";
+ final byte[] serialized = stateSerdes.rawKey(key, emptyHeaders);
+
+ assertNotNull(serialized);
+ }
+
+ @Test
+ public void shouldSerializeValueWithEmptyHeaders() {
+ final StateSerdes<String, String> stateSerdes =
+ new StateSerdes<>("test-topic", Serdes.String(), Serdes.String());
+ final Headers emptyHeaders = new RecordHeaders();
+
+ final String value = "test-value";
+ final byte[] serialized = stateSerdes.rawValue(value, emptyHeaders);
+
+ assertNotNull(serialized);
+ }
+
+ @Test
+ public void shouldThrowIfIncompatibleSerdeForKeyWithHeaders() throws
ClassNotFoundException {
+ final Class myClass = Class.forName("java.lang.String");
+ final StateSerdes<Object, Object> stateSerdes =
+ new StateSerdes<Object, Object>("anyName",
Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+ final Integer myInt = 123;
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+
+ final Exception e = assertThrows(StreamsException.class, () ->
stateSerdes.rawKey(myInt, headers));
+ assertThat(
+ e.getMessage(),
+ equalTo(
+ "A serializer
(org.apache.kafka.common.serialization.StringSerializer) " +
+ "is not compatible to the actual key type (key type:
java.lang.Integer). " +
+ "Change the default Serdes in StreamConfig or provide
correct Serdes via method parameters."));
+ }
+
+ @Test
+ public void shouldThrowIfIncompatibleSerdeForValueWithHeaders() throws
ClassNotFoundException {
Review Comment:
Same
##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -149,26 +151,49 @@ public String topic() {
* @param rawKey the key as raw bytes
* @return the key as typed object
*/
+ @Deprecated
public K keyFrom(final byte[] rawKey) {
return keySerde.deserializer().deserialize(topic, rawKey);
}
+ /**
+ * Deserialize the key from raw bytes.
+ *
+ * @param rawKey the key as raw bytes
+ * @return the key as typed object
+ */
+ public K keyFrom(final byte[] rawKey, final Headers headers) {
+ return keySerde.deserializer().deserialize(topic, headers, rawKey);
+ }
+
/**
* Deserialize the value from raw bytes.
*
* @param rawValue the value as raw bytes
* @return the value as typed object
*/
+ @Deprecated
public V valueFrom(final byte[] rawValue) {
return valueSerde.deserializer().deserialize(topic, rawValue);
}
+ /**
+ * Deserialize the value from raw bytes.
+ *
+ * @param rawValue the value as raw bytes
+ * @return the value as typed object
+ */
+ public V valueFrom(final byte[] rawValue, Headers headers) {
+ return valueSerde.deserializer().deserialize(topic, headers,
Utils.wrapNullable(rawValue));
Review Comment:
Why do we need to add `Utils.wrapNullable` -- we don't use it in existing
`valueFrom(...)` either?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]