UladzislauBlok commented on code in PR #21768:
URL: https://github.com/apache/kafka/pull/21768#discussion_r2941105703
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -294,9 +338,10 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
- (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
Review Comment:
Shouldn't `resultIterator` be a `KeyValueIterator<K, ValueTimestampHeaders<V>>`
instead of a `KeyValueIterator<K, ValueAndTimestamp<V>>`?
Why do we return only `ValueAndTimestamp<V>` as the query result?
Update: I see that this is what the iterator returns:
https://github.com/apache/kafka/pull/21768/changes#diff-43268e867244a98e6614710d9f2a3be2c519345e1f81204b91414ee1db77c754R464
But still, why?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -513,7 +555,18 @@ public K peekNextKey() {
}
}
- protected Bytes keyBytes(final K key, final Headers headers) {
+ @Override
+ protected Bytes serializeKey(final K key) {
+ throw new UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders required to pass in Headers when serializing a key.");
Review Comment:
Maybe we don't even need this. See my comment:
https://github.com/apache/kafka/pull/21768/changes#r2941312324
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -105,13 +107,24 @@ protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<
}
}
+ @Override
+ public ValueTimestampHeaders<V> get(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ try {
+ return maybeMeasureLatency(() -> deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, getSensor);
Review Comment:
I think `context.headers()` is better, but I still have a question.
If we do it this way, this is effectively the same as
https://github.com/apache/kafka/blob/eb0a7358c453782f07c860fe94f8e5e1c8ef7f6f/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java#L448
Can we just reuse it?
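To make the question concrete, here is a minimal sketch using hypothetical stand-in classes (`BaseMeteredStore`, `OverridingStore`, `InheritingStore`); none of these are the real Kafka types, and the string-based "serialization" is invented for illustration. It assumes the key serializer produces the same bytes for empty headers as it does with no headers at all. If that assumption holds in the real code, an override that only passes empty headers behaves like the inherited `get`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a metered store with a headers-free get().
class BaseMeteredStore {
    protected final Map<String, byte[]> inner = new HashMap<>();

    // Headers-free key serialization used by the base put() and get().
    protected String serializeKey(final String key) {
        return "k:" + key;
    }

    public void put(final String key, final String value) {
        inner.put(serializeKey(key), value.getBytes(StandardCharsets.UTF_8));
    }

    public String get(final String key) {
        final byte[] raw = inner.get(serializeKey(key));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}

// Variant A: overrides get() only to pass empty headers to the serializer.
class OverridingStore extends BaseMeteredStore {
    // Assumption: empty headers do not change the serialized key.
    protected String serializeKey(final String key, final Map<String, String> headers) {
        return headers.isEmpty() ? "k:" + key : "k:" + key + headers;
    }

    @Override
    public String get(final String key) {
        final byte[] raw = inner.get(serializeKey(key, new HashMap<>()));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}

// Variant B: simply inherits get() from the base class.
class InheritingStore extends BaseMeteredStore {
}

class GetReuseSketch {
    public static void main(final String[] args) {
        final OverridingStore overriding = new OverridingStore();
        final InheritingStore inheriting = new InheritingStore();
        overriding.put("x", "1");
        inheriting.put("x", "1");
        // Both variants look up the same serialized key, so the results match.
        System.out.println(overriding.get("x").equals(inheriting.get("x")));
    }
}
```

If the real serializer does depend on header contents, the two variants diverge and the override is needed; that is the part worth confirming.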