Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2888533746
##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##
@@ -72,7 +72,11 @@ public TimestampedWindowStoreWithHeaders build() {
if (!(store instanceof HeadersBytesStore)) {
if (store.persistent()) {
-store = new TimestampedToHeadersWindowStoreAdapter(store);
+if (store instanceof TimestampedBytesStore) {
+store = new TimestampedToHeadersWindowStoreAdapter(store);
+} else {
+store = new PlainToHeadersWindowStoreAdapter(store);
Review Comment:
For both cases we need. Please check `StreamJoinedStoreFactory`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887493506
##
streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java:
##
@@ -63,7 +63,7 @@ public WindowBytesStoreSupplier windowStore(final
DslWindowParams params) {
}
if (params.isTimestamped()) {
-return Stores.persistentTimestampedWindowStore(
+return Stores.persistentTimestampedWindowStoreWithHeaders(
Review Comment:
Why is it correct to return a headers-supplier if `params` request a
timestamped one? Don't we need to have a `switch(...)` here, similar to
`keyValueStore` from above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580: URL: https://github.com/apache/kafka/pull/21580#discussion_r2887490946 ## streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java: ## Review Comment: Don't we need to have a "headers version" for this one, too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887483817
##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##
@@ -72,7 +72,11 @@ public TimestampedWindowStoreWithHeaders build() {
if (!(store instanceof HeadersBytesStore)) {
if (store.persistent()) {
-store = new TimestampedToHeadersWindowStoreAdapter(store);
+if (store instanceof TimestampedBytesStore) {
+store = new TimestampedToHeadersWindowStoreAdapter(store);
+} else {
+store = new PlainToHeadersWindowStoreAdapter(store);
Review Comment:
Same question: do we actually need this for the window-store case? I though
we only need it for kv-store case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887480432
##
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link
TimestampedWindowStoreWithHeaders}
+ * and {@link WindowStore}.
+ *
+ * If a user provides a supplier for {@code WindowStore} (without timestamp
and headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]}
format.
+ *
+ * Format conversion:
+ *
+ * Write: {@code [headers][timestamp][value]} → {@code [value]} (strip
timestamp and headers)
+ * Read: {@code [value]} → {@code [headers][timestamp][value]} (add -1
as timestamp and empty headers)
+ *
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore {
Review Comment:
Alieh, can you clarify? I thought, we would need to add upgrading from
plain-kv to header-kv store only? But not for window-store case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887467634
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +48,70 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
-// same as either timestampedStore or versionedStore above. kept merely as
a convenience
-// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
public KeyValueStoreWrapper(final ProcessorContext context, final
String storeName) {
+// Try headers-aware store first, then versioned store
try {
-// first try timestamped store
-timestampedStore = context.getStateStore(storeName);
-store = timestampedStore;
+// first try headers-aware timestamped store
+headersStore = context.getStateStore(storeName);
+store = headersStore;
return;
} catch (final ClassCastException e) {
-// ignore since could be versioned store instead
+// ignore since could be regular timestamped or versioned store
instead
+System.out.println(e.getMessage());
Review Comment:
Guess it's on left over debug statement that needs to get removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887463579
##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##
@@ -447,7 +447,7 @@ public Maybe>
priorValueForBuffered(final K key) {
// it's unfortunately not possible to know this, unless we
materialize the suppressed result, since our only
// knowledge of the prior value is what the upstream processor
sends us as the "old value" when we first
// buffer something.
-return Maybe.defined(ValueAndTimestamp.make(deserializedValue,
RecordQueue.UNKNOWN));
+return Maybe.defined(ValueTimestampHeaders.make(deserializedValue,
RecordQueue.UNKNOWN, new RecordHeaders()));
Review Comment:
In other places we pass `null` as header as default (what is fine, because
it's getting replace with `new RecordHeaders()` inside `ValueTimestampHeaders`
constructor). But I think we should be consistent, so maybe pass in `null` here
too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887447794
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java:
##
@@ -166,7 +166,8 @@ public void process(final Record record) {
} else {
oldValue = null;
}
-final long putReturnCode = store.put(record.key(),
record.value(), record.timestamp());
+final long putReturnCode;
+putReturnCode = store.put(record.key(), record.value(),
record.timestamp(), record.headers());
Review Comment:
`new RecordHeaders()` ?
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##
@@ -120,7 +120,7 @@ public void process(final Record> record) {
if (queryableName != null) {
final VOut oldValue = sendOldValues ?
getValueOrNull(store.get(record.key())) : null;
-final long putReturnCode = store.put(record.key(), newValue,
record.timestamp());
+final long putReturnCode = store.put(record.key(), newValue,
record.timestamp(), record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887446759
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java:
##
@@ -125,7 +125,7 @@ public void process(final Record> record) {
}
// update the store with the new value
-final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp, record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887445584
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
-private ValueAndTimestamp computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp valueAndTimestamp) {
+private ValueTimestampHeaders computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+Headers headers = null;
-if (valueAndTimestamp != null) {
-newValue = mapper.apply(key, valueAndTimestamp.value());
-timestamp = valueAndTimestamp.timestamp();
+if (valueTimestampHeaders != null) {
+newValue = mapper.apply(key, valueTimestampHeaders.value());
+timestamp = valueTimestampHeaders.timestamp();
+headers = valueTimestampHeaders.headers();
}
-return ValueAndTimestamp.make(newValue, timestamp);
+return ValueTimestampHeaders.make(newValue, timestamp, headers);
Review Comment:
I think passing `headers` here is fine, as it's stateless map() operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887442676
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##
@@ -146,7 +149,7 @@ public void process(final Record> record) {
final VOut oldValue = computeOldValue(record.key(),
record.value());
if (queryableName != null) {
-final long putReturnCode = store.put(record.key(), newValue,
record.timestamp());
+final long putReturnCode = store.put(record.key(), newValue,
record.timestamp(), record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887436202
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java:
##
@@ -125,15 +125,15 @@ public void init(final ProcessorContext>
context) {
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
-new TimestampedCacheFlushListener<>(context),
+new TimestampedCacheFlushListenerWithHeaders<>(context),
sendOldValues);
}
}
@Override
public void process(final Record> record) {
if (queryableName != null) {
-final long putReturnCode = store.put(record.key(),
record.value().newValue, record.timestamp());
+final long putReturnCode = store.put(record.key(),
record.value().newValue, record.timestamp(), record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887434921
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java:
##
@@ -147,7 +147,7 @@ public void process(final Record> record) {
}
if (queryableName != null) {
-final long putReturnCode = store.put(key, newValue,
record.timestamp());
+final long putReturnCode = store.put(key, newValue,
record.timestamp(), record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887434124
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##
@@ -143,13 +143,13 @@ public void process(final Record record) {
oldAgg = initializer.apply();
newTimestamp = record.timestamp();
} else {
-newTimestamp = Math.max(record.timestamp(),
oldAggAndTimestamp.timestamp());
+newTimestamp = Math.max(record.timestamp(),
oldAggTimestampHeaders.timestamp());
}
newAgg = aggregator.apply(record.key(), record.value(),
oldAgg);
// update the store with the new value
-windowStore.put(record.key(),
ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
+windowStore.put(record.key(),
ValueTimestampHeaders.make(newAgg, newTimestamp, record.headers()),
windowStart);
Review Comment:
`new RecordHeaders()` ?
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##
@@ -133,7 +133,7 @@ public void process(final Record> record) {
}
// update the store with the new value
-final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp, record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887433056
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long
emitRangeLowerBound, final long em
}
private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp
valueAndTime,
+final ValueTimestampHeaders
valueTimestampHeaders,
final Record record,
final long windowCloseTime) {
final long windowStart = window.start();
final long windowEnd = window.end();
if (windowEnd >= windowCloseTime) {
// get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
final VAgg newAgg = aggregator.apply(record.key(),
record.value(), oldAgg);
-final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers = oldAgg == null ? record.headers() :
valueTimestampHeaders.headers();
windowStore.put(
record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),
Review Comment:
`new RecordHeaders()` ?
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long
emitRangeLowerBound, final long em
}
private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp
valueAndTime,
+final ValueTimestampHeaders
valueTimestampHeaders,
final Record record,
final long windowCloseTime) {
final long windowStart = window.start();
final long windowEnd = window.end();
if (windowEnd >= windowCloseTime) {
// get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
final VAgg newAgg = aggregator.apply(record.key(),
record.value(), oldAgg);
-final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers = oldAgg == null ? record.headers() :
valueTimestampHeaders.headers();
Review Comment:
I think we don't need this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887431893
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -386,14 +387,14 @@ private void createWindows(final Record record,
// create left window for new record
if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
+final ValueTimestampHeaders valueTimestampHeaders;
if (leftWindowNotEmpty(previousRecordTimestamp,
record.timestamp())) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(),
record.timestamp());
+valueTimestampHeaders =
ValueTimestampHeaders.make(leftWinAgg.value(), record.timestamp(),
record.headers());
} else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(),
record.timestamp());
+valueTimestampHeaders =
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(),
record.headers());
Review Comment:
`new RecordHeaders()` ?
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -417,8 +418,8 @@ private void createPreviousRecordRightWindow(final long
windowStart,
final Record
record,
final long closeTime) {
final TimeWindow window = new TimeWindow(windowStart, windowStart
+ windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime =
ValueAndTimestamp.make(initializer.apply(), record.timestamp());
-updateWindowAndForward(window, valueAndTime, record, closeTime);
+final ValueTimestampHeaders valueTimestampHeaders =
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(),
record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887431348
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -358,8 +359,8 @@ private void processEarly(final Record record,
final long windowCloseT
if (combinedWindow == null) {
final TimeWindow window = new TimeWindow(0,
windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime =
ValueAndTimestamp.make(initializer.apply(), record.timestamp());
-updateWindowAndForward(window, valueAndTime, record,
windowCloseTime);
+final ValueTimestampHeaders valueTimestampHeaders =
ValueTimestampHeaders.make(initializer.apply(), record.timestamp(),
record.headers());
Review Comment:
`new RecordHeaders()` ?
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -386,14 +387,14 @@ private void createWindows(final Record record,
// create left window for new record
if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
+final ValueTimestampHeaders valueTimestampHeaders;
if (leftWindowNotEmpty(previousRecordTimestamp,
record.timestamp())) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(),
record.timestamp());
+valueTimestampHeaders =
ValueTimestampHeaders.make(leftWinAgg.value(), record.timestamp(),
record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887429744
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java:
##
@@ -129,7 +129,7 @@ public void process(final Record record) {
newTimestamp = Math.max(record.timestamp(),
oldAggAndTimestamp.timestamp());
}
-final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp, record.headers());
Review Comment:
`new RecordHeaders()` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887428602
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -134,7 +134,7 @@ public void process(final Record record) {
newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
-final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp);
+final long putReturnCode = store.put(record.key(), newAgg,
newTimestamp, record.headers());
Review Comment:
I thought, for now, we wanted to use `new RecordHeaders()` when writing into
the store?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887426008
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -94,7 +94,7 @@ public void init(final ProcessorContext>
context) {
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
-new TimestampedCacheFlushListener<>(context),
+new TimestampedCacheFlushListenerWithHeaders<>(context),
Review Comment:
Is `TimestampedCacheFlushListener` still used after this update? I would
believe not, and that `TimestampedCacheFlushListenerWithHeaders` just replaces
it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
zheguang commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886144098
##
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link
TimestampedWindowStoreWithHeaders}
+ * and {@link WindowStore}.
+ *
+ * If a user provides a supplier for {@code WindowStore} (without timestamp
and headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]}
format.
+ *
+ * Format conversion:
+ *
+ * Write: {@code [headers][timestamp][value]} → {@code [value]} (strip
timestamp and headers)
+ * Read: {@code [value]} → {@code [headers][timestamp][value]} (add -1
as timestamp and empty headers)
+ *
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore {
Review Comment:
Interesting -- looks like this will allow skipping the intermediate upgrade
to timestamped format. Do we also need to change the
`RocksDBTimestampedStoreWithHeaders` such that the `openRocksDB` and
`openInUpgradeMode` also supports this upgrade path?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
UladzislauBlok commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886100657
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
Review Comment:
I saw it in the KIP, but you're right it's not implemented yet
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
zheguang commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2886077267
##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
-private TimestampedKeyValueStore timestampedStore = null;
+private TimestampedKeyValueStoreWithHeaders headersStore = null;
private VersionedKeyValueStore versionedStore = null;
Review Comment:
Looks like `VersionedKeyValueStoreWithHeaders` is not yet implemented/added.
Do we want to have a JIRA issue to keep track?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2884294379
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long
emitRangeLowerBound, final long em
}
private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp
valueAndTime,
+final ValueTimestampHeaders
valueTimestampHeaders,
final Record record,
final long windowCloseTime) {
final long windowStart = window.start();
final long windowEnd = window.end();
if (windowEnd >= windowCloseTime) {
// get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
final VAgg newAgg = aggregator.apply(record.key(),
record.value(), oldAgg);
-final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers = oldAgg == null ? record.headers() :
valueTimestampHeaders.headers();
windowStore.put(
record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),
Review Comment:
I assume this must be empty headers?!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2884294379
##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -457,21 +458,22 @@ protected boolean shouldRangeFetch(final long
emitRangeLowerBound, final long em
}
private void updateWindowAndForward(final Window window,
-final ValueAndTimestamp
valueAndTime,
+final ValueTimestampHeaders
valueTimestampHeaders,
final Record record,
final long windowCloseTime) {
final long windowStart = window.start();
final long windowEnd = window.end();
if (windowEnd >= windowCloseTime) {
// get aggregate from existing window
-final VAgg oldAgg = getValueOrNull(valueAndTime);
+final VAgg oldAgg = getValueOrNull(valueTimestampHeaders);
final VAgg newAgg = aggregator.apply(record.key(),
record.value(), oldAgg);
-final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueAndTime.timestamp());
+final long newTimestamp = oldAgg == null ? record.timestamp()
: Math.max(record.timestamp(), valueTimestampHeaders.timestamp());
+final Headers headers = oldAgg == null ? record.headers() :
valueTimestampHeaders.headers();
windowStore.put(
record.key(),
-ValueAndTimestamp.make(newAgg, newTimestamp),
+ValueTimestampHeaders.make(newAgg, newTimestamp, headers),
Review Comment:
I assume this must be empty headers?!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
frankvicky commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2873702738
##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -217,38 +292,71 @@ public void testValueGetter() {
final KTableValueGetter getter1 =
getterSupplier1.get();
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-
-assertEquals(ValueAndTimestamp.make("01", 10L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("01", 20L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "02", 30L);
-inputTopic1.pipeInput("B", "02", 5L);
-
-assertEquals(ValueAndTimestamp.make("02", 30L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "03", 29L);
-
-assertEquals(ValueAndTimestamp.make("03", 29L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", null, 50L);
-inputTopic1.pipeInput("B", null, 3L);
-
-assertNull(getter1.get("A"));
-assertNull(getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
+// Send records with unique headers for each key
+final Headers headersA = makeHeaders("key", "A");
+final Headers headersB = makeHeaders("key", "B");
+final Headers headersC = makeHeaders("key", "C");
+
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headersA, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headersB, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headersC, 15L));
+
+if (storeFormat.equals("defaults")) {
Review Comment:
We should introduce a const for `defaults` and `headers` since these two
value are using everywhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (2/N) [kafka]
UladzislauBlok commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2869315709
##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -270,38 +378,77 @@ public void testNotSendingOldValue() {
);
final MockApiProcessor proc1 =
supplier.theCapturedProcessor();
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-proc1.checkAndClearProcessResult(
-new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
-new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
-new KeyValueTimestamp<>("C", new Change<>("01", null), 15)
-);
+final Headers headers = makeHeaders("test", "header");
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headers, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headers, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headers, 15L));
-inputTopic1.pipeInput("A", "02", 8L);
-inputTopic1.pipeInput("B", "02", 22L);
-proc1.checkAndClearProcessResult(
-new KeyValueTimestamp<>("A", new Change<>("02", null), 8),
-new KeyValueTimestamp<>("B", new Change<>("02", null), 22)
-);
+if (storeFormat.equals("default")) {
+proc1.checkAndClearProcessResult(
+new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
+new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
+new KeyValueTimestamp<>("C", new Change<>("01", null), 15)
+);
+} else if (storeFormat.equals("headers")) {
Review Comment:
We've reused `default` and `headers` many times. Can we use some constant?
e.g., psf field or enum
##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java:
##
@@ -217,38 +292,71 @@ public void testValueGetter() {
final KTableValueGetter getter1 =
getterSupplier1.get();
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
-inputTopic1.pipeInput("A", "01", 10L);
-inputTopic1.pipeInput("B", "01", 20L);
-inputTopic1.pipeInput("C", "01", 15L);
-
-assertEquals(ValueAndTimestamp.make("01", 10L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("01", 20L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "02", 30L);
-inputTopic1.pipeInput("B", "02", 5L);
-
-assertEquals(ValueAndTimestamp.make("02", 30L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", "03", 29L);
-
-assertEquals(ValueAndTimestamp.make("03", 29L), getter1.get("A"));
-assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
-
-inputTopic1.pipeInput("A", null, 50L);
-inputTopic1.pipeInput("B", null, 3L);
-
-assertNull(getter1.get("A"));
-assertNull(getter1.get("B"));
-assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
+// Send records with unique headers for each key
+final Headers headersA = makeHeaders("key", "A");
+final Headers headersB = makeHeaders("key", "B");
+final Headers headersC = makeHeaders("key", "C");
+
+inputTopic1.pipeInput(new TestRecord<>("A", "01", headersA, 10L));
+inputTopic1.pipeInput(new TestRecord<>("B", "01", headersB, 20L));
+inputTopic1.pipeInput(new TestRecord<>("C", "01", headersC, 15L));
+
+if (storeFormat.equals("defaults")) {
+assertFalse(getter1.supportsHeaders(), "Getter should not
support headers with 'default' format");
+assertEquals(ValueAndTimestamp.make("01", 10L),
getter1.get("A"));
+assertEquals(ValueAndTimestamp.make("01", 20L),
getter1.get("B"));
+assertEquals(ValueAndTimestamp.make("01", 15L),
getter1.get("C"));
+} else if (storeFormat.equals("headers")) {
+assertTrue(getter1.supportsHeaders(), "Getter should support
headers with 'headers' format");
+assertEquals(ValueTimestampHeaders.make("01", 10L, headersA),
getter1.getWithHeaders("A"));
+assertEquals(ValueTimestampHeaders.make("01", 20L, headersB),
getter1.getWithHeaders("B"));
+assertEquals(ValueTimestampHeader
