mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922801509


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
-    // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
-    // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);
+
+        // Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+        if (rawStore instanceof TimestampedKeyValueStore &&
+            !(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+            // Adapt OLD store to NEW type for backward compatibility
+            headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
+                (TimestampedKeyValueStore<K, V>) rawStore
+            );
+            store = headersStore;
+            return;
+        }
+
+        // Try headers-aware timestamped store
         try {
-            // first try timestamped store
-            timestampedStore = context.getStateStore(storeName);
-            store = timestampedStore;
+            headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>) 
rawStore;
+            store = headersStore;
             return;
         } catch (final ClassCastException e) {
-            // ignore since could be versioned store instead
+            // not headers store, try versioned
         }
 
+        // Try versioned store
         try {
-            // next try versioned store
-            versionedStore = context.getStateStore(storeName);
+            versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
             store = versionedStore;
         } catch (final ClassCastException e) {
-            store = context.getStateStore(storeName);
-            final String storeType = store == null ? "null" : 
store.getClass().getName();
+            final String storeType = rawStore == null ? "null" : 
rawStore.getClass().getName();
             throw new InvalidStateStoreException("KTable source state store 
must implement either "
-                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " 
+ storeType);
+                + "TimestampedKeyValueStore, 
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " + 
storeType);
         }
     }
 
-    public ValueAndTimestamp<V> get(final K key) {
-        if (timestampedStore != null) {
-            return timestampedStore.get(key);
+    public ValueTimestampHeaders<V> get(final K key) {
+        if (headersStore != null) {
+            return headersStore.get(key);
         }
         if (versionedStore != null) {
             final VersionedRecord<V> versionedRecord = versionedStore.get(key);
             return versionedRecord == null
                 ? null
-                : ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp());
+                : ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), null);
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");
     }
 
-    public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+    public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp) 
{
         if (!isVersionedStore()) {
             throw new UnsupportedOperationException("get(key, timestamp) is 
only supported for versioned stores");
         }
         final VersionedRecord<V> versionedRecord = versionedStore.get(key, 
asOfTimestamp);
-        return versionedRecord == null ? null : 
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        return versionedRecord == null ? null : 
ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), null);

Review Comment:
   `null` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to