guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r599952559
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -37,6 +37,7 @@
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;
Review comment:
Since `Stores` is a public API, we would need to file a KIP in order to
change it. On the other hand, `Stores` is used by users to customize their
materialized state stores, while for KAFKA-10847 we can just hard-code which
types of stores to use, without going through the `Stores` factory.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
##########
@@ -25,4 +26,35 @@
void destroy() throws IOException;
+ /**
+ * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make
it a public API
+ *
+ * Get an iterator over a given range of keys. This iterator must be
closed after use.
+ * The returned iterator must be safe from {@link
java.util.ConcurrentModificationException}s
+ * and must not return null values.
+ * Order is not guaranteed as bytes lexicographical ordering might not
represent key order.
+ *
+ * @param from The first key that could be in the range, where iteration
starts from.
+ * @param to The last key that could be in the range, where iteration
ends.
+ * @param prefixScan If true, then it iterates using the common key
prefixes.
+ * @return The iterator for this range, from smallest to largest bytes.
+ * @throws NullPointerException If null is used for from or to.*
+ */
+ KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean
prefixScan);
+
+ /**
+ * INTERNAL USE ONLY - - Move this method to ReadOnlyKeyValueStore to make
it a public API
+ *
+ * Get a reverse iterator over a given range of keys. This iterator must
be closed after use.
+ * The returned iterator must be safe from {@link
java.util.ConcurrentModificationException}s
+ * and must not return null values.
+ * Order is not guaranteed as bytes lexicographical ordering might not
represent key order.
+ *
+ * @param from The first key that could be in the range, where iteration
ends.
+ * @param to The last key that could be in the range, where iteration
starts from.
+ * @param prefixScan If true, then it iterates using the common key
prefixes.
+ * @return The reverse iterator for this range, from largest to smallest
key bytes.
+ * @throws NullPointerException If null is used for from or to.
+ */
+ KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to, boolean
prefixScan);
Review comment:
Related to the other comment: since the stream-stream join does not
really need a reverse prefix scan, just adding a forward `prefixScan(..)`
interface may be better.
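
A rough sketch of the shape I have in mind, not the actual `Segment` interface. `SketchSegment` and `InMemorySketchSegment` are hypothetical names, and the keys are plain `String`s in a `TreeMap` rather than `Bytes` in RocksDB, just to keep it self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a segment; NOT the real Kafka Segment interface.
interface SketchSegment {
    // Existing-style range query, unchanged (no extra boolean parameter).
    List<Map.Entry<String, String>> range(String from, String to);

    // Forward-only prefix scan; no reverse variant is offered.
    List<Map.Entry<String, String>> prefixScan(String prefix);
}

// Assumption: an in-memory sorted map stands in for the underlying store.
class InMemorySketchSegment implements SketchSegment {
    private final NavigableMap<String, String> data = new TreeMap<>();

    void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public List<Map.Entry<String, String>> range(String from, String to) {
        // Inclusive on both ends.
        return new ArrayList<>(data.subMap(from, true, to, true).entrySet());
    }

    @Override
    public List<Map.Entry<String, String>> prefixScan(String prefix) {
        List<Map.Entry<String, String>> result = new ArrayList<>();
        // Keys sharing a prefix are contiguous in sorted order, so start at the
        // first key >= prefix and stop at the first key that no longer matches.
        for (Map.Entry<String, String> e : data.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            result.add(e);
        }
        return result;
    }
}
```

With this shape, callers of `range(from, to)` stay untouched and only the join code would call `prefixScan(..)`.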
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends
AbstractStoreBuilder<K, V, WindowStore<K, V>> {
Review comment:
If we do not allow such stores to be created from `Stores`, maybe we
could remove this class as well.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -82,9 +93,9 @@ public boolean hasNext() {
}
} else {
if (forward) {
- currentIterator = currentSegment.range(from, to);
+ currentIterator = currentSegment.range(from, to,
prefixScan);
Review comment:
Instead of passing this boolean around and creating overloaded
interfaces, could we:
* add `prefixScan` to Segment / SegmentIterator.
* add a separate `RocksDBPrefixIterator`, alongside `RocksDBRangeIterator`,
to be used by `prefixScan`, and then have SegmentIterator use
`RocksDBStore#prefixScan`?
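
To illustrate the two points above, here is a minimal sketch under simplifying assumptions. `SketchPrefixIterator` and `SketchSegmentPrefixIterator` are hypothetical names, each segment is modelled as a sorted set of `String` keys, and none of this uses the real RocksDB or Kafka classes:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;

// Hypothetical dedicated prefix iterator, kept separate from any range iterator.
class SketchPrefixIterator implements Iterator<String> {
    private final Iterator<String> sorted;
    private final String prefix;
    private String next;

    SketchPrefixIterator(NavigableSet<String> keys, String prefix) {
        this.prefix = prefix;
        // Seek to the first key >= prefix, similar in spirit to a store-level seek.
        this.sorted = keys.tailSet(prefix, true).iterator();
        advance();
    }

    // Load the next matching key, or null once the prefix stops matching.
    private void advance() {
        next = null;
        if (sorted.hasNext()) {
            String candidate = sorted.next();
            if (candidate.startsWith(prefix)) {
                next = candidate;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public String next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        String result = next;
        advance();
        return result;
    }
}

// Hypothetical segment-level iterator: walks the segments in order and asks
// each one for a prefix iterator, with no boolean flag threaded through.
class SketchSegmentPrefixIterator implements Iterator<String> {
    private final Iterator<NavigableSet<String>> segments;
    private final String prefix;
    private Iterator<String> current = Collections.emptyIterator();

    SketchSegmentPrefixIterator(List<NavigableSet<String>> segments, String prefix) {
        this.segments = segments.iterator();
        this.prefix = prefix;
    }

    @Override
    public boolean hasNext() {
        // Skip over segments that have no (more) keys with this prefix.
        while (!current.hasNext() && segments.hasNext()) {
            current = new SketchPrefixIterator(segments.next(), prefix);
        }
        return current.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

The range path would keep its own iterator, so neither `range(..)` nor `reverseRange(..)` would need an extra parameter.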