guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r599952559



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -37,6 +37,7 @@
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;

Review comment:
       Since `Stores` is a public API, we would need to file a KIP in order to
change it. Moreover, `Stores` exists so that users can customize their
materialized state stores, whereas for KAFKA-10847 we can simply hard-code
which store types to use internally, without going through the `Stores` factory.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
##########
@@ -25,4 +26,35 @@
 
     void destroy() throws IOException;
 
+    /**
+     * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get an iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration starts from.
+     * @param to   The last key that could be in the range, where iteration ends.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The iterator for this range, from smallest to largest bytes.
+     * @throws NullPointerException       If null is used for from or to.*
+     */
+    KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean prefixScan);
+
+    /**
+     * INTERNAL USE ONLY - - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get a reverse iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration ends.
+     * @param to   The last key that could be in the range, where iteration starts from.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The reverse iterator for this range, from largest to smallest key bytes.
+     * @throws NullPointerException       If null is used for from or to.
+     */
+    KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to, boolean prefixScan);

Review comment:
       Related to the other comment: since the stream-stream join does not
really need a reverse prefix scan, adding only a forward `prefixScan(..)`
interface may be better.
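
A minimal sketch of what a forward-only `prefixScan(..)` next to an unchanged
`range(..)` could look like. This is an illustration under assumptions, not the
Kafka interface: `SegmentSketch`, `InMemorySegment`, and the use of `String`
keys backed by a `TreeMap` are all hypothetical stand-ins for `Segment` and its
`Bytes` keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ForwardPrefixScanSketch {

    // Hypothetical stand-in for Segment: range() keeps its two-argument
    // shape, and prefixScan() is the only new method (forward direction only).
    public interface SegmentSketch {
        List<String> range(String from, String to);

        List<String> prefixScan(String prefix);
    }

    // In-memory implementation used only to make the sketch runnable.
    public static class InMemorySegment implements SegmentSketch {
        private final TreeMap<String, String> data = new TreeMap<>();

        public void put(String key, String value) {
            data.put(key, value);
        }

        @Override
        public List<String> range(String from, String to) {
            // Inclusive on both ends, in ascending key order.
            return new ArrayList<>(data.subMap(from, true, to, true).values());
        }

        @Override
        public List<String> prefixScan(String prefix) {
            List<String> result = new ArrayList<>();
            // Keys sharing a prefix are contiguous in sorted order, so seek to
            // the prefix and stop at the first key that no longer matches.
            for (Map.Entry<String, String> e : data.tailMap(prefix, true).entrySet()) {
                if (!e.getKey().startsWith(prefix)) {
                    break;
                }
                result.add(e.getValue());
            }
            return result;
        }
    }

    public static void main(String[] args) {
        InMemorySegment segment = new InMemorySegment();
        segment.put("a1", "x");
        segment.put("a2", "y");
        segment.put("b1", "z");
        System.out.println(segment.prefixScan("a"));
        System.out.println(segment.range("a2", "b1"));
    }
}
```

With this shape no `reverseRange` overload is needed, and callers that only do
range queries never see a `prefixScan` flag.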

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       If we do not allow such stores to be created from `Stores`, maybe we 
could remove this class as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -82,9 +93,9 @@ public boolean hasNext() {
                     }
                 } else {
                     if (forward) {
-                        currentIterator = currentSegment.range(from, to);
+                        currentIterator = currentSegment.range(from, to, prefixScan);

Review comment:
       Instead of passing this boolean around and creating overloaded
interfaces, could we:

   * add `prefixScan` to Segment / SegmentIterator.
   * add a separate `RocksDBPrefixIterator` alongside `RocksDBRangeIterator`
for use in `prefixScan`, and then have SegmentIterator call
`RocksDBStore#prefixScan`.
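
The second bullet can be sketched roughly as follows: a dedicated iterator that
seeks to the prefix and stops at the first non-matching key, so that no boolean
needs to be threaded through `range(..)`. Everything here is an assumption for
illustration. `PrefixIterator` is a hypothetical stand-in for the proposed
`RocksDBPrefixIterator`, and a `TreeMap` with unsigned bytewise ordering stands
in for the RocksDB store.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

class PrefixIteratorSketch {

    // Unsigned lexicographic byte order, matching RocksDB's default
    // bytewise key comparator.
    static final Comparator<byte[]> BYTES_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Hypothetical helper: an empty sorted store standing in for RocksDB.
    public static TreeMap<byte[], byte[]> newStore() {
        return new TreeMap<>(BYTES_ORDER);
    }

    // Hypothetical counterpart to a range iterator: forward only, bounded by
    // the prefix rather than by an explicit upper key.
    public static final class PrefixIterator implements Iterator<Map.Entry<byte[], byte[]>> {
        private final Iterator<Map.Entry<byte[], byte[]>> delegate;
        private final byte[] prefix;
        private Map.Entry<byte[], byte[]> next;

        public PrefixIterator(NavigableMap<byte[], byte[]> store, byte[] prefix) {
            this.prefix = prefix;
            // Seek to the first key that is >= the prefix.
            this.delegate = store.tailMap(prefix, true).entrySet().iterator();
            advance();
        }

        // Loads the next entry, or null once a key stops matching the prefix.
        private void advance() {
            next = null;
            if (delegate.hasNext()) {
                Map.Entry<byte[], byte[]> candidate = delegate.next();
                if (startsWith(candidate.getKey(), prefix)) {
                    next = candidate;
                }
            }
        }

        private static boolean startsWith(byte[] key, byte[] prefix) {
            if (key.length < prefix.length) {
                return false;
            }
            for (int i = 0; i < prefix.length; i++) {
                if (key[i] != prefix[i]) {
                    return false;
                }
            }
            return true;
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public Map.Entry<byte[], byte[]> next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            Map.Entry<byte[], byte[]> result = next;
            advance();
            return result;
        }
    }

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> store = newStore();
        store.put(new byte[] {1, 1}, new byte[] {10});
        store.put(new byte[] {1, 2}, new byte[] {11});
        store.put(new byte[] {2, 1}, new byte[] {12});
        Iterator<Map.Entry<byte[], byte[]>> it = new PrefixIterator(store, new byte[] {1});
        while (it.hasNext()) {
            System.out.println(it.next().getValue()[0]);
        }
    }
}
```

Keeping the prefix logic in its own iterator leaves the range iterator and its
callers untouched, which is the point of the suggestion above.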



