This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 8b9715bb70d KAFKA-16141: Fix StreamsStandbyTask system test (#15217) 8b9715bb70d is described below commit 8b9715bb70d12e8e692090e1cb5386946f6b0bf3 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Fri Jan 19 09:23:42 2024 -0800 KAFKA-16141: Fix StreamsStandbyTask system test (#15217) KAFKA-15629 added `TimestampedByteStore` interface to `KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore code path and thus some system tests. This PR reverts this change for now. Reviewers: Almog Gavra <almog.ga...@gmail.com>, Walker Carlson <wcarl...@confluent.io> --- .../kafka/streams/state/internals/CachingKeyValueStore.java | 2 +- .../KeyValueToTimestampedKeyValueByteStoreAdapter.java | 3 +-- .../kafka/streams/state/internals/StoreQueryUtils.java | 12 +++++++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 8ba3052a9f4..36f08412828 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -187,7 +187,7 @@ public class CachingKeyValueStore final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key); if (lruCacheEntry != null) { final byte[] rawValue; - if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped())) { + if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) { rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value()); } else { rawValue = lruCacheEntry.value(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 592c178274f..8c1a7f15eee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -35,7 +35,6 @@ import org.apache.kafka.streams.query.internals.InternalQueryResultUtil; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.TimestampedBytesStore; import java.util.List; @@ -54,7 +53,7 @@ import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial */ @SuppressWarnings("unchecked") -public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore { +public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]> { final KeyValueStore<Bytes, byte[]> store; KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, byte[]> store) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index 255f597600a..1609e8b2c5d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -410,7 +410,7 @@ public final class StoreQueryUtils { @SuppressWarnings({"unchecked", "rawtypes"}) public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) { final Serde<V> valueSerde = serdes.valueSerde(); - final boolean timestamped = WrappedStateStore.isTimestamped(wrapped); + final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped); final Deserializer<V> deserializer; if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { final ValueAndTimestampDeserializer valueAndTimestampDeserializer = @@ -422,6 +422,16 @@ public final class StoreQueryUtils { return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + public static boolean isAdapter(final StateStore stateStore) { + if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) { + return true; + } else if (stateStore instanceof WrappedStateStore) { + return isAdapter(((WrappedStateStore) stateStore).wrapped()); + } else { + return false; + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(final StateSerdes<?, V> serdes) { final Serde<V> valueSerde = serdes.valueSerde();