mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, return false; } + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = System.nanoTime(); + final QueryResult<R> result; + + final QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + collectExecutionInfo, + this + ); + if (collectExecutionInfo) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (System.nanoTime() - start) + "ns"); + } + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query query, + final PositionBound positionBound, final boolean collectExecutionInfo) { + final QueryResult<R> result; + final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); + final Serde<V> vSerde = serdes.valueSerde(); + final Deserializer<V> deserializer; + if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { Review comment: > The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not. Is it? (1) We also have `MeteredTimestampStore` (of course is extends `MeteredStore`) but it seems better to split the logic and move everything timestamp related into `MeteredTimestampStore`. (2) For PAPI users, they can add a plain `KeyValueStore` and we won't wrap it with the `TimestampedStore` face and the serdes won't be `ValueAndTimestamp` either. > What we do is, when you have a non-timestamped store, we wrap it with an extra layer (org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter) that pads the returned values with a fake timestamp (org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat We only do this in the DSL, if the user gives as a non-timestamped store via `Materialized` -- but for PAPI users, we never do this but use whatever store is given to use as-is. > so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore Not sure if I can follow? It should not be a concern for IQ? Also, the current conversion between plain/timestamped is really just a corner case (and a case that we want to deprecate anyway -- we just did not find a way to do so -- maybe we should add a runtime check at some point and WARN users if they provide a non-timestamped store until we remove support for it and throw an exception instead...). Seems not worth to add more tech debt for this behavior that we only added to not break stuff. > Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer. Yes, but we should split this logic between the plain `MeteredStore` and the `MeteredTimestampStore`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org