UladzislauBlok commented on code in PR #21768:
URL: https://github.com/apache/kafka/pull/21768#discussion_r2941105703


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -294,9 +338,10 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
             final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
-                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(

Review Comment:
   Shouldn't `resultIterator` be a `KeyValueIterator<K, ValueTimestampHeaders<V>>`
   instead of a `KeyValueIterator<K, ValueAndTimestamp<V>>`?
   Why do we return only `ValueAndTimestamp<V>` as the query result?
   UPD: I see that this is what the iterator returns:
   https://github.com/apache/kafka/pull/21768/changes#diff-43268e867244a98e6614710d9f2a3be2c519345e1f81204b91414ee1db77c754R464
   but I would still like to understand why.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -513,7 +555,18 @@ public K peekNextKey() {
         }
     }
 
-    protected Bytes keyBytes(final K key, final Headers headers) {
+    @Override
+    protected Bytes serializeKey(final K key) {
+        throw new UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders required to pass in Headers when serializing a key.");

Review Comment:
   Maybe we don't even need this. See my comment:
   https://github.com/apache/kafka/pull/21768/changes#r2941312324



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -105,13 +107,24 @@ protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<
         }
     }
 
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, getSensor);

Review Comment:
   I think `context.headers()` is better than `new RecordHeaders()` here, but I still have a question.
   If we do it this way, this is effectively the same as
   https://github.com/apache/kafka/blob/eb0a7358c453782f07c860fe94f8e5e1c8ef7f6f/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java#L448
   Can we just reuse it?
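
To make the reuse question concrete, here is a minimal, dependency-free sketch of the idea. It is not the Kafka code: `ReuseGetSketch`, `BaseStore`, `HeaderAwareStore`, the `String`-based key serialization, and the `Map<String, String>` stand-in for headers are all hypothetical names chosen for illustration. The assumption is that the parent's `get` goes through an overridable single-argument `serializeKey` hook, so the subclass could supply headers inside that hook instead of overriding `get` or throwing from the hook.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

class ReuseGetSketch {

    // Hypothetical stand-in for a metered store: get() and put() delegate
    // key serialization to an overridable hook.
    static class BaseStore<K, V> {
        protected final Map<String, V> inner = new HashMap<>();

        public V get(final K key) {
            Objects.requireNonNull(key, "key cannot be null");
            return inner.get(serializeKey(key));
        }

        public void put(final K key, final V value) {
            Objects.requireNonNull(key, "key cannot be null");
            inner.put(serializeKey(key), value);
        }

        // Hook that subclasses may override; the default ignores headers.
        protected String serializeKey(final K key) {
            return String.valueOf(key);
        }
    }

    // Hypothetical header-aware store: it inherits get() unchanged and only
    // overrides the hook, obtaining headers from a supplier (a stand-in for
    // something like a processor context).
    static class HeaderAwareStore<K, V> extends BaseStore<K, V> {
        private final Supplier<Map<String, String>> headersSupplier;

        HeaderAwareStore(final Supplier<Map<String, String>> headersSupplier) {
            this.headersSupplier = headersSupplier;
        }

        @Override
        protected String serializeKey(final K key) {
            // Route the header-less call to the header-aware overload
            // instead of throwing UnsupportedOperationException.
            return serializeKey(key, headersSupplier.get());
        }

        protected String serializeKey(final K key, final Map<String, String> headers) {
            // Illustrative only: a real serializer may or may not use headers.
            return headers.getOrDefault("tenant", "") + ":" + key;
        }
    }

    public static void main(final String[] args) {
        final HeaderAwareStore<String, Integer> store =
            new HeaderAwareStore<>(() -> Map.of("tenant", "a"));
        store.put("k", 1);
        System.out.println(store.get("k"));
    }
}
```

With this shape the subclass carries no `get` override at all. Whether the same holds for the real classes depends on how the parent `get` and the header-aware serializers interact, which is exactly the open question above.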
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
