Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-05 Thread via GitHub


aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2888533746


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##
@@ -72,7 +72,11 @@ public TimestampedWindowStoreWithHeaders build() {
 
 if (!(store instanceof HeadersBytesStore)) {
 if (store.persistent()) {
-store = new TimestampedToHeadersWindowStoreAdapter(store);
+if (store instanceof TimestampedBytesStore) {
+store = new TimestampedToHeadersWindowStoreAdapter(store);
+} else {
+store = new PlainToHeadersWindowStoreAdapter(store);

Review Comment:
   For both cases we need. Please check `StreamJoinedStoreFactory`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887493506


##
streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java:
##
@@ -63,7 +63,7 @@ public WindowBytesStoreSupplier windowStore(final 
DslWindowParams params) {
 }
 
 if (params.isTimestamped()) {
-return Stores.persistentTimestampedWindowStore(
+return Stores.persistentTimestampedWindowStoreWithHeaders(

Review Comment:
   Why is it correct to return a headers-supplier if `params` request a 
timestamped one? Don't we need to have a `switch(...)` here, similar to 
`keyValueStore` from above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887490946


##
streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java:
##


Review Comment:
   Don't we need to have a "headers version" for this one, too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887483817


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##
@@ -72,7 +72,11 @@ public TimestampedWindowStoreWithHeaders build() {
 
 if (!(store instanceof HeadersBytesStore)) {
 if (store.persistent()) {
-store = new TimestampedToHeadersWindowStoreAdapter(store);
+if (store instanceof TimestampedBytesStore) {
+store = new TimestampedToHeadersWindowStoreAdapter(store);
+} else {
+store = new PlainToHeadersWindowStoreAdapter(store);

Review Comment:
   Same question: do we actually need this for the window-store case? I though 
we only need it for kv-store case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887480432


##
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link 
TimestampedWindowStoreWithHeaders}
+ * and {@link WindowStore}.
+ * 
+ * If a user provides a supplier for {@code WindowStore} (without timestamp 
and headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates 
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]} 
format.
+ * 
+ * Format conversion:
+ * 
+ *   Write: {@code [headers][timestamp][value]} → {@code [value]} (strip 
timestamp and headers)
+ *   Read: {@code [value]} → {@code [headers][timestamp][value]} (add -1 
as timestamp and empty headers)
+ * 
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore {

Review Comment:
   Alieh, can you clarify? I thought, we would need to add upgrading from 
plain-kv to header-kv store only? But not for window-store case? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887467634


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,70 @@ public class KeyValueStoreWrapper implements 
StateStore {
 public static final long PUT_RETURN_CODE_IS_LATEST
 = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
 private VersionedKeyValueStore versionedStore = null;
 
-// same as either timestampedStore or versionedStore above. kept merely as 
a convenience
-// to simplify implementation for methods which do not depend on store 
type.
 private StateStore store;
 
 public KeyValueStoreWrapper(final ProcessorContext context, final 
String storeName) {
+// Try headers-aware store first, then versioned store
 try {
-// first try timestamped store
-timestampedStore = context.getStateStore(storeName);
-store = timestampedStore;
+// first try headers-aware timestamped store
+headersStore = context.getStateStore(storeName);
+store = headersStore;
 return;
 } catch (final ClassCastException e) {
-// ignore since could be versioned store instead
+// ignore since could be regular timestamped or versioned store 
instead
+System.out.println(e.getMessage());

Review Comment:
   Guess it's on left over debug statement that needs to get removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887463579


##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##
@@ -447,7 +447,7 @@ public Maybe> 
priorValueForBuffered(final K key) {
 // it's unfortunately not possible to know this, unless we 
materialize the suppressed result, since our only
 // knowledge of the prior value is what the upstream processor 
sends us as the "old value" when we first
 // buffer something.
-return Maybe.defined(ValueAndTimestamp.make(deserializedValue, 
RecordQueue.UNKNOWN));
+return Maybe.defined(ValueTimestampHeaders.make(deserializedValue, 
RecordQueue.UNKNOWN, new RecordHeaders()));

Review Comment:
   In other places we pass `null` as header as default (what is fine, because 
it's getting replace with `new RecordHeaders()` inside `ValueTimestampHeaders` 
constructor). But I think we should be consistent, so maybe pass in `null` here 
too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887447794


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java:
##
@@ -166,7 +166,8 @@ public void process(final Record record) {
 } else {
 oldValue = null;
 }
-final long putReturnCode = store.put(record.key(), 
record.value(), record.timestamp());
+final long putReturnCode;
+putReturnCode = store.put(record.key(), record.value(), 
record.timestamp(), record.headers());

Review Comment:
   `new RecordHeaders()` ?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##
@@ -120,7 +120,7 @@ public void process(final Record> record) {
 
 if (queryableName != null) {
 final VOut oldValue = sendOldValues ? 
getValueOrNull(store.get(record.key())) : null;
-final long putReturnCode = store.put(record.key(), newValue, 
record.timestamp());
+final long putReturnCode = store.put(record.key(), newValue, 
record.timestamp(), record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887446759


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java:
##
@@ -125,7 +125,7 @@ public void process(final Record> record) {
 }
 
 // update the store with the new value
-final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp, record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887445584


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value) 
{
 return newValue;
 }
 
-private ValueAndTimestamp computeValueAndTimestamp(final KIn key, 
final ValueAndTimestamp valueAndTimestamp) {
+private ValueTimestampHeaders computeValueAndTimestamp(final KIn 
key, final ValueTimestampHeaders valueTimestampHeaders) {
 VOut newValue = null;
 long timestamp = 0;
+Headers headers = null;
 
-if (valueAndTimestamp != null) {
-newValue = mapper.apply(key, valueAndTimestamp.value());
-timestamp = valueAndTimestamp.timestamp();
+if (valueTimestampHeaders != null) {
+newValue = mapper.apply(key, valueTimestampHeaders.value());
+timestamp = valueTimestampHeaders.timestamp();
+headers = valueTimestampHeaders.headers();
 }
 
-return ValueAndTimestamp.make(newValue, timestamp);
+return ValueTimestampHeaders.make(newValue, timestamp, headers);

Review Comment:
   I think passing `headers` here is fine, as it's stateless map() operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887442676


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -146,7 +149,7 @@ public void process(final Record> record) {
 final VOut oldValue = computeOldValue(record.key(), 
record.value());
 
 if (queryableName != null) {
-final long putReturnCode = store.put(record.key(), newValue, 
record.timestamp());
+final long putReturnCode = store.put(record.key(), newValue, 
record.timestamp(), record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887436202


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java:
##
@@ -125,15 +125,15 @@ public void init(final ProcessorContext> 
context) {
 tupleForwarder = new TimestampedTupleForwarder<>(
 store.store(),
 context,
-new TimestampedCacheFlushListener<>(context),
+new TimestampedCacheFlushListenerWithHeaders<>(context),
 sendOldValues);
 }
 }
 
 @Override
 public void process(final Record> record) {
 if (queryableName != null) {
-final long putReturnCode = store.put(record.key(), 
record.value().newValue, record.timestamp());
+final long putReturnCode = store.put(record.key(), 
record.value().newValue, record.timestamp(), record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887434921


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java:
##
@@ -147,7 +147,7 @@ public void process(final Record> record) {
 }
 
 if (queryableName != null) {
-final long putReturnCode = store.put(key, newValue, 
record.timestamp());
+final long putReturnCode = store.put(key, newValue, 
record.timestamp(), record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887434124


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##
@@ -143,13 +143,13 @@ public void process(final Record record) {
 oldAgg = initializer.apply();
 newTimestamp = record.timestamp();
 } else {
-newTimestamp = Math.max(record.timestamp(), 
oldAggAndTimestamp.timestamp());
+newTimestamp = Math.max(record.timestamp(), 
oldAggTimestampHeaders.timestamp());
 }
 
 newAgg = aggregator.apply(record.key(), record.value(), 
oldAgg);
 
 // update the store with the new value
-windowStore.put(record.key(), 
ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
+windowStore.put(record.key(), 
ValueTimestampHeaders.make(newAgg, newTimestamp, record.headers()), 
windowStart);

Review Comment:
   `new RecordHeaders()` ?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##
@@ -133,7 +133,7 @@ public void process(final Record> record) {
 }
 
 // update the store with the new value
-final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp, record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887433056


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long 
emitRangeLowerBound, final long em
 }
 
 private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp 
valueAndTime,
+final ValueTimestampHeaders 
valueTimestampHeaders,
 final Record record,
 final long windowCloseTime) {
 final long windowStart = window.start();
 final long windowEnd = window.end();
 
 if (windowEnd >= windowCloseTime) {
 // get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
 
-final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers  =  oldAgg == null ? record.headers() : 
valueTimestampHeaders.headers();
 windowStore.put(
 record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),

Review Comment:
   `new RecordHeaders()` ?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long 
emitRangeLowerBound, final long em
 }
 
 private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp 
valueAndTime,
+final ValueTimestampHeaders 
valueTimestampHeaders,
 final Record record,
 final long windowCloseTime) {
 final long windowStart = window.start();
 final long windowEnd = window.end();
 
 if (windowEnd >= windowCloseTime) {
 // get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
 
-final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers  =  oldAgg == null ? record.headers() : 
valueTimestampHeaders.headers();

Review Comment:
   I think we don't need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887431893


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -386,14 +387,14 @@ private void createWindows(final Record record,
 
 // create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
+final ValueTimestampHeaders valueTimestampHeaders;
 if (leftWindowNotEmpty(previousRecordTimestamp, 
record.timestamp())) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
record.timestamp());
+valueTimestampHeaders = 
ValueTimestampHeaders.make(leftWinAgg.value(), record.timestamp(), 
record.headers());
 } else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
record.timestamp());
+valueTimestampHeaders = 
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(), 
record.headers());

Review Comment:
   `new RecordHeaders()` ?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -417,8 +418,8 @@ private void createPreviousRecordRightWindow(final long 
windowStart,
  final Record 
record,
  final long closeTime) {
 final TimeWindow window = new TimeWindow(windowStart, windowStart 
+ windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), record.timestamp());
-updateWindowAndForward(window, valueAndTime, record, closeTime);
+final ValueTimestampHeaders valueTimestampHeaders = 
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(), 
record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887431348


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -358,8 +359,8 @@ private void processEarly(final Record record, 
final long windowCloseT
 
 if (combinedWindow == null) {
 final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), record.timestamp());
-updateWindowAndForward(window, valueAndTime, record, 
windowCloseTime);
+final ValueTimestampHeaders valueTimestampHeaders = 
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(), 
record.headers());

Review Comment:
   `new RecordHeaders()` ?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -386,14 +387,14 @@ private void createWindows(final Record record,
 
 // create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
+final ValueTimestampHeaders valueTimestampHeaders;
 if (leftWindowNotEmpty(previousRecordTimestamp, 
record.timestamp())) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
record.timestamp());
+valueTimestampHeaders = 
ValueTimestampHeaders.make(leftWinAgg.value(), record.timestamp(), 
record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887429744


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java:
##
@@ -129,7 +129,7 @@ public void process(final Record record) {
 newTimestamp = Math.max(record.timestamp(), 
oldAggAndTimestamp.timestamp());
 }
 
-final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp, record.headers());

Review Comment:
   `new RecordHeaders()` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887428602


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -134,7 +134,7 @@ public void process(final Record record) {
 
 newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp, record.headers());

Review Comment:
   I thought, for now, we wanted to use `new RecordHeaders()` when writing into 
the store?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887426008


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -94,7 +94,7 @@ public void init(final ProcessorContext> 
context) {
 tupleForwarder = new TimestampedTupleForwarder<>(
 store.store(),
 context,
-new TimestampedCacheFlushListener<>(context),
+new TimestampedCacheFlushListenerWithHeaders<>(context),

Review Comment:
   Is `TimestampedCacheFlushListener` still used after this update? I would 
believe not, and that `TimestampedCacheFlushListenerWithHeaders` just replaces 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


zheguang commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886144098


##
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link 
TimestampedWindowStoreWithHeaders}
+ * and {@link WindowStore}.
+ * 
+ * If a user provides a supplier for {@code WindowStore} (without timestamp 
and headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates 
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]} 
format.
+ * 
+ * Format conversion:
+ * 
+ *   Write: {@code [headers][timestamp][value]} → {@code [value]} (strip 
timestamp and headers)
+ *   Read: {@code [value]} → {@code [headers][timestamp][value]} (add -1 
as timestamp and empty headers)
+ * 
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore {

Review Comment:
   Interesting -- looks like this will allow skipping the intermediate upgrade 
to timestamped format.  Do we also need to change the 
`RocksDBTimestampedStoreWithHeaders` such that the `openRocksDB` and 
`openInUpgradeMode` also supports this upgrade path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


UladzislauBlok commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886100657


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper implements 
StateStore {
 public static final long PUT_RETURN_CODE_IS_LATEST
 = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
 private VersionedKeyValueStore versionedStore = null;

Review Comment:
   I saw it in the KIP, but you're right it's not implemented yet



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


zheguang commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886077267


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper implements 
StateStore {
 public static final long PUT_RETURN_CODE_IS_LATEST
 = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
 private VersionedKeyValueStore versionedStore = null;

Review Comment:
   Looks like `VersionedKeyValueStoreWithHeaders` is not yet implemented/added. 
 Do we want to have a JIRA issue to keep track?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2884294379


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long 
emitRangeLowerBound, final long em
 }
 
 private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp 
valueAndTime,
+final ValueTimestampHeaders 
valueTimestampHeaders,
 final Record record,
 final long windowCloseTime) {
 final long windowStart = window.start();
 final long windowEnd = window.end();
 
 if (windowEnd >= windowCloseTime) {
 // get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
 
-final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers  =  oldAgg == null ? record.headers() : 
valueTimestampHeaders.headers();
 windowStore.put(
 record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),

Review Comment:
   I assume this must be empty headers?!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-04 Thread via GitHub


aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2884294379


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long 
emitRangeLowerBound, final long em
 }
 
 private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp 
valueAndTime,
+final ValueTimestampHeaders 
valueTimestampHeaders,
 final Record record,
 final long windowCloseTime) {
 final long windowStart = window.start();
 final long windowEnd = window.end();
 
 if (windowEnd >= windowCloseTime) {
 // get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
 
-final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp() 
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers  =  oldAgg == null ? record.headers() : 
valueTimestampHeaders.headers();
 windowStore.put(
 record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),

Review Comment:
   I assume this must be empty headers?!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-02 Thread via GitHub


frankvicky commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2873702738


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -217,38 +292,71 @@ public void testValueGetter() {
 final KTableValueGetter getter1 = 
getterSupplier1.get();
 
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
 
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-
-assertEquals(ValueAndTimestamp.make("01", 10L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("01", 20L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "02", 30L);
-inputTopic1.pipeInput("B", "02", 5L);
-
-assertEquals(ValueAndTimestamp.make("02", 30L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "03", 29L);
-
-assertEquals(ValueAndTimestamp.make("03", 29L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", null, 50L);
-inputTopic1.pipeInput("B", null, 3L);
-
-assertNull(getter1.get("A"));
-assertNull(getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
+// Send records with unique headers for each key
+final Headers headersA = makeHeaders("key", "A");
+final Headers headersB = makeHeaders("key", "B");
+final Headers headersC = makeHeaders("key", "C");
+
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headersA, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headersB, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headersC, 15L));
+
+if (storeFormat.equals("defaults")) {

Review Comment:
   We should introduce a const for `defaults` and `headers` since these two 
value are using everywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]

2026-03-01 Thread via GitHub


UladzislauBlok commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2869315709


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -270,38 +378,77 @@ public void testNotSendingOldValue() {
 );
 final MockApiProcessor proc1 = 
supplier.theCapturedProcessor();
 
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-proc1.checkAndClearProcessResult(
-new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
-new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
-new KeyValueTimestamp<>("C", new Change<>("01", null), 15)
-);
+final Headers headers = makeHeaders("test", "header");
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headers, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headers, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headers, 15L));
 
-inputTopic1.pipeInput("A", "02", 8L);
-inputTopic1.pipeInput("B", "02", 22L);
-proc1.checkAndClearProcessResult(
-new KeyValueTimestamp<>("A", new Change<>("02", null), 8),
-new KeyValueTimestamp<>("B", new Change<>("02", null), 22)
-);
+if (storeFormat.equals("default")) {
+proc1.checkAndClearProcessResult(
+new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
+new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
+new KeyValueTimestamp<>("C", new Change<>("01", null), 15)
+);
+} else if (storeFormat.equals("headers")) {

Review Comment:
   We've reused `default` and `headers` many times. Can we use some constant? 
e.g., psf field or enum



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -217,38 +292,71 @@ public void testValueGetter() {
 final KTableValueGetter getter1 = 
getterSupplier1.get();
 
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
 
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-
-assertEquals(ValueAndTimestamp.make("01", 10L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("01", 20L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "02", 30L);
-inputTopic1.pipeInput("B", "02", 5L);
-
-assertEquals(ValueAndTimestamp.make("02", 30L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "03", 29L);
-
-assertEquals(ValueAndTimestamp.make("03", 29L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", null, 50L);
-inputTopic1.pipeInput("B", null, 3L);
-
-assertNull(getter1.get("A"));
-assertNull(getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
+// Send records with unique headers for each key
+final Headers headersA = makeHeaders("key", "A");
+final Headers headersB = makeHeaders("key", "B");
+final Headers headersC = makeHeaders("key", "C");
+
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headersA, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headersB, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headersC, 15L));
+
+if (storeFormat.equals("defaults")) {
+assertFalse(getter1.supportsHeaders(), "Getter should not 
support headers with 'default' format");
+assertEquals(ValueAndTimestamp.make("01", 10L), 
getter1.get("A"));
+assertEquals(ValueAndTimestamp.make("01", 20L), 
getter1.get("B"));
+assertEquals(ValueAndTimestamp.make("01", 15L), 
getter1.get("C"));
+} else if (storeFormat.equals("headers")) {
+assertTrue(getter1.supportsHeaders(), "Getter should support 
headers with 'headers' format");
+assertEquals(ValueTimestampHeaders.make("01", 10L, headersA), 
getter1.getWithHeaders("A"));
+assertEquals(ValueTimestampHeaders.make("01", 20L, headersB), 
getter1.getWithHeaders("B"));
+assertEquals(ValueTimestampHeader