mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2898217677
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,6 +155,39 @@ public boolean isOpen() {
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
+        // Handle KeyQuery: convert byte[] result from timestamped to headers format
+        if (query instanceof KeyQuery) {
+            final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final QueryResult<byte[]> rawResult = store.query(keyQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final byte[] convertedValue = convertToHeaderFormat(rawResult.getResult());
+                final QueryResult<byte[]> convertedResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedValue);
+                return (QueryResult<R>) convertedResult;
+            } else {
+                return (QueryResult<R>) rawResult;
+            }
+        }
+
+        // Handle RangeQuery: wrap iterator to convert values
+        if (query instanceof RangeQuery) {
+            final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, byte[]>) query;
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                store.query(rangeQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final KeyValueIterator<Bytes, byte[]> convertedIterator =
+                    new TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+                final QueryResult<KeyValueIterator<Bytes, byte[]>> convertedResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedIterator);
+                return (QueryResult<R>) convertedResult;
+            } else {
+                return (QueryResult<R>) rawResult;
+            }
+        }
+
+        // For other query types, delegate to the underlying store
Review Comment:
Not sure this is correct. For regular layers in the hierarchy, forwarding an
unsupported query type to the lower layers is fine. An adapter is different:
if it cannot translate the bytes for an unknown query type, then even if the
lower layer supports the query, we would crash in the upper layer when
deserializing, because the byte format does not match what the metered layer
expects and the adapter has not fixed it up.

So it seems better to return `FailedQueryResult` for this case?
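
To make the suggestion concrete, here is a rough, self-contained sketch of the fail-fast fallback. Everything in it (`AdapterQuerySketch`, `Result`, `innerQuery`, the marker-style `KeyQuery` and `WindowKeyQuery` classes, and the one-byte placeholder conversion) is a hypothetical stand-in, not the Kafka Streams API or the PR's code. In the real adapter the fallback would presumably go through something like `QueryResult.forUnknownQueryType(query, this)`, but that is an assumption to verify against the codebase.

```java
public class AdapterQuerySketch {
    // Hypothetical stand-ins for Kafka Streams query types; not the real API.
    interface Query {}
    static final class KeyQuery implements Query {}
    static final class WindowKeyQuery implements Query {}

    // Simplified stand-in for QueryResult: holds either a value or a failure message.
    static final class Result {
        final byte[] value;
        final String failure;

        private Result(byte[] value, String failure) {
            this.value = value;
            this.failure = failure;
        }

        static Result success(byte[] value) {
            return new Result(value, null);
        }

        static Result unknownQueryType(Query q) {
            return new Result(null, "UNKNOWN_QUERY_TYPE: " + q.getClass().getSimpleName());
        }

        boolean isSuccess() {
            return failure == null;
        }
    }

    // Pretend wrapped store: answers every query with bytes in the old format.
    static Result innerQuery(Query q) {
        return Result.success(new byte[] {1, 2, 3});
    }

    // Placeholder conversion (prepends one zero byte); the real conversion differs.
    static byte[] convertToHeaderFormat(byte[] raw) {
        byte[] out = new byte[raw.length + 1];
        System.arraycopy(raw, 0, out, 1, raw.length);
        return out;
    }

    static Result query(Query q) {
        if (q instanceof KeyQuery) {
            Result raw = innerQuery(q);
            return raw.isSuccess() ? Result.success(convertToHeaderFormat(raw.value)) : raw;
        }
        // Unknown type: fail here rather than forward bytes the adapter cannot convert.
        return Result.unknownQueryType(q);
    }

    public static void main(String[] args) {
        System.out.println(query(new KeyQuery()).value.length);   // prints 4
        System.out.println(query(new WindowKeyQuery()).failure);  // prints UNKNOWN_QUERY_TYPE: WindowKeyQuery
    }
}
```

The point of the sketch is only the last branch of `query`: a caller gets an explicit failure for a query type the adapter does not know how to translate, instead of old-format bytes that break later in the metered layer.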