mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
   > The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not.
   
   Is it? (1) We also have `MeteredTimestampStore` (of course it extends `MeteredStore`), but it seems better to split the logic and move everything timestamp-related into `MeteredTimestampStore`. (2) PAPI users can add a plain `KeyValueStore`; we won't wrap it with the `TimestampedStore` facade, and the serdes won't be `ValueAndTimestamp` either.
   
   > What we do is, when you have a non-timestamped store, we wrap it with an extra layer (org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter) that pads the returned values with a fake timestamp (org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat).
   
   We only do this in the DSL, if the user gives us a non-timestamped store via `Materialized` -- but for PAPI users, we never do this and instead use whatever store is given to us as-is.
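   (For context, a minimal standalone sketch of the padding idea, assuming the timestamped byte layout is an 8-byte timestamp prefix followed by the plain serialized value; the class and method names below are illustrative, not Kafka's actual internals:)

```java
import java.nio.ByteBuffer;

// Hypothetical illustration (not Kafka's implementation) of the padding idea:
// prepend a placeholder 8-byte timestamp so a plain value matches the
// timestamped value layout.
public final class TimestampPaddingSketch {

    private static final long UNKNOWN_TIMESTAMP = -1L; // assumed sentinel for "no timestamp"

    static byte[] toTimestampedFormat(final byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(UNKNOWN_TIMESTAMP)
            .put(plainValue)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] padded = toTimestampedFormat(new byte[]{1, 2, 3});
        System.out.println(padded.length); // 8 + 3 = 11
    }
}
```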
   
   > so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore
   
   Not sure I can follow? It should not be a concern for IQ? Also, the current conversion between plain and timestamped stores is really just a corner case (and a case we want to deprecate anyway -- we just have not found a way to do so yet; maybe we should add a runtime check at some point and WARN users if they provide a non-timestamped store, until we remove support for it and throw an exception instead...). It seems not worth adding more tech debt for behavior that we only added to avoid breaking existing code.
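   (A rough sketch of what such a WARN check could look like -- a hypothetical helper, not existing Kafka Streams code; it only assumes that timestamped byte stores implement `TimestampedBytesStore`:)

```java
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical check: log a warning when a user-supplied store is not
// timestamped, as a first step before eventually throwing instead.
final class NonTimestampedStoreCheck {

    private static final Logger LOG = LoggerFactory.getLogger(NonTimestampedStoreCheck.class);

    static void warnIfNotTimestamped(final StateStore innerStore) {
        if (!(innerStore instanceof TimestampedBytesStore)) {
            LOG.warn("Store {} is not a timestamped store; adapting plain stores "
                + "may be removed in a future release.", innerStore.name());
        }
    }
}
```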
   
   > Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer.
   
   Yes, but we should split this logic between the plain `MeteredStore` and the `MeteredTimestampStore`.
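   (To make the suggested split concrete, a rough sketch with hypothetical names -- not the PR's code: the plain metered store keeps a single deserialization hook backed by its value deserializer, and the timestamped subclass overrides that hook, so no `instanceof` check on the serde is needed in the base class:)

```java
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative class names only; in the thread these correspond to the plain
// MeteredKeyValueStore and the MeteredTimestampStore.
class MeteredStoreSketch<V> {

    private final Deserializer<V> valueDeserializer;

    MeteredStoreSketch(final Deserializer<V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    // Single hook used when turning a raw byte[] query result back into V.
    protected V deserializeValue(final byte[] rawValue) {
        return valueDeserializer.deserialize(null, rawValue);
    }
}

class MeteredTimestampStoreSketch<V> extends MeteredStoreSketch<V> {

    MeteredTimestampStoreSketch(final Deserializer<V> valueAndTimestampDeserializer) {
        super(valueAndTimestampDeserializer);
    }

    // Any timestamp-specific handling lives here rather than behind an
    // instanceof check in the plain store.
    @Override
    protected V deserializeValue(final byte[] rawValue) {
        return super.deserializeValue(rawValue);
    }
}
```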




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

