frankvicky commented on code in PR #21736:
URL: https://github.com/apache/kafka/pull/21736#discussion_r2936680792


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -497,4 +498,259 @@ private KafkaMetric metric(final String name) {
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(inner.range(any(Bytes.class), any(Bytes.class)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        metered.init(context, metered);
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.range("a", "z");
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();

Review Comment:
   make sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to