mjsax commented on code in PR #21768:
URL: https://github.com/apache/kafka/pull/21768#discussion_r2937813181


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -87,7 +87,7 @@ public class MeteredKeyValueStore<K, V>
     protected Sensor putIfAbsentSensor;
     protected Sensor getSensor;
     protected Sensor deleteSensor;
-    private Sensor putAllSensor;
+    protected Sensor putAllSensor;

Review Comment:
   Needed so that `MeteredTimestampedKeyValueStoreWithHeaders` can access this 
sensor.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -288,10 +288,9 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
-            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueStoreIterator(

Review Comment:
   Side cleanup: I think there is no reason to have 
`MeteredKeyValueTimestampedIterator` (removed below), and we can just use the 
existing `MeteredKeyValueStoreIterator`. -- We don't have a case where we need 
to bridge between different value types in `MeteredKeyValueStore`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -105,13 +107,24 @@ protected Serde<ValueTimestampHeaders<V>> 
prepareValueSerdeForStore(final Serde<
         }
     }
 
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, 
getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
     @Override
     public void put(final K key,
                     final ValueTimestampHeaders<V> value) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
             final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
-            maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers)), time, putSensor);
+            maybeMeasureLatency(() -> wrapped().put(serializeKey(key, 
headers), serdes.rawValue(value, headers)), time, putSensor);

Review Comment:
   Renaming the existing `keyBytes` method to match the naming in 
`MeteredKeyValueStore`, which uses `serializeKey`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -513,7 +555,18 @@ public K peekNextKey() {
         }
     }
 
-    protected Bytes keyBytes(final K key, final Headers headers) {
+    @Override
+    protected Bytes serializeKey(final K key) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when serializing a key.");
+    }
+
+    protected Bytes serializeKey(final K key, final Headers headers) {
         return Bytes.wrap(serdes.rawKey(key, headers));
     }
+
+    @Override
+    protected K deserializeKey(final byte[] rawKey) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when deserializing a key.");

Review Comment:
   Similar guard -- this did not surface any issues.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -318,7 +363,7 @@ public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V
                                                                                
                   final PS prefixKeySerializer) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(

Review Comment:
   Just renaming to align naming patterns



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -125,14 +138,42 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
         Objects.requireNonNull(key, "key cannot be null");
         final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
         final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
-            () -> deserializeValue(wrapped().putIfAbsent(keyBytes(key, 
headers), serdes.rawValue(value, headers))),
+            // `rawOldValue` returned from `wrapped().putIfAbsent(...)` is 
type ValueTimestampHeader
+            // -> no need to pass in Headers into `deserializeValue()`
+            () -> deserializeValue(wrapped().putIfAbsent(serializeKey(key, 
headers), serdes.rawValue(value, headers))),
             time,
             putIfAbsentSensor
         );
         maybeRecordE2ELatency();
         return currentValue;
     }
 
+    @Override
+    public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>> 
entries) {
+        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot 
be null"));
+        maybeMeasureLatency(() -> wrapped().putAll(innerEntries(entries)), 
time, putAllSensor);
+    }
+
+    private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
ValueTimestampHeaders<V>>> from) {
+        final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+        for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
+            final Headers headers = entry.value != null ? 
entry.value.headers() : new RecordHeaders();
+            byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers), 
serializeValue(entry.value)));

Review Comment:
   Similar side-effect as for `get()` -- we need to override this method to pass 
in headers.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -125,14 +138,42 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
         Objects.requireNonNull(key, "key cannot be null");
         final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
         final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
-            () -> deserializeValue(wrapped().putIfAbsent(keyBytes(key, 
headers), serdes.rawValue(value, headers))),
+            // `rawOldValue` returned from `wrapped().putIfAbsent(...)` is 
type ValueTimestampHeader
+            // -> no need to pass in Headers into `deserializeValue()`
+            () -> deserializeValue(wrapped().putIfAbsent(serializeKey(key, 
headers), serdes.rawValue(value, headers))),
             time,
             putIfAbsentSensor
         );
         maybeRecordE2ELatency();
         return currentValue;
     }
 
+    @Override
+    public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>> 
entries) {
+        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot 
be null"));
+        maybeMeasureLatency(() -> wrapped().putAll(innerEntries(entries)), 
time, putAllSensor);
+    }
+
+    private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
ValueTimestampHeaders<V>>> from) {
+        final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+        for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
+            final Headers headers = entry.value != null ? 
entry.value.headers() : new RecordHeaders();
+            byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers), 
serializeValue(entry.value)));
+        }
+        return byteEntries;
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> delete(final K key) {

Review Comment:
   This is the bug -- we need to override `delete`, too, to pass headers 
correctly.
   
   Related to https://github.com/apache/kafka/pull/21639



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -105,13 +107,24 @@ protected Serde<ValueTimestampHeaders<V>> 
prepareValueSerdeForStore(final Serde<
         }
     }
 
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, 
getSensor);

Review Comment:
   We need to override `get()` to be able to pass headers into 
`serializeKey(...)` -- For `get()` it might not be strictly necessary, but it 
forces us to make a conscious decision about what to pass -- it's a side-effect 
of disallowing calls to `serializeKey` w/o headers.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -409,21 +456,15 @@ public KeyValue<K, V> next() {
 
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
valueTimestampHeadersDeserializer.apply(keyValue.value);
-            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();

Review Comment:
   This can never be `null`. We know that `iter.next` cannot return `null` 
(this would be a "non existing row" -- `null`-values are deletes).
   
   Thus simplifying the code (we don't have any `null` checks for this case in 
the older code for the plain and ts stores either).
   
   Similar below
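
   The reasoning above can be illustrated with a minimal sketch. It assumes a 
simplified in-memory model in which writing a `null` value removes the row; 
`TombstoneStore` and `TombstoneDemo` are hypothetical names, not Kafka classes. 
Under that assumption, iterating over existing rows never yields a `null` value, 
so a `null` check on the iterated value is redundant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of a key-value store; not Kafka's classes.
class TombstoneStore {
    private final TreeMap<String, String> rows = new TreeMap<>();

    // Writing a null value is treated as a delete, so no row keeps a null value.
    void put(final String key, final String value) {
        if (value == null) {
            rows.remove(key);
        } else {
            rows.put(key, value);
        }
    }

    // Values of all existing rows, in key order.
    List<String> values() {
        return new ArrayList<>(rows.values());
    }
}

class TombstoneDemo {
    static List<String> liveValues() {
        final TombstoneStore store = new TombstoneStore();
        store.put("a", "1");
        store.put("b", "2");
        store.put("b", null); // deletes "b" instead of storing a null value
        return store.values();
    }

    public static void main(final String[] args) {
        System.out.println(liveValues()); // prints [1]
    }
}
```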



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -251,9 +294,10 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
-            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(

Review Comment:
   Just renaming to align naming patterns.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -387,13 +386,13 @@ public V delete(final K key) {
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
-        return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
+        return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);

Review Comment:
   Just renaming, to align to established naming patterns across the different 
kv-metered stores.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -105,13 +107,24 @@ protected Serde<ValueTimestampHeaders<V>> 
prepareValueSerdeForStore(final Serde<
         }
     }
 
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, 
getSensor);

Review Comment:
   Wondering if we would want to pass `context.headers()` instead of `new 
RecordHeaders()`, similar to what we did in 
https://github.com/apache/kafka/pull/21684
   
   If yes, this applies elsewhere in this class, too.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -513,7 +555,18 @@ public K peekNextKey() {
         }
     }
 
-    protected Bytes keyBytes(final K key, final Headers headers) {
+    @Override
+    protected Bytes serializeKey(final K key) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when serializing a key.");

Review Comment:
   This guard actually surfaced the missing overrides, highlighting the bug of 
not overriding `delete`. So it's a good guard to have in place (a slightly 
annoying side effect is that it forces us to override put/putAll, too, but I 
think it's worth the price).
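
   A minimal sketch of how such a guard surfaces a missing override, using 
hypothetical stand-in classes (`BaseStore`, `HeaderAwareStore`, `GuardDemo`) 
and a plain `Map` in place of `Headers`; it does not reproduce the actual Kafka 
code. The subclass disallows the no-headers `serializeKey(key)` overload, so any 
inherited method that still calls it (here `delete`) fails fast instead of 
silently serializing without headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a metered store; not Kafka's actual classes.
class BaseStore {
    protected final Map<String, String> inner = new HashMap<>();

    protected String serializeKey(final String key) {
        return "k:" + key;
    }

    public void put(final String key, final String value) {
        inner.put(serializeKey(key), value);
    }

    public String delete(final String key) {
        return inner.remove(serializeKey(key));
    }
}

class HeaderAwareStore extends BaseStore {
    // Guard: the no-headers overload must never be used by this store.
    @Override
    protected String serializeKey(final String key) {
        throw new UnsupportedOperationException("headers are required when serializing a key");
    }

    // Header-aware overload; a real serializer would receive the headers.
    protected String serializeKey(final String key, final Map<String, String> headers) {
        return "k:" + key;
    }

    @Override
    public void put(final String key, final String value) {
        inner.put(serializeKey(key, new HashMap<>()), value);
    }

    // delete(...) is deliberately NOT overridden, to show what the guard catches.
}

class GuardDemo {
    // Returns true if the inherited delete(...) runs into the guard.
    static boolean deleteHitsGuard() {
        final HeaderAwareStore store = new HeaderAwareStore();
        store.put("a", "1"); // overridden, uses the header-aware overload
        try {
            store.delete("a"); // inherited, still calls serializeKey(key)
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(deleteHitsGuard()); // prints true
    }
}
```

   The cost in this sketch is the same as the one noted above: once the guard is 
in place, every inherited method that serializes a key has to be overridden.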



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
