aliehsaeedii commented on code in PR #21650:
URL: https://github.com/apache/kafka/pull/21650#discussion_r2896340666


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java:
##########
@@ -83,4 +97,200 @@ public void put(final K key, final ValueTimestampHeaders<V> value, final long wi
     protected Bytes keyBytes(final K key, final Headers headers) {
         return Bytes.wrap(serdes.rawKey(key, headers));
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowKeyQuery) {
+            result = runWindowKeyQuery((WindowKeyQuery<K, ValueTimestampHeaders<V>>) query, positionBound, config);
+        } else if (query instanceof WindowRangeQuery) {
+            result = runWindowRangeQuery((WindowRangeQuery<K, ValueTimestampHeaders<V>>) query, positionBound, config);
+        } else {
+            result = wrapped().query(query, positionBound, config);
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            final String conversionType = isUnderlyingStoreTimestamped()

Review Comment:
   I added an `isUnderlyingStoreTimestamped` check so that we can decide whether to
convert the query results to plain or timestamped values.
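
   For illustration only, here is a minimal, self-contained sketch of that kind of decision. It is not the code in this PR and uses no Kafka classes: `ConversionSketch`, `StoredValue`, `TimestampedValue`, and `convert` are hypothetical names, and the boolean parameter merely stands in for the result of `isUnderlyingStoreTimestamped()`.

```java
// Hypothetical sketch: none of these types are Kafka Streams classes.
class ConversionSketch {

    // Stand-in for a stored value that carries a timestamp (headers omitted for brevity).
    static final class StoredValue<V> {
        final V value;
        final long timestamp;

        StoredValue(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for a timestamped query result.
    static final class TimestampedValue<V> {
        final V value;
        final long timestamp;

        TimestampedValue(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Assumption: the flag plays the role of isUnderlyingStoreTimestamped().
    // true  -> keep the timestamp in the result
    // false -> return only the plain value
    static <V> Object convert(final StoredValue<V> stored,
                              final boolean underlyingStoreTimestamped) {
        if (stored == null) {
            return null;
        }
        return underlyingStoreTimestamped
            ? new TimestampedValue<>(stored.value, stored.timestamp)
            : stored.value;
    }

    public static void main(final String[] args) {
        final StoredValue<String> stored = new StoredValue<>("v", 42L);
        System.out.println(convert(stored, false));
        System.out.println(convert(stored, true) instanceof TimestampedValue);
    }
}
```

   The point of the sketch is only that a single boolean selects between the two result shapes; how the real store performs the conversion is defined by the PR itself.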



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.