mjsax commented on code in PR #21670:
URL: https://github.com/apache/kafka/pull/21670#discussion_r2915173211


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<?, ?> windowRangeQuery = 
(WindowRangeQuery<?, ?>) query;
+            if (windowRangeQuery.getKey().isPresent()) {
+                result = runRangeQueryWithHeadersUnwrap(query, positionBound, 
config);

Review Comment:
   ```suggestion
                   result = runRangeQuery(query, positionBound, config);
   ```



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java:
##########
@@ -248,6 +248,7 @@ public boolean isWindowed() {
         public boolean isSession() {
             return false;
         }
+

Review Comment:
   Can we revert this blank-line-only change so the whole file drops out of the PR?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<?, ?> windowRangeQuery = 
(WindowRangeQuery<?, ?>) query;
+            if (windowRangeQuery.getKey().isPresent()) {
+                result = runRangeQueryWithHeadersUnwrap(query, positionBound, 
config);
+            } else {
+                result = QueryResult.forFailure(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    "This store (" + getClass() + ") doesn't know how to"
+                        + " execute the given query (" + query + ") because"
+                        + " SessionStores only support 
WindowRangeQuery.withKey."
+                        + " Contact the store maintainer if you need support"
+                        + " for a new query type."
+                );
+            }
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+            }
+        } else {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQueryWithHeadersUnwrap(final Query<R> 
query,
+                                                              final 
PositionBound positionBound,
+                                                              final 
QueryConfig config) {
+        final WindowRangeQuery<K, ?> typedQuery = (WindowRangeQuery<K, ?>) 
query;
+        final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+            WindowRangeQuery.withKey(
+                Bytes.wrap(serdes.rawKey(typedQuery.getKey().get(), new 
RecordHeaders()))
+            );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult 
=
+            wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final MeteredWindowedKeyValueIterator<K, ?> typedResult =

Review Comment:
   Why `?` here instead of a concrete type parameter?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);

Review Comment:
   Why are we overriding this method if we only call `super` anyway?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<?, ?> windowRangeQuery = 
(WindowRangeQuery<?, ?>) query;

Review Comment:
   Why are we using `<?, ?>` instead of `<K, V>` like we do in 
`MeteredSessionStore`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<?, ?> windowRangeQuery = 
(WindowRangeQuery<?, ?>) query;
+            if (windowRangeQuery.getKey().isPresent()) {
+                result = runRangeQueryWithHeadersUnwrap(query, positionBound, 
config);

Review Comment:
   Let's keep the name simple -- it's only a `private` method, so an overly 
complex name seems unnecessary.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> 
findSessions(final K key,
+                                                                               
    final Instant earliestSessionEndTime,
+                                                                               
    final Instant latestSessionStartTime) {
+        return super.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<?, ?> windowRangeQuery = 
(WindowRangeQuery<?, ?>) query;
+            if (windowRangeQuery.getKey().isPresent()) {
+                result = runRangeQueryWithHeadersUnwrap(query, positionBound, 
config);
+            } else {
+                result = QueryResult.forFailure(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    "This store (" + getClass() + ") doesn't know how to"
+                        + " execute the given query (" + query + ") because"
+                        + " SessionStores only support 
WindowRangeQuery.withKey."
+                        + " Contact the store maintainer if you need support"
+                        + " for a new query type."
+                );
+            }
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+            }
+        } else {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQueryWithHeadersUnwrap(final Query<R> 
query,
+                                                              final 
PositionBound positionBound,
+                                                              final 
QueryConfig config) {
+        final WindowRangeQuery<K, ?> typedQuery = (WindowRangeQuery<K, ?>) 
query;

Review Comment:
   As above: why `?` instead of a concrete type parameter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to