mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2943207276
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -109,16 +111,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
- private ValueAndTimestamp<VOut> computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp<VIn> valueAndTimestamp) {
+ private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+ Headers headers = null;
- if (valueAndTimestamp != null) {
- newValue = mapper.apply(key, valueAndTimestamp.value());
- timestamp = valueAndTimestamp.timestamp();
+ if (valueTimestampHeaders != null) {
+ newValue = mapper.apply(key, valueTimestampHeaders.value());
+ timestamp = valueTimestampHeaders.timestamp();
+ headers = valueTimestampHeaders.headers();
}
Review Comment:
If `valueTimestampHeaders == null`, should we set `headers =
context.headers()` ?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
Review Comment:
Why do we introduce `rawStore` ? The leads to the new to add
`@SuppressWarnings("unchecked")` and cast below.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
+
+ // Try headers-aware timestamped store
try {
- // first try timestamped store
- timestampedStore = context.getStateStore(storeName);
- store = timestampedStore;
+ headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>)
rawStore;
+ store = headersStore;
return;
} catch (final ClassCastException e) {
- // ignore since could be versioned store instead
+ // not headers store, try versioned
}
+ // Try versioned store
try {
- // next try versioned store
- versionedStore = context.getStateStore(storeName);
+ versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
store = versionedStore;
} catch (final ClassCastException e) {
- store = context.getStateStore(storeName);
- final String storeType = store == null ? "null" :
store.getClass().getName();
+ final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
- + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: "
+ storeType);
+ + "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
Review Comment:
```suggestion
+ "TimestampedKeyValueStoreWithHeaders or
VersionedKeyValueStore. Got: " + storeType);
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
+
+ // Try headers-aware timestamped store
try {
- // first try timestamped store
- timestampedStore = context.getStateStore(storeName);
- store = timestampedStore;
+ headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>)
rawStore;
+ store = headersStore;
return;
} catch (final ClassCastException e) {
- // ignore since could be versioned store instead
+ // not headers store, try versioned
}
+ // Try versioned store
try {
- // next try versioned store
- versionedStore = context.getStateStore(storeName);
+ versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
store = versionedStore;
} catch (final ClassCastException e) {
- store = context.getStateStore(storeName);
- final String storeType = store == null ? "null" :
store.getClass().getName();
+ final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
- + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: "
+ storeType);
+ + "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
}
}
- public ValueAndTimestamp<V> get(final K key) {
- if (timestampedStore != null) {
- return timestampedStore.get(key);
+ public ValueTimestampHeaders<V> get(final K key) {
+ if (headersStore != null) {
+ return headersStore.get(key);
}
if (versionedStore != null) {
final VersionedRecord<V> versionedRecord = versionedStore.get(key);
return versionedRecord == null
? null
- : ValueAndTimestamp.make(versionedRecord.value(),
versionedRecord.timestamp());
+ : ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), null);
}
- throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped or versioned store");
+ throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
Review Comment:
```suggestion
throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either headers or versioned store");
```
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java:
##########
@@ -61,8 +61,8 @@ public void init(final ProcessorContext<?, ?> context) {
}
@Override
- public ValueAndTimestamp<V> get(final K key) {
- return ValueAndTimestamp.make(map.get(key), -1);
+ public ValueTimestampHeaders<V> get(final K key) {
+ return ValueTimestampHeaders.make(map.get(key), -1, null);
Review Comment:
Nit: Should we align to prod-code and pass `new RecordHeaders()` instead of
`null`?
Also elsewhere, if we change it -- I am fine either way.
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
Review Comment:
It seems, we should pass in `Stores.persistentWindowStoreWithHeaders` here
-- we change the DSL to work with header-store, so if we pass in a plain-store,
the code inserts the adaptor?
Given that `Stores.persistentWindowStoreWithHeaders(...)` doesn't exists yet
(seems this is something we should get fixed?), we could also just pass `new
RocksDbWindowBytesStoreSupplier(...)` instead?
##########
streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic version of the in-memory key-value store that is
useful for testing when you
+ * need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
+ */
+@SuppressWarnings("deprecation")
+public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends
Comparable, V>
+ extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, ValueTimestampHeaders<V>> map;
+ private volatile boolean open = false;
+
+ public GenericInMemoryTimestampedKeyValueStoreWithHeaders(final String
name) {
+ // it's not really a `WrappedStateStore` so we pass `null`
+ // however, we need to implement `WrappedStateStore` to make the store
usable
+ super(null);
+ this.name = name;
+
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
Review Comment:
```suggestion
return name;
```
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1222,7 +1222,7 @@ public void
shouldUseSpecifiedStoreSupplierForEachOuterJoinOperationBetweenKStre
final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
- RocksDBWindowStore.class,
+ PlainToHeadersWindowStoreAdapter.class,
Review Comment:
> We can either fix the test
Not sure what you exactly mean by this. But cf my comment above -- this is
expected given how the test is setup -- but yes, we might want to update the
test setup.
I don't think we need to change anything about
`PlainToHeadersWindowStoreAdapter`.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedWindowStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+/**
+ * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
+ * {@link ReadOnlyWindowStore} interface with {@link ValueAndTimestamp} values.
+ * This facade converts {@link ValueTimestampHeaders} to {@link
ValueAndTimestamp}, discarding headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyTimestampedWindowStoreWithHeadersFacade<K, V> implements
ReadOnlyWindowStore<K, ValueAndTimestamp<V>> {
Review Comment:
One more about `GenericXxx` ?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedKeyValueStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
+ * {@link ReadOnlyKeyValueStore} interface with {@link ValueAndTimestamp}
values.
+ * This facade converts {@link ValueTimestampHeaders} to {@link
ValueAndTimestamp}, discarding headers.
+ *
+ * Similar to {@link ReadOnlyKeyValueStoreWithHeadersFacade} but for
timestamped queries.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyTimestampedKeyValueStoreWithHeadersFacade<K, V>
implements ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> {
Review Comment:
Similar question. Can we use `GenericXxx` to reduce boiler place code
duplication?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
+ * {@link ReadOnlyKeyValueStore} interface. This facade extracts the plain
value
+ * from {@link ValueTimestampHeaders}, discarding timestamp and headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyKeyValueStoreWithHeadersFacade<K, V> implements
ReadOnlyKeyValueStore<K, V> {
Review Comment:
Should this be `ReadOnlyKeyValueStoreWithHeadersFacade extends
GenericReadOnlyKeyValueStoreFacade` or maybe we can drop the class entirely and
just use `new GenericReadOnlyKeyValueStoreFacade(...)` (similar to what we did
for IQv1 and TTD case)?
We could do this also as follow up PR.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+/**
+ * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
+ * {@link ReadOnlyWindowStore} interface with plain values.
+ * This facade converts {@link ValueTimestampHeaders} to plain values,
discarding both timestamps and headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyWindowStoreWithHeadersFacade<K, V> implements
ReadOnlyWindowStore<K, V> {
Review Comment:
Another facede
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
// same as either timestampedStore or versionedStore above. kept merely as
a convenience
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
+
+ // Try headers-aware timestamped store
try {
- // first try timestamped store
- timestampedStore = context.getStateStore(storeName);
- store = timestampedStore;
+ headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>)
rawStore;
+ store = headersStore;
return;
} catch (final ClassCastException e) {
- // ignore since could be versioned store instead
+ // not headers store, try versioned
}
+ // Try versioned store
try {
- // next try versioned store
- versionedStore = context.getStateStore(storeName);
+ versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
store = versionedStore;
} catch (final ClassCastException e) {
- store = context.getStateStore(storeName);
- final String storeType = store == null ? "null" :
store.getClass().getName();
+ final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
- + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: "
+ storeType);
+ + "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
}
}
- public ValueAndTimestamp<V> get(final K key) {
- if (timestampedStore != null) {
- return timestampedStore.get(key);
+ public ValueTimestampHeaders<V> get(final K key) {
+ if (headersStore != null) {
+ return headersStore.get(key);
}
if (versionedStore != null) {
final VersionedRecord<V> versionedRecord = versionedStore.get(key);
return versionedRecord == null
? null
- : ValueAndTimestamp.make(versionedRecord.value(),
versionedRecord.timestamp());
+ : ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), null);
}
- throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped or versioned store");
+ throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
}
- public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+ public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp)
{
if (!isVersionedStore()) {
throw new UnsupportedOperationException("get(key, timestamp) is
only supported for versioned stores");
}
final VersionedRecord<V> versionedRecord = versionedStore.get(key,
asOfTimestamp);
- return versionedRecord == null ? null :
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+ return versionedRecord == null ? null :
ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), new RecordHeaders());
}
/**
* @return {@code -1} if the put record is the latest for its key, and
{@code Long.MIN_VALUE}
* if the put was rejected (i.e., due to grace period having
elapsed for a versioned
* store). If neither, any other long value may be returned.
*/
- public long put(final K key, final V value, final long timestamp) {
- if (timestampedStore != null) {
- timestampedStore.put(key, ValueAndTimestamp.make(value,
timestamp));
+ public long put(final K key, final V value, final long timestamp, final
Headers headers) {
+ if (headersStore != null) {
+ headersStore.put(key, ValueTimestampHeaders.make(value, timestamp,
headers));
return PUT_RETURN_CODE_IS_LATEST;
}
if (versionedStore != null) {
return versionedStore.put(key, value, timestamp);
}
- throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped or versioned store");
+ throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
Review Comment:
```suggestion
throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either headers or versioned store");
```
##########
streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.header.Headers;
+
+import java.util.Objects;
+
+/**
+ * A key-value pair with timestamp and headers, used for testing KTable with
headers support.
+ */
+public class KeyValueTimestampHeaders<K, V> {
Review Comment:
Wondering why we add this? Can't we use `Record` ? (might be another follow
up PR to clean it up)
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java:
##########
@@ -105,297 +101,146 @@ private void mockWindowStoreSupplier() {
}
@Test
- public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as(windowStoreSupplier),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(CachingWindowStore.class, wrapped);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
- mockWindowStoreSupplier();
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithCachingAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof CachingWindowStore);
assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedStoreWithOnWindowClose() {
+ public void shouldCreateHeadersStoreWithOnWindowClose() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
.withCachingDisabled(), nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
- }
-
- @Test
- public void
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithOnWindowClose() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndCachingEnabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+ public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
Review Comment:
Btw: Thanks for highlighting this change -- I would have missed it most
likely during review w/o your comment.
##########
streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic version of the in-memory key-value store that is
useful for testing when you
+ * need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
+ */
+@SuppressWarnings("deprecation")
+public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends
Comparable, V>
+ extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, ValueTimestampHeaders<V>> map;
+ private volatile boolean open = false;
+
+ public GenericInMemoryTimestampedKeyValueStoreWithHeaders(final String
name) {
+ // it's not really a `WrappedStateStore` so we pass `null`
+ // however, we need to implement `WrappedStateStore` to make the store
usable
+ super(null);
+ this.name = name;
+
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
Review Comment:
nit: avoid unnecessary `this.` (similar below)
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java:
##########
@@ -105,297 +101,146 @@ private void mockWindowStoreSupplier() {
}
@Test
- public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as(windowStoreSupplier),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(CachingWindowStore.class, wrapped);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
- mockWindowStoreSupplier();
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithCachingAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof CachingWindowStore);
assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedStoreWithOnWindowClose() {
+ public void shouldCreateHeadersStoreWithOnWindowClose() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
.withCachingDisabled(), nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
- }
-
- @Test
- public void
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithOnWindowClose() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndCachingEnabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+ public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
Review Comment:
Seems this is still an open question? Let's me sure we don't drop it... \cc
@frankvicky @bbejeck
Sound related to the fix that Frank is working on? So I assume we can merge
this PR (including this change), and this test starts failing with Frank's fix
and we need to update the test on the fix-PR?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##########
@@ -192,9 +192,11 @@ private ValueAndTimestamp<VOut> transformValue(final K
key, final ValueAndTimest
new RecordHeaders()
));
- final ValueAndTimestamp<VOut> result = ValueAndTimestamp.make(
- valueTransformer.transform(key,
getValueOrNull(valueAndTimestamp)),
- valueAndTimestamp == null ? UNKNOWN :
valueAndTimestamp.timestamp());
+ final ValueTimestampHeaders<VOut> result =
ValueTimestampHeaders.make(
+ valueTransformer.transform(key,
getValueOrNull(valueTimestampHeaders)),
+ valueTimestampHeaders == null ? UNKNOWN :
valueTimestampHeaders.timestamp(),
+ valueTimestampHeaders == null ? null :
valueTimestampHeaders.headers()
Review Comment:
Sound good -- wondering if we should use `context.headers()` instead of
`null` if `valueTimestampHeaders == null` ? Similar question as for
`KTable#mapValues`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]