Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2943207276
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -109,16 +111,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
-private ValueAndTimestamp computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp valueAndTimestamp) {
+private ValueTimestampHeaders computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+Headers headers = null;
-if (valueAndTimestamp != null) {
-newValue = mapper.apply(key, valueAndTimestamp.value());
-timestamp = valueAndTimestamp.timestamp();
+if (valueTimestampHeaders != null) {
+newValue = mapper.apply(key, valueTimestampHeaders.value());
+timestamp = valueTimestampHeaders.timestamp();
+headers = valueTimestampHeaders.headers();
}
Review Comment:
If `valueTimestampHeaders == null`, should we set `headers =
context.headers()` ?
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
Review Comment:
Why do we introduce `rawStore` ? The leads to the new to add
`@SuppressWarnings("unchecked")` and cast below.
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Try headers-aware timestamped store
try {
-// first try timestamped store
-timestampedStore = context.getStateStore(storeName);
-store = timestampedStore;
+headersStore = (TimestampedKeyValueStoreWithHeaders)
rawStore;
+store = headersStore;
return;
} catch (final ClassCastException e) {
-// ignore since could be versioned store instead
+// not headers store, try versioned
}
+// Try versioned store
try {
-// next try versioned store
-versionedStore = context.getStateStore(storeName);
+versionedStore = (VersionedKeyValueStore) rawStore;
store = versionedStore;
} catch (final ClassCastException e) {
-store = context.getStateStore(storeName);
-final String storeType = store == null ? "null" :
store.getClass().getName();
+final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
-+ "TimestampedKeyValueStore or VersionedKeyValueStore. Got: "
+ storeType);
++ "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
Review Comment:
```suggestion
+ "TimestampedKeyValueStoreWithHeaders or
VersionedKeyValueStore. Got: " + storeType);
```
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyV
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
aliehsaeedii commented on code in PR #21580: URL: https://github.com/apache/kafka/pull/21580#discussion_r2942992944 ## streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java: ## @@ -1222,7 +1222,7 @@ public void shouldUseSpecifiedStoreSupplierForEachOuterJoinOperationBetweenKStre final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); assertTypesForStateStore(topology.stateStores(), InMemoryWindowStore.class, -RocksDBWindowStore.class, +PlainToHeadersWindowStoreAdapter.class, Review Comment: We can either fix the test or change the implementation of PlainToHeadersWindowStoreAdapter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2936949422
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##
@@ -192,9 +192,11 @@ private ValueAndTimestamp transformValue(final K
key, final ValueAndTimest
new RecordHeaders()
));
-final ValueAndTimestamp result = ValueAndTimestamp.make(
-valueTransformer.transform(key,
getValueOrNull(valueAndTimestamp)),
-valueAndTimestamp == null ? UNKNOWN :
valueAndTimestamp.timestamp());
+final ValueTimestampHeaders result =
ValueTimestampHeaders.make(
+valueTransformer.transform(key,
getValueOrNull(valueTimestampHeaders)),
+valueTimestampHeaders == null ? UNKNOWN :
valueTimestampHeaders.timestamp(),
+valueTimestampHeaders == null ? null :
valueTimestampHeaders.headers()
Review Comment:
Let's agree on `headers()`.
##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -368,7 +368,8 @@ public void registerStore(final StateStore store,
@Override
public StateStore store(final String name) {
if (stores.containsKey(name)) {
-return stores.get(name).stateStore;
+final StateStore stateStore = stores.get(name).stateStore;
+return stateStore;
Review Comment:
left over from debugging phase:(
##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##
@@ -181,7 +181,8 @@ public S getStateStore(final String
name) {
}
final StateStore store = stateManager.store(name);
-return (S) wrapWithReadWriteStore(store);
+final StateStore wrappedStore = wrapWithReadWriteStore(store);
+return (S) wrappedStore;
Review Comment:
left over from debugging phase:(
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922801509
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+if (rawStore instanceof TimestampedKeyValueStore &&
+!(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+// Adapt OLD store to NEW type for backward compatibility
+headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
+(TimestampedKeyValueStore) rawStore
+);
+store = headersStore;
+return;
+}
+
+// Try headers-aware timestamped store
try {
-// first try timestamped store
-timestampedStore = context.getStateStore(storeName);
-store = timestampedStore;
+headersStore = (TimestampedKeyValueStoreWithHeaders)
rawStore;
+store = headersStore;
return;
} catch (final ClassCastException e) {
-// ignore since could be versioned store instead
+// not headers store, try versioned
}
+// Try versioned store
try {
-// next try versioned store
-versionedStore = context.getStateStore(storeName);
+versionedStore = (VersionedKeyValueStore) rawStore;
store = versionedStore;
} catch (final ClassCastException e) {
-store = context.getStateStore(storeName);
-final String storeType = store == null ? "null" :
store.getClass().getName();
+final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
-+ "TimestampedKeyValueStore or VersionedKeyValueStore. Got: "
+ storeType);
++ "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
}
}
-public ValueAndTimestamp get(final K key) {
-if (timestampedStore != null) {
-return timestampedStore.get(key);
+public ValueTimestampHeaders get(final K key) {
+if (headersStore != null) {
+return headersStore.get(key);
}
if (versionedStore != null) {
final VersionedRecord versionedRecord = versionedStore.get(key);
return versionedRecord == null
? null
-: ValueAndTimestamp.make(versionedRecord.value(),
versionedRecord.timestamp());
+: ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), null);
}
-throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped or versioned store");
+throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
}
-public ValueAndTimestamp get(final K key, final long asOfTimestamp) {
+public ValueTimestampHeaders get(final K key, final long asOfTimestamp)
{
if (!isVersionedStore()) {
throw new UnsupportedOperationException("get(key, timestamp) is
only supported for versioned stores");
}
final VersionedRecord versionedRecord = versionedStore.get(key,
asOfTimestamp);
-return versionedRecord == null ? null :
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+return versionedRecord == null ? null :
ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), null);
Review Comment:
`null` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922784937
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+if (rawStore instanceof TimestampedKeyValueStore &&
+!(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+// Adapt OLD store to NEW type for backward compatibility
+headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
Review Comment:
Not sure if I understand. If the user does a manual `addStateStore` this
store should only be used by custom `Processors`, not any pre-defined DSL
Processor? So it's not clear to me, how/why things might break and why we need
an adaptor for this case? Can you elaborat?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922784937
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+if (rawStore instanceof TimestampedKeyValueStore &&
+!(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+// Adapt OLD store to NEW type for backward compatibility
+headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
Review Comment:
Not sure if I understand. If the user does a manual `addStateStore` this
store should only be used by custom `Processors`, not any pre-defined DSL
Processor? So it's not clear to me, how/why things might break?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922779365
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+if (rawStore instanceof TimestampedKeyValueStore &&
+!(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
Review Comment:
Do we need this? I don't think that a `TimestampedKeyValueStore` can every
be `TimestampedKeyValueStoreWithHeaders`? They do not inherit from each other,
but both inherit from `KeyValueStore`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922771564
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
Review Comment:
Why are we removing this comment? Seems to be still correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922738348
##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##
@@ -447,7 +447,7 @@ public Maybe>
priorValueForBuffered(final K key) {
// it's unfortunately not possible to know this, unless we
materialize the suppressed result, since our only
// knowledge of the prior value is what the upstream processor
sends us as the "old value" when we first
// buffer something.
-return Maybe.defined(ValueAndTimestamp.make(deserializedValue,
RecordQueue.UNKNOWN));
+return Maybe.defined(ValueTimestampHeaders.make(deserializedValue,
RecordQueue.UNKNOWN, null));
Review Comment:
nit: `null` -> `new RecordHeaders()` (doesn't make a read difference; just
for code consistency.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922705439
##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##
@@ -181,7 +181,8 @@ public S getStateStore(final String
name) {
}
final StateStore store = stateManager.store(name);
-return (S) wrapWithReadWriteStore(store);
+final StateStore wrappedStore = wrapWithReadWriteStore(store);
+return (S) wrappedStore;
Review Comment:
Why do we need this change and introduce `wrappedStore` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922707044
##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -368,7 +368,8 @@ public void registerStore(final StateStore store,
@Override
public StateStore store(final String name) {
if (stores.containsKey(name)) {
-return stores.get(name).stateStore;
+final StateStore stateStore = stores.get(name).stateStore;
+return stateStore;
Review Comment:
Same question -- not sure why we introduce `stateStore` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580: URL: https://github.com/apache/kafka/pull/21580#discussion_r2922690557 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java: ## @@ -192,9 +192,11 @@ private ValueAndTimestamp transformValue(final K key, final ValueAndTimest new RecordHeaders() )); -final ValueAndTimestamp result = ValueAndTimestamp.make( -valueTransformer.transform(key, getValueOrNull(valueAndTimestamp)), -valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp()); +final ValueTimestampHeaders result = ValueTimestampHeaders.make( +valueTransformer.transform(key, getValueOrNull(valueTimestampHeaders)), +valueTimestampHeaders == null ? UNKNOWN : valueTimestampHeaders.timestamp(), +valueTimestampHeaders == null ? null : valueTimestampHeaders.headers() Review Comment: similar question as for `KTable#mapValues` -- do we pass `headers()` or `new RecordHeaders()` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580: URL: https://github.com/apache/kafka/pull/21580#discussion_r2922687859 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java: ## @@ -192,9 +192,11 @@ private ValueAndTimestamp transformValue(final K key, final ValueAndTimest new RecordHeaders() Review Comment: Do we want to change this and pass in `valueTimestampHeaders == null ? new RecordHeaders() : valueTimestampHeaders.headers()` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2922680047
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
-private ValueAndTimestamp computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp valueAndTimestamp) {
+private ValueTimestampHeaders computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+Headers headers = null;
-if (valueAndTimestamp != null) {
-newValue = mapper.apply(key, valueAndTimestamp.value());
-timestamp = valueAndTimestamp.timestamp();
+if (valueTimestampHeaders != null) {
+newValue = mapper.apply(key, valueTimestampHeaders.value());
+timestamp = valueTimestampHeaders.timestamp();
+headers = valueTimestampHeaders.headers();
}
-return ValueAndTimestamp.make(newValue, timestamp);
+return ValueTimestampHeaders.make(newValue, timestamp, headers);
Review Comment:
Whatever you prefer -- it should not make a difference in practice, because
the DSL does never _read_ `Headers` atm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20194: Change KeyValueStoreWrapper to have versioned and headersStore [kafka]
aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2921520361
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+@SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+final StateStore rawStore = context.getStateStore(storeName);
+
+// Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+if (rawStore instanceof TimestampedKeyValueStore &&
+!(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+// Adapt OLD store to NEW type for backward compatibility
+headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
Review Comment:
The adapter is needed here for 100% backward compatibility. If the user
specifies the store by `builder.addStateStore(OLD builder)`, the app breaks.
##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##
@@ -567,7 +571,7 @@ private void
verifyLegacyValuesWithEmptyHeaders(final K key,
() -> {
try {
final ReadOnlyKeyValueStore>
store = IntegrationTestUtils
-.getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
Review Comment:
Because of the `Readonly*Facade` classes, the
`QueryableStoreTypes.keyValueStore()` does not work here any more. Those
`Facade`s convert the `ValueTimestampHeaders` to plain value and the test fails.
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java:
##
@@ -58,29 +59,22 @@ public WindowStoreMaterializer(
public StoreBuilder builder() {
final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
-? dslStoreSuppliers().windowStore(new DslWindowParams(
-materialized.storeName(),
-Duration.ofMillis(retentionPeriod),
-Duration.ofMillis(windows.size()),
-false,
-emitStrategy,
-false,
-storeFormat
-))
-: (WindowBytesStoreSupplier) materialized.storeSupplier();
-
-final StoreBuilder builder;
-if (storeFormat == DslStoreFormat.HEADERS) {
-builder = Stores.timestampedWindowStoreWithHeadersBuilder(
-supplier,
-materialized.keySerde(),
-materialized.valueSerde());
-} else {
-builder = Stores.timestampedWindowStoreBuilder(
-supplier,
-materialized.keySerde(),
-materialized.valueSerde());
-}
Review Comment:
I think we keep the supplier as-is, but the builder must always be the new
one!
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
Review Comment:
Yes the headers versioned store is not implemented yet. Will be added later.
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
-private ValueAndTimestamp computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp valueAndTimestamp) {
+private ValueTimestampHeaders computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+Headers headers = null;
-if (valueAndTimestamp != null) {
-newValue = mapper.apply(key, va
