mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922784937
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
- // same as either timestampedStore or versionedStore above. kept merely as a convenience
- // to simplify implementation for methods which do not depend on store type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
+
+ // Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+ if (rawStore instanceof TimestampedKeyValueStore &&
+ !(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+ // Adapt OLD store to NEW type for backward compatibility
+ headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
Review Comment:
I'm not sure I understand. If a user manually calls `addStateStore`, that
store should only be used by custom `Processors`, not by any pre-defined DSL
processor, right? So it's not clear to me how or why things might break, or
why we need an adapter for this case. Can you elaborate?