This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 8b9715bb70d KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
8b9715bb70d is described below

commit 8b9715bb70d12e8e692090e1cb5386946f6b0bf3
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Jan 19 09:23:42 2024 -0800

    KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
    
    KAFKA-15629 added the `TimestampedBytesStore` interface to
    `KeyValueToTimestampedKeyValueByteStoreAdapter`, which breaks the restore
    code path and thus some system tests.
    
    This PR reverts this change for now.
    
    Reviewers: Almog Gavra <almog.ga...@gmail.com>, Walker Carlson <wcarl...@confluent.io>
---
 .../kafka/streams/state/internals/CachingKeyValueStore.java  |  2 +-
 .../KeyValueToTimestampedKeyValueByteStoreAdapter.java       |  3 +--
 .../kafka/streams/state/internals/StoreQueryUtils.java       | 12 +++++++++++-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8ba3052a9f4..36f08412828 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -187,7 +187,7 @@ public class CachingKeyValueStore
             final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, 
key);
             if (lruCacheEntry != null) {
                 final byte[] rawValue;
-                if (timestampedSchema && 
!WrappedStateStore.isTimestamped(wrapped())) {
+                if (timestampedSchema && 
!WrappedStateStore.isTimestamped(wrapped()) && 
!StoreQueryUtils.isAdapter(wrapped())) {
                     rawValue = 
ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
                 } else {
                     rawValue = lruCacheEntry.value();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index 592c178274f..8c1a7f15eee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -35,7 +35,6 @@ import 
org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 import java.util.List;
 
@@ -54,7 +53,7 @@ import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial
  */
 
 @SuppressWarnings("unchecked")
-public class KeyValueToTimestampedKeyValueByteStoreAdapter implements 
KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+public class KeyValueToTimestampedKeyValueByteStoreAdapter implements 
KeyValueStore<Bytes, byte[]> {
     final KeyValueStore<Bytes, byte[]> store;
 
     KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, 
byte[]> store) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 255f597600a..1609e8b2c5d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -410,7 +410,7 @@ public final class StoreQueryUtils {
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static <V> Function<byte[], V> getDeserializeValue(final 
StateSerdes<?, V> serdes, final StateStore wrapped) {
         final Serde<V> valueSerde = serdes.valueSerde();
-        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
+        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isAdapter(wrapped);
         final Deserializer<V> deserializer;
         if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
             final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
@@ -422,6 +422,16 @@ public final class StoreQueryUtils {
         return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
     }
 
+    public static boolean isAdapter(final StateStore stateStore) {
+        if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {
+            return true;
+        } else if (stateStore instanceof WrappedStateStore) {
+            return isAdapter(((WrappedStateStore) stateStore).wrapped());
+        } else {
+            return false;
+        }
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
getDeserializeValue(final StateSerdes<?, V> serdes) {
         final Serde<V> valueSerde = serdes.valueSerde();

Reply via email to