mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1411475602
########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -130,9 +129,9 @@ public Optional<Instant> toTime() { /** * The order of the returned records by timestamp. - * @return true if the query returns records in ascending order of timestamps + * @return UNORDERED, ASCENDING, or DESCENDING if the query returns records in an unordered, ascending, or descending order of timestamps. Review Comment: We should not list `UNORDERED, ASCENDING, or DESCENDING` but phrase it generically -- if we change the enum I am sure we will not remember to update it here introducing incorrect docs ########## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ########## @@ -48,7 +49,7 @@ public VersionedRecord(final V value, final long timestamp) { * @param timestamp The timestamp * @param validTo The exclusive upper bound of the validity interval */ - public VersionedRecord(final V value, final long timestamp, final long validTo) { + public VersionedRecord(final V value, final long timestamp, final Optional<Long> validTo) { Review Comment: Should we pass in an `Optional<Long> validTo`? Seems if there is no `validTo`, we would use `VersionedRecord(final V value, final long timestamp)` instead of passing an `Optional.empty()`, and thus `validTo` here should just be `long`, and we assign `this.validTo = Optional.of(validTo)`? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -23,6 +23,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import java.security.InvalidParameterException; Review Comment: I don't think that's the right exception (cf the package, and comment below). 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; Review Comment: Why is this one `protected` but not `private`? 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { Review Comment: Given it's one RocksDB, this comment seems to be void. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (recordTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + return new VersionedRecordIteratorImpl<>(queryResults.listIterator()); + } else { + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + final Snapshot snapshot = latestValueStore.getSnapshot(); + // first check the latest value store + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key, snapshot); + if (rawLatestValueAndTimestamp != null) { + final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (recordTimestamp <= toTimestamp) { + queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); + } + } + + // check segment stores + // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} + // but is still valid in query specified time interval. + final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, toTimestamp, false); + for (final LogicalKeyValueSegment segment : segments) { + final byte[] rawSegmentValue = segment.get(key, snapshot); Review Comment: Ah. Thanks for clarifying. My bad. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -136,11 +139,11 @@ public long put(final Bytes key, final byte[] value, final long timestamp) { observedStreamTime = Math.max(observedStreamTime, timestamp); final long foundTs = doPut( - versionedStoreClient, - observedStreamTime, - key, - value, - timestamp + versionedStoreClient, Review Comment: nit: avoid unnecessary reformation / intention changes -- similar below. 80% of diff in this file is just adding/removing spaces... 
very noisy ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -1016,21 +972,21 @@ private <T extends VersionedStoreSegment> long finishPut( * Bytes layout for the value portion of rows stored in the latest value store. The layout is * a fixed-size timestamp concatenated with the actual record value. */ - private static final class LatestValueFormatter { + protected static final class LatestValueFormatter { Review Comment: It seem package-private would be sufficient (instead of `protected`)? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -244,30 +246,37 @@ private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query, return result; } - @SuppressWarnings("unchecked") - private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) { - final QueryResult<R> result; - final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; - - final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); - final Instant toTime = typedKeyQuery.toTime().isPresent() ? 
typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE); - MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); - rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime); - if (!typedKeyQuery.isAscending()) { - rawKeyQuery = rawKeyQuery.withDescendingTimestamps(); - } - - final QueryResult<VersionedRecordIterator<byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, config); - if (rawResult.isSuccess()) { - final MeteredMultiVersionedKeyQueryIterator<V> typedResult = new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes)); - final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); - result = (QueryResult<R>) typedQueryResult; - } else { - // the generic type doesn't matter, since failed queries have no result set. - result = (QueryResult<R>) rawResult; - } - return result; - } + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) { + final QueryResult<R> result; + final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; + + final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); Review Comment: Can we simplify this and the next two lines? 
``` final Instant fromTime = typedKeyQuery.fromTime().orElse(Instant.ofEpochMilli(Long.MIN_VALUE)); ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } - public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - - Objects.requireNonNull(key, "key cannot be null"); + @SuppressWarnings("unchecked") Review Comment: Why do we need this suppression? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner; + private Snapshot snapshot; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + this.snapshot = null; + this.snapshotOwner = null; + } + + @Override + public void close() { + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! 
+ releaseSnapshot(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return iterator.next(); + } + return null; + } + private boolean maybeFillIterator() { Review Comment: nit: missing blank line ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { Review Comment: Should we `extend AbstractIterator` to share some common code? 
I remember that we also introduced `ManagedIterator` interface for versioned state store -- would we need to implement it, too? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -244,30 +246,37 @@ private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query, return result; } - @SuppressWarnings("unchecked") - private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) { - final QueryResult<R> result; - final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; - - final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); - final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE); - MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); - rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime); - if (!typedKeyQuery.isAscending()) { - rawKeyQuery = rawKeyQuery.withDescendingTimestamps(); - } - - final QueryResult<VersionedRecordIterator<byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, config); - if (rawResult.isSuccess()) { - final MeteredMultiVersionedKeyQueryIterator<V> typedResult = new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes)); - final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); - result = (QueryResult<R>) typedQueryResult; - } else { - // the generic type doesn't matter, since failed queries have no result set. 
- result = (QueryResult<R>) rawResult; - } - return result; - } + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) { + final QueryResult<R> result; + final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; + + final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); + final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE); + if (fromTime.compareTo(toTime) > 0) { + throw new InvalidParameterException("The `fromTime` timestamp must be smaller than the `toTime` timestamp."); Review Comment: Should this be `IllegelArgumentException`? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } - public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - - Objects.requireNonNull(key, "key cannot be null"); + @SuppressWarnings("unchecked") + protected VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { Review Comment: Can we make it `private` (or package private)? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; + + // defined for creating/releasing the snapshot. 
+ private LogicalKeyValueSegment snapshotOwner; + private Snapshot snapshot; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + this.snapshot = null; + this.snapshotOwner = null; + } + + @Override + public void close() { + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || maybeFillIterator(); Review Comment: Should we add a member `close` and throw if iterator is already closed? (cf `RocksDbIterator` in the code-base) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner; + private Snapshot snapshot; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + this.snapshot = null; + this.snapshotOwner = null; + } + + @Override + public void close() { + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return iterator.next(); + } + return null; Review Comment: I think we should throw an exception for this case (or maybe we can just inherit from `AbstractIterator` instead. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; + + // defined for creating/releasing the snapshot. 
+ private LogicalKeyValueSegment snapshotOwner; + private Snapshot snapshot; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + this.snapshot = null; + this.snapshotOwner = null; + } + + @Override + public void close() { + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return iterator.next(); + } + return null; + } + private boolean maybeFillIterator() { + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + while (segmentIterator.hasNext()) { + final LogicalKeyValueSegment segment = segmentIterator.next(); + + if (snapshot == null) { // create the snapshot (this will happen only one time). + this.snapshotOwner = segment; + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + final Lock lock = new ReentrantLock(); Review Comment: My original comment about locks was because I thought there is multiple RocksDBs -- but you explained to me, that all segments (latest and historical) are logical abstraction over the same physical RocksDB, so maybe we don't need locks? This might also simplify the code, because we can get a single snapshot when the iterator is creates, and we don't need to track anything else, but use the same snaphot throughout the lifetime of the iterator? 
(Also don't need `snapshotOwner` any longer, as we don't have about the "logical segment" in use, given all of them have the same physical RocksDB instance under the hood? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + protected ListIterator<VersionedRecord<byte[]>> iterator; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner; + private Snapshot snapshot; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + this.snapshot = null; + this.snapshotOwner = null; + } + + @Override + public void close() { + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! 
+ releaseSnapshot(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return iterator.next(); + } + return null; + } + private boolean maybeFillIterator() { + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + while (segmentIterator.hasNext()) { + final LogicalKeyValueSegment segment = segmentIterator.next(); + + if (snapshot == null) { // create the snapshot (this will happen only one time). + this.snapshotOwner = segment; + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + final Lock lock = new ReentrantLock(); + lock.lock(); + try { + this.snapshot = snapshotOwner.getSnapshot(); + } finally { + lock.unlock(); + } + } + + final byte[] rawSegmentValue = segment.get(key, snapshot); + if (rawSegmentValue != null) { // this segment contains record(s) with the specified key + if (segment.id() == -1) { // this is the latestValueStore + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); + if (recordTimestamp <= toTime) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); + } + } else { + // this segment contains records with the specified key and time range + final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults = + RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); + for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { + queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), Optional.of(searchResult.validTo()))); + } + } + } + if (!queryResults.isEmpty()) { + break; + } + } + if (!queryResults.isEmpty()) { + 
if (order.equals(ResultOrder.ASCENDING)) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); Review Comment: Wondering if we need to sort? Could we also traverse `queryResults` backwards later instead? It seems we could init with `queryResults.listIterator(queryResults.size())` and use `iterator.previous()` instead of `iterator.next()` inside `next()`? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (recordTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + return new VersionedRecordIteratorImpl<>(queryResults.listIterator()); + } else { + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + final Snapshot snapshot = latestValueStore.getSnapshot(); Review Comment: Seems this comment is void given it's a single RocksDB. My bad. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } - public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - - Objects.requireNonNull(key, "key cannot be null"); + @SuppressWarnings("unchecked") + protected VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { validateStoreOpen(); - final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); - if (toTimestamp < observedStreamTime - historyRetention) { // history retention exceeded. we still check the latest value store in case the // latest record version satisfies the timestamp bound, in which case it should // still be returned (i.e., the latest record version per key never expires). - final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); - if (rawLatestValueAndTimestamp != null) { - final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); - if (recordTimestamp <= toTimestamp) { - // latest value satisfies timestamp bound - queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); - } - } - - // history retention has elapsed and the latest record version (if present) does - // not satisfy the timestamp bound. 
return null for predictability, even if data - // is still present in segments. - if (queryResults.size() == 0) { - LOG.warn("Returning null for expired get."); - } - return new VersionedRecordIteratorImpl<>(queryResults.listIterator()); + return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order); Review Comment: I love that you added `LogicalSegmentIterator` -- very clean and elegant! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org