guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r608220386



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
+ * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
+ * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ */
+public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        final byte[] maxPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(to)
+            .putInt(Integer.MAX_VALUE)
+            .array();
+
+        return OrderedBytes.range(maxPrefix, key);
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        final byte[] minPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(from)
+            .putInt(0)
+            .array();
+
+        return OrderedBytes.range(minPrefix, key);
+    }
+
+    @Override
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+        return TimeOrderedKeySchema.toStoreKeyBinary(key, to, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
+        return TimeOrderedKeySchema.toStoreKeyBinary(key, Math.max(0, from), 0);
+    }
+
+    @Override
+    public long segmentTimestamp(final Bytes key) {
+        return TimeOrderedKeySchema.extractStoreTimestamp(key.get());
+    }
+
+    @Override
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {

Review comment:
       As discussed before, for `fetchAll(final long timeFrom, final long 
timeTo)` we do not need to call this function at all, since we know it would 
always return true.
   
   I think we can either 1) state that `fetchAll(final long timeFrom, final 
long timeTo)` is also not optimal and that people should avoid using it with 
the new schema, or 2) still keep that impl as optimal as possible, e.g. by 
adding a condition like this in `AbstractRocksDBSegmentedBytesStore#fetchAll`:
   
   ```
   return keySchema instanceof TimeOrderedKeySchema
       ? new SegmentIterator<>(
           searchSpace.iterator(),
           // hasNextCondition: always true for a pure time-range scan
           (....) -> true,
           TimeOrderedKeySchema.toStoreKeyBinary(0, from, 0),
           TimeOrderedKeySchema.toStoreKeyBinary(0, to + 1, Integer.MAX_VALUE),
           true)
       : // else return the normal implementation
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
+ * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
+ * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ */
+public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        final byte[] maxPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(to)
+            .putInt(Integer.MAX_VALUE)
+            .array();
+
+        return OrderedBytes.range(maxPrefix, key);
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        final byte[] minPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(from)
+            .putInt(0)
+            .array();
+
+        return OrderedBytes.range(minPrefix, key);
+    }
+
+    @Override
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {

Review comment:
       We know these functions are triggered by `fetch(final Bytes key, final 
long timeFrom, final long timeTo)`, and with the default implementation that 
path is sub-optimal: we scan a large range and then drop a lot of the records. 
Let's add a one-line comment on top of each of them referring readers to the 
class-level javadoc of this schema, which should tell them to avoid calling 
these functions where possible.
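
A rough sketch of the cost, under stated assumptions: the helpers below (`storeKey`, `sampleStore`, `fetch`) are hypothetical and only mimic the shape of `lowerRangeFixedSize` / `upperRangeFixedSize` over an in-memory sorted set; they are not the store's real code path. The sketch counts how many entries a single-key fetch visits versus how many it keeps.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

final class KeyFetchScanSketch {
    static final int PREFIX_SIZE = 8 + 4; // timestamp + seqnum

    // Hypothetical stand-in for the (time, seq, key) layout.
    static byte[] storeKey(final long timestamp, final int seqnum, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(PREFIX_SIZE + keyBytes.length)
            .putLong(timestamp)
            .putInt(seqnum)
            .put(keyBytes)
            .array();
    }

    static NavigableSet<byte[]> sampleStore() {
        // Unsigned lexicographic byte order (Arrays.compareUnsigned needs Java 9+).
        final Comparator<byte[]> unsignedLexicographic = Arrays::compareUnsigned;
        final NavigableSet<byte[]> store = new TreeSet<>(unsignedLexicographic);
        store.add(storeKey(1L, 0, "a"));
        store.add(storeKey(5L, 0, "b"));
        store.add(storeKey(5L, 1, "a"));
        store.add(storeKey(7L, 0, "c"));
        store.add(storeKey(9L, 0, "a"));
        store.add(storeKey(12L, 0, "b"));
        return store;
    }

    // Returns {scanned, matched} for a single-key fetch over [from, to].
    static int[] fetch(final NavigableSet<byte[]> store, final String key, final long from, final long to) {
        final byte[] wanted = key.getBytes(StandardCharsets.UTF_8);
        final byte[] lower = storeKey(from, 0, key);
        final byte[] upper = storeKey(to, Integer.MAX_VALUE, key);
        int scanned = 0;
        int matched = 0;
        for (final byte[] k : store.subSet(lower, true, upper, true)) {
            scanned++;
            // The key is a suffix here, so other keys in the time range must be filtered out.
            final byte[] candidate = Arrays.copyOfRange(k, PREFIX_SIZE, k.length);
            if (Arrays.equals(candidate, wanted)) {
                matched++;
            }
        }
        return new int[] {scanned, matched};
    }

    public static void main(final String[] args) {
        final int[] result = fetch(sampleStore(), "a", 5L, 9L);
        System.out.println("scanned=" + result[0] + " matched=" + result[1]);
    }
}
```

With this sample data the fetch for key `a` visits four entries and keeps two; in a real store with many keys per time interval the discarded share would be much larger, which is the reason for steering callers away from these functions.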

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
##########
@@ -65,4 +65,16 @@ static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
                 .array()
         );
     }
+
+    static Bytes range(final byte[] prefix, final Bytes key) {

Review comment:
       Please let me know if you want to keep it as is -- which is fine.
   
   But I think we should try to avoid byte array copies as much as possible: 
right now we first allocate a byte array for the prefix in the schema, and then 
allocate a longer byte array and copy the prefix and then the key bytes into 
it. We can instead move this logic directly into the schema, where we create a 
single array once and put in the timestamp / sequence / key in that order.
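
Roughly what I have in mind, as a standalone sketch: `toTimeOrderedKey` is a hypothetical name, and it takes a plain `byte[]` instead of `Bytes` so that it runs without the Kafka classes. The point is only that one `ByteBuffer` sized for prefix plus key replaces the two allocations.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class SingleAllocationKey {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Hypothetical helper: writes (timestamp, seqnum, key) into one array, with no
    // intermediate prefix array.
    static byte[] toTimeOrderedKey(final byte[] key, final long timestamp, final int seqnum) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE + SEQNUM_SIZE + key.length)
            .putLong(timestamp)
            .putInt(seqnum)
            .put(key)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] storeKey = toTimeOrderedKey(new byte[] {1, 2}, 5L, 7);
        // 8 timestamp bytes, 4 sequence bytes, then the 2 key bytes
        System.out.println(Arrays.toString(storeKey));
    }
}
```

In the schema, `upperRange` and `lowerRange` could then call such a helper with `to` / `Integer.MAX_VALUE` and `from` / `0` respectively and wrap the result, if that fits the rest of the PR.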

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Yeah that's a good point about unit test complexities. Let's just 
emphasize that key ranges are sub-optimally implemented with the new schema 
then (see my other comment below).





