guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r707810706
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedTimestampedKeyAndJoinSideSerializerTest.java ########## @@ -24,49 +24,49 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThrows; -public class KeyAndJoinSideSerializerTest { +public class TimestampedTimestampedKeyAndJoinSideSerializerTest { Review comment: Ack. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. 
+ * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store. Review comment: Ack. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueStore; + +public class ChangeLoggingTimeOrderedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore { + + ChangeLoggingTimeOrderedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) { + super(inner); + } + + @Override + public void put(final Bytes key, + final byte[] value) { + wrapped().put(key, value); + // we need to log the new value, which is different from the put value; Review comment: Ack ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueStore; + +public class ChangeLoggingTimeOrderedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore { Review comment: Good point :) ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -209,37 +208,37 @@ private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, Left // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; - try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) { + try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { while (it.hasNext()) { - final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next(); + final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> record = it.next(); - final Windowed<KeyAndJoinSide<K>> windowedKey = record.key; - final LeftOrRightValue value = record.value; - sharedTimeTracker.minTime = windowedKey.window().start(); + final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = record.key; + final LeftOrRightValue<V1, V2> value = 
record.value; + final K key = timestampedKeyAndJoinSide.getKey(); + final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); + sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed - if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { + if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { break; } - final K key = windowedKey.key().getKey(); - final long time = windowedKey.window().start(); - final R nullJoinedValue; if (isLeftSide) { nullJoinedValue = joiner.apply(key, - (V1) value.getLeftValue(), - (V2) value.getRightValue()); + value.getLeftValue(), + value.getRightValue()); Review comment: We actually only need to cast in the `else` branch, since there we are switching the left/right sides. I think it was originally added in the `if` branch just for symmetry. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -280,85 +279,51 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, } } - @SuppressWarnings("unchecked") - private <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows, - final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, - final String joinThisGeneratedName) { + private <K, V1, V2> StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows, + final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, + final String joinThisGeneratedName) { final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent(); - final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName); - - final KeyAndJoinSideSerde keyAndJoinSideSerde = new
KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()); - final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()); - - final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder; - if (persistent) { - builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>( - persistentTimeOrderedWindowStore( - storeName + "-store", - Duration.ofMillis(windows.size() + windows.gracePeriodMs()), - Duration.ofMillis(windows.size()) - ), - keyAndJoinSideSerde, - leftOrRightValueSerde, - Time.SYSTEM - ); - } else { - builder = Stores.windowStoreBuilder( - Stores.inMemoryWindowStore( - storeName + "-store", - Duration.ofMillis(windows.size() + windows.gracePeriodMs()), - Duration.ofMillis(windows.size()), - false - ), - keyAndJoinSideSerde, - leftOrRightValueSerde - ); - } - - if (streamJoinedInternal.loggingEnabled()) { - builder.withLoggingEnabled(streamJoinedInternal.logConfig()); - } else { - builder.withLoggingDisabled(); - } - - return builder; - } + final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName) + "-store"; Review comment: No, originally it was added later (see https://github.com/apache/kafka/pull/11252/files/22663b04258efbc7f770ec7c7feca46bb5310340#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07L308), but after the refactoring we only add it once now, before the `persistent` flag branch. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. + * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store. + * Range iterators would also flatten the value lists and return the values one-by-one. + * + * This store is used for cases where we do not want to de-duplicate values of the same keys but want to retain all such values. 
+ */ +@SuppressWarnings("unchecked") +public class ListValueStore + extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]> + implements KeyValueStore<Bytes, byte[]> { + + static private final Serde<List<byte[]>> LIST_SERDE = Serdes.ListSerde(ArrayList.class, Serdes.ByteArray()); + + ListValueStore(final KeyValueStore<Bytes, byte[]> bytesStore) { + super(bytesStore); + } + + @Override + public void put(final Bytes key, final byte[] value) { + // if the value is null we can skip the get and blind delete + if (value == null) { + wrapped().put(key, null); + } else { + final byte[] oldValue = wrapped().get(key); + + if (oldValue == null) { + wrapped().put(key, LIST_SERDE.serializer().serialize(null, Collections.singletonList(value))); + } else { + final List<byte[]> list = LIST_SERDE.deserializer().deserialize(null, oldValue); + list.add(value); + + wrapped().put(key, LIST_SERDE.serializer().serialize(null, list)); + } + } + } + + @Override + public byte[] putIfAbsent(final Bytes key, final byte[] value) { + throw new UnsupportedOperationException("putIfAbsent not supported"); + } + + @Override + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + throw new UnsupportedOperationException("putAll not supported"); + } + + @Override + public byte[] delete(final Bytes key) { + // we intentionally disable delete calls since the returned bytes would + // represent a list, not a single value; we need to have a new API for delete if we do need it + throw new UnsupportedOperationException("delete not supported"); Review comment: Yeah maybe I'd need to make that a bit clearer: the reason I disallow caller using this call intentionally is to avoid the caller mistakenly trying to interpret/deserialize the returned byte as a single value (also note that I did not implement the deserialization of the list value in get either). 
Since for stream-stream join we only need the `put` and `range` calls (using `put(null)` for delete too), not implementing the `get/delete` is fine --- if we want to support that all the way up to the caller in KStreamImplJoin, then we need a lot more work. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. + * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store.
+ * Range iterators would also flatten the value lists and return the values one-by-one. + * + * This store is used for cases where we do not want to de-duplicate values of the same keys but want to retain all such values. + */ +@SuppressWarnings("unchecked") +public class ListValueStore + extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]> + implements KeyValueStore<Bytes, byte[]> { + + static private final Serde<List<byte[]>> LIST_SERDE = Serdes.ListSerde(ArrayList.class, Serdes.ByteArray()); + + ListValueStore(final KeyValueStore<Bytes, byte[]> bytesStore) { + super(bytesStore); + } + + @Override + public void put(final Bytes key, final byte[] value) { + // if the value is null we can skip the get and blind delete + if (value == null) { + wrapped().put(key, null); + } else { + final byte[] oldValue = wrapped().get(key); + + if (oldValue == null) { + wrapped().put(key, LIST_SERDE.serializer().serialize(null, Collections.singletonList(value))); + } else { + final List<byte[]> list = LIST_SERDE.deserializer().deserialize(null, oldValue); + list.add(value); + + wrapped().put(key, LIST_SERDE.serializer().serialize(null, list)); + } + } + } + + @Override + public byte[] putIfAbsent(final Bytes key, final byte[] value) { + throw new UnsupportedOperationException("putIfAbsent not supported"); + } + + @Override + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + throw new UnsupportedOperationException("putAll not supported"); + } + + @Override + public byte[] delete(final Bytes key) { + // we intentionally disable delete calls since the returned bytes would + // represent a list, not a single value; we need to have a new API for delete if we do need it + throw new UnsupportedOperationException("delete not supported"); Review comment: Note that in the `KStreamKStreamJoinProcessor` the shared store is still defined as `KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>`, hence calling its `get/delete` calls expecting 
to return a single `LeftOrRightValue<V1, V2>` would be doomed to fail. Thinking about that once more, I think it's better to disable `get` as well for now. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. + * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store. + * Range iterators would also flatten the value lists and return the values one-by-one.
+ * + * This store is used for cases where we do not want to de-duplicate values of the same keys but want to retain all such values. + */ +@SuppressWarnings("unchecked") +public class ListValueStore + extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]> + implements KeyValueStore<Bytes, byte[]> { + + static private final Serde<List<byte[]>> LIST_SERDE = Serdes.ListSerde(ArrayList.class, Serdes.ByteArray()); + + ListValueStore(final KeyValueStore<Bytes, byte[]> bytesStore) { + super(bytesStore); + } + + @Override + public void put(final Bytes key, final byte[] value) { + // if the value is null we can skip the get and blind delete + if (value == null) { + wrapped().put(key, null); + } else { + final byte[] oldValue = wrapped().get(key); + + if (oldValue == null) { + wrapped().put(key, LIST_SERDE.serializer().serialize(null, Collections.singletonList(value))); + } else { + final List<byte[]> list = LIST_SERDE.deserializer().deserialize(null, oldValue); + list.add(value); + + wrapped().put(key, LIST_SERDE.serializer().serialize(null, list)); + } + } + } + + @Override + public byte[] putIfAbsent(final Bytes key, final byte[] value) { + throw new UnsupportedOperationException("putIfAbsent not supported"); + } + + @Override + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + throw new UnsupportedOperationException("putAll not supported"); + } + + @Override + public byte[] delete(final Bytes key) { + // we intentionally disable delete calls since the returned bytes would + // represent a list, not a single value; we need to have a new API for delete if we do need it + throw new UnsupportedOperationException("delete not supported"); + } + + @Override + public byte[] get(final Bytes key) { + return wrapped().get(key); + } + + @Override + public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { + throw new UnsupportedOperationException("range not supported"); + } + + @Override + public KeyValueIterator<Bytes, 
byte[]> all() { + return new WrappedStoreIterator(wrapped().all()); + } + + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + private static class WrappedStoreIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> Review comment: Ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org