mjsax commented on code in PR #21670:
URL: https://github.com/apache/kafka/pull/21670#discussion_r2915173211
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<?, ?> windowRangeQuery =
(WindowRangeQuery<?, ?>) query;
+ if (windowRangeQuery.getKey().isPresent()) {
+ result = runRangeQueryWithHeadersUnwrap(query, positionBound,
config);
Review Comment:
```suggestion
result = runRangeQuery(query, positionBound, config);
```
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java:
##########
@@ -248,6 +248,7 @@ public boolean isWindowed() {
public boolean isSession() {
return false;
}
+
Review Comment:
Can we revert this to get rid of the whole file in the PR?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<?, ?> windowRangeQuery =
(WindowRangeQuery<?, ?>) query;
+ if (windowRangeQuery.getKey().isPresent()) {
+ result = runRangeQueryWithHeadersUnwrap(query, positionBound,
config);
+ } else {
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " SessionStores only support
WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ }
+ } else {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds()
- start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQueryWithHeadersUnwrap(final Query<R>
query,
+ final
PositionBound positionBound,
+ final
QueryConfig config) {
+ final WindowRangeQuery<K, ?> typedQuery = (WindowRangeQuery<K, ?>)
query;
+ final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+ WindowRangeQuery.withKey(
+ Bytes.wrap(serdes.rawKey(typedQuery.getKey().get(), new
RecordHeaders()))
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult
=
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final MeteredWindowedKeyValueIterator<K, ?> typedResult =
Review Comment:
`?` ?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
Review Comment:
Why are we overriding this method if we only call `super` anyway?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<?, ?> windowRangeQuery =
(WindowRangeQuery<?, ?>) query;
Review Comment:
Why are we using `<?, ?>` instead of `<K, V>` like we do in
`MeteredSessionStore`.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<?, ?> windowRangeQuery =
(WindowRangeQuery<?, ?>) query;
+ if (windowRangeQuery.getKey().isPresent()) {
+ result = runRangeQueryWithHeadersUnwrap(query, positionBound,
config);
Review Comment:
Let's keep the name simple -- it's a `private` method only, so it seems
unnecessary to have a too complex name.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -59,6 +68,84 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>>
findSessions(final K key,
+
final Instant earliestSessionEndTime,
+
final Instant latestSessionStartTime) {
+ return super.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<?, ?> windowRangeQuery =
(WindowRangeQuery<?, ?>) query;
+ if (windowRangeQuery.getKey().isPresent()) {
+ result = runRangeQueryWithHeadersUnwrap(query, positionBound,
config);
+ } else {
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " SessionStores only support
WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ }
+ } else {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds()
- start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQueryWithHeadersUnwrap(final Query<R>
query,
+ final
PositionBound positionBound,
+ final
QueryConfig config) {
+ final WindowRangeQuery<K, ?> typedQuery = (WindowRangeQuery<K, ?>)
query;
Review Comment:
As above. Why `?` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]