mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922784937


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
-    // same as either timestampedStore or versionedStore above. kept merely as a convenience
-    // to simplify implementation for methods which do not depend on store type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);
+
+        // Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+        if (rawStore instanceof TimestampedKeyValueStore &&
+            !(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+            // Adapt OLD store to NEW type for backward compatibility
+            headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(

Review Comment:
   Not sure I understand. If a user manually calls `addStateStore`, that
store should only be used by custom `Processor`s, not by any predefined DSL
processor. So it's not clear to me how or why things might break, or why we
need an adapter for this case. Can you elaborate?


