mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1414568376


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    private ListIterator<VersionedRecord<byte[]>> iterator;
+    private volatile boolean open = true;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner = null;
+    private Snapshot snapshot = null;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> 
segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!open) {
+            throw new IllegalStateException("The iterator is out of scope.");
+        }
+        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();

Review Comment:
   We might want to add a comment about why `ASCENDING` is using `hasPrevious()` 
-- it's not intuitive and only makes sense if one knows that data is stored in 
descending order inside a segment.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +268,31 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    @SuppressWarnings("unchecked")
+    VersionedRecordIterator<byte[]> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final ResultOrder order) {
+        validateStoreOpen();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            return new 
LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(),
 key, fromTimestamp, toTimestamp, order);
+        } else {
+            final List<LogicalKeyValueSegment> segments = new ArrayList<>();
+            // add segment stores
+            // consider the search lower bound as -INF (LONG.MIN_VALUE) to 
find the record that has been inserted before the {@code fromTimestamp}
+            // but is still valid in query specified time interval.
+            if (order.equals(ResultOrder.ASCENDING)) {
+                segments.addAll(segmentStores.segments(Long.MIN_VALUE, 
toTimestamp, true));

Review Comment:
   Given we pass in `MIN_VALUE` twice, could we omit the first parameter?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {
+        final MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results in two steps
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = LAST_INDEX;
+                int iteratorSize = 0;
+
+                // step 1:
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                    if (i == 2)

Review Comment:
   nit: we use `{ }` for all blocks, even single-statement ones



##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##########
@@ -27,18 +28,34 @@
 public final class VersionedRecord<V> {
     private final V value;
     private final long timestamp;
+    private final Optional<Long> validTo;
 
     /**
      * Create a new {@link VersionedRecord} instance. {@code value} cannot be 
{@code null}.
      *
-     * @param value      the value
-     * @param timestamp  the timestamp
+     * @param value      The value
+     * @param timestamp  The type of the result returned by this query.
      */
     public VersionedRecord(final V value, final long timestamp) {
+        this.value = Objects.requireNonNull(value, "value cannot be null.");
+        this.timestamp = timestamp;
+        this.validTo = Optional.empty();
+    }
+
+    /**
+     * Create a new {@link VersionedRecord} instance. {@code value} cannot be 
{@code null}.
+     *
+     * @param value      The value
+     * @param timestamp  The timestamp
+     * @param validTo    The exclusive upper bound of the validity interval
+     */
+    public VersionedRecord(final V value, final long timestamp, final Long 
validTo) {

Review Comment:
   Should it be `long` instead of `Long` ?



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ * No ordering is guaranteed for the results, but the results can be sorted by 
timestamp (in ascending or descending order) by calling the corresponding 
defined methods.
+ *
+ *  @param <K> The type of the key.
+ *  @param <V> The type of the result returned by this query.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<VersionedRecordIterator<V>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final ResultOrder order;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final ResultOrder order) {
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.order = order;
+    }
+
+  /**

Review Comment:
   nit: indentation should be 4 spaces, not 2



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ * No ordering is guaranteed for the results, but the results can be sorted by 
timestamp (in ascending or descending order) by calling the corresponding 
defined methods.
+ *
+ *  @param <K> The type of the key.
+ *  @param <V> The type of the result returned by this query.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<VersionedRecordIterator<V>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final ResultOrder order;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final ResultOrder order) {
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.order = order;
+    }
+
+  /**
+   * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+   * (or {@code null} otherwise).
+   *
+   * <p>
+   * While the query by default returns the all the record versions of the 
specified {@code key}, setting
+   * the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} 
method), and the {@code toTimestamp}
+   * (by calling the {@link #toTime(Instant)} method) makes the query to 
return the record versions associated
+   * to the specified time range.
+   *
+   * @param key The specified key by the query
+   * @param <K> The type of the key
+   * @param <V> The type of the value that will be retrieved
+   * @throws NullPointerException if @param key is null

Review Comment:
   `if {@code key} is null`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    private ListIterator<VersionedRecord<byte[]>> iterator;
+    private volatile boolean open = true;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner = null;
+    private Snapshot snapshot = null;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> 
segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!open) {
+            throw new IllegalStateException("The iterator is out of scope.");
+        }
+        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
+        return hasStillLoad || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();

Review Comment:
   Same as above (maybe add a comment explaining why `ASCENDING` uses `previous()`)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    private ListIterator<VersionedRecord<byte[]>> iterator;
+    private volatile boolean open = true;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner = null;
+    private Snapshot snapshot = null;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> 
segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!open) {
+            throw new IllegalStateException("The iterator is out of scope.");
+        }
+        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
+        return hasStillLoad || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+        }
+        throw new NoSuchElementException();
+    }
+
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen 
only one time).
+                this.snapshotOwner = segment;

Review Comment:
   We might want to add a comment explaining why we can pick a random segment as 
owner: all segments use the same physical RocksDB under the hood.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;

Review Comment:
   Why `protected` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    private ListIterator<VersionedRecord<byte[]>> iterator;
+    private volatile boolean open = true;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner = null;
+    private Snapshot snapshot = null;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> 
segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!open) {
+            throw new IllegalStateException("The iterator is out of scope.");
+        }
+        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
+        return hasStillLoad || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+        }
+        throw new NoSuchElementException();
+    }
+
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen 
only one time).
+                this.snapshotOwner = segment;
+                // take a RocksDB snapshot to return the segments content at 
the query time (in order to guarantee consistency)
+                this.snapshot = snapshotOwner.getSnapshot();
+            }
+
+            final byte[] rawSegmentValue = segment.get(key, snapshot);
+            if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
+                if (segment.id() == -1) { // this is the latestValueStore
+                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+                    if (recordTimestamp <= toTime) {
+                        // latest value satisfies timestamp bound
+                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
+                    }
+                } else {
+                    // this segment contains records with the specified key 
and time range
+                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
+                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
+                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
+                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
+                    }
+                }
+            }
+            if (!queryResults.isEmpty()) {
+                break;
+            }
+        }
+        if (!queryResults.isEmpty()) {
+            this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();

Review Comment:
   add comment (as above)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -286,16 +316,16 @@ public void close() {
 
     @Override
     public <R> QueryResult<R> query(
-        final Query<R> query,

Review Comment:
   more formatting



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -181,8 +186,8 @@ public VersionedRecord<byte[]> get(final Bytes key) {
         final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
         if (rawLatestValueAndTimestamp != null) {
             return new VersionedRecord<>(
-                LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
-                LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
+                    LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                    
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)

Review Comment:
   unnecessary reformatting (some more below)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -242,9 +249,9 @@ private static <R> QueryResult<R> runKeyQuery(final 
Query<R> query,
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                        final PositionBound 
positionBound,
-                                                        final QueryConfig 
config,
-                                                        final StateStore 
store) {
+        final PositionBound positionBound,

Review Comment:
   fix formatting



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    private ListIterator<VersionedRecord<byte[]>> iterator;
+    private volatile boolean open = true;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner = null;
+    private Snapshot snapshot = null;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> 
segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!open) {
+            throw new IllegalStateException("The iterator is out of scope.");
+        }
+        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
+        return hasStillLoad || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+        }
+        throw new NoSuchElementException();
+    }
+
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen 
only one time).
+                this.snapshotOwner = segment;
+                // take a RocksDB snapshot to return the segments content at 
the query time (in order to guarantee consistency)
+                this.snapshot = snapshotOwner.getSnapshot();
+            }
+
+            final byte[] rawSegmentValue = segment.get(key, snapshot);
+            if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
+                if (segment.id() == -1) { // this is the latestValueStore
+                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+                    if (recordTimestamp <= toTime) {
+                        // latest value satisfies timestamp bound
+                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
+                    }
+                } else {
+                    // this segment contains records with the specified key 
and time range
+                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =

Review Comment:
   Instead of using `SegmentSearchResult`, it might be better to return type 
`SegmentValue` (i.e., a `PartiallyDeserializedSegmentValue` object) and push the 
logic to get individual values out of the segment inside the iterator (i.e., 
being lazy again)? -- Otherwise, `findAll` potentially creates a lot of 
`SegmentSearchResult` objects that the user never retrieves when closing the 
iterator early.
   



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);

Review Comment:
   It seems the "define query" part is the same for multiple test methods. Can 
you unify/extract this part into helper methods (including the part about "send 
query and receive result")?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {
+        final MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results in two steps
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = LAST_INDEX;
+                int iteratorSize = 0;
+
+                // step 1:
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                    if (i == 2)
+                        break;
+                }
+
+                // update the value of the oldest record
+                updateRecordValue();
+
+                // step 2: continue reading records from through the already 
opened iterator
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                }
+
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(RECORD_NUMBER));
+            }
+        }
+    }
+
+    private void updateRecordValue() {
+        // update the record value at RECORD_TIMESTAMPS[0]
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        try (final KafkaProducer<Integer, Integer> producer = new 
KafkaProducer<>(producerProps)) {
+            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, 
RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));

Review Comment:
   Why do we re-use `RECORD_TIMESTAMPS[0]` here? Not sure if I can follow? -- 
Won't this overwrite the first/oldest value which was already returned in the 
first `while` loop before `updateRecordValue` is executed?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {
+        final MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results in two steps
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = LAST_INDEX;
+                int iteratorSize = 0;
+
+                // step 1:
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                    if (i == 2)
+                        break;
+                }
+
+                // update the value of the oldest record
+                updateRecordValue();
+
+                // step 2: continue reading records from through the already 
opened iterator
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                }
+
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(RECORD_NUMBER));
+            }
+        }
+    }
+
+    private void updateRecordValue() {
+        // update the record value at RECORD_TIMESTAMPS[0]
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        try (final KafkaProducer<Integer, Integer> producer = new 
KafkaProducer<>(producerProps)) {
+            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, 
RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
+        }
+        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4);
+        assertThat(INPUT_POSITION, 
equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
+
+        // make sure that the new value is picked up by the store
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class.getName());
+        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "foo");
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        try (final KafkaConsumer<Integer, Integer> consumer = new 
KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Collections.singletonList(INPUT_TOPIC_NAME));
+            // the last record is the newly added one
+            assertThat(consumer.poll(Duration.ofMillis(1000)).count(), 
equalTo(RECORD_NUMBER + 1));

Review Comment:
   We don't really have a guarantee that `poll()` returns all records in the 
first call... Use the corresponding `waitXxx(...)` helpers from 
`org.apache.kafka.streams.integration.utils.IntegrationTestUtils` instead. -- 
Otherwise we introduce a potentially flaky test.



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {

Review Comment:
   I guess this test needs a comment describing the race condition (and that 
it verifies that using `Snapshots` works correctly).



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -471,15 +475,195 @@ public void shouldDistinguishEmptyAndNull() {
         verifyGetNullFromStore("k");
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", 
SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", 
SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", 
SEGMENT_INTERVAL - 5);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", 
SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", 
SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", 
SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldGetRecordVersionsFromOlderSegments() {
+        // use a different key to create three different segments
+        putToStore("ko", null, SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("ko", null, 2 * SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("ko", null, 3 * SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+        // return null after visiting all segments (the key does not exist.)
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 20, 
SEGMENT_INTERVAL);
+
+        // insert data to create non-empty (first) segment
+        putToStore("k", "v1", SEGMENT_INTERVAL - 30, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v2", SEGMENT_INTERVAL - 25, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", null, SEGMENT_INTERVAL - 20, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", null, SEGMENT_INTERVAL - 15, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v3", SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v4", SEGMENT_INTERVAL - 5, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+
+        // return null for the query with a time range prior to inserting 
values
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, 
SEGMENT_INTERVAL - 35);
+
+        // return values for the query with query time range in which values 
are still valid and there are multiple tombstones
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, 
SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+                                            Arrays.asList("v4", "v3", "v2", 
"v1"),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+                                            
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+        // return values for the query with time range (MIN, MAX)
+        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, 
Long.MAX_VALUE, ResultOrder.ANY,
+                                           Arrays.asList("v4", "v3", "v2", 
"v1"),
+                                           Arrays.asList(SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+                                           
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+        // return the latest record (retrieve only from the latestValueStore)
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, 
SEGMENT_INTERVAL, ResultOrder.ANY,
+                                            Collections.singletonList("v4"),
+                                            
Collections.singletonList(SEGMENT_INTERVAL - 5),
+                                            
Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+        // return two values for the query with time fromTimeStamp = 
toTimestamp
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 5, ResultOrder.ANY,

Review Comment:
   Should this not return only one value, `v4`, because `v3` is not valid any 
longer? Its upper bound is exclusive, and `[-10,-5)` does not intersect with 
the search interval `[-5,-5]` (a `[-5,-5]` range query is basically a 
point-in-time query and should return the same as `KeyQuery#asOf(-5)`), right?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {
+        final MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results in two steps
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = LAST_INDEX;
+                int iteratorSize = 0;
+
+                // step 1:
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();

Review Comment:
   Seems we can simplify this, given that we break if `i == 2` and thus we 
never reach `LAST_INDEX`?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -296,6 +300,19 @@ public void shouldThrowOnIQv2KeyQuery() {
         assertThrows(UnsupportedOperationException.class, () -> 
store.query(mock(KeyQuery.class), null, null));
     }
 
+    @SuppressWarnings("unchecked")

Review Comment:
   Can we avoid this suppression?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -471,15 +475,195 @@ public void shouldDistinguishEmptyAndNull() {
         verifyGetNullFromStore("k");
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", 
SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", 
SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", 
SEGMENT_INTERVAL - 5);
-        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", 
SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", 
SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", 
SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldGetRecordVersionsFromOlderSegments() {
+        // use a different key to create three different segments
+        putToStore("ko", null, SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("ko", null, 2 * SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("ko", null, 3 * SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+        // return null after visiting all segments (the key does not exist.)
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 20, 
SEGMENT_INTERVAL);
+
+        // insert data to create non-empty (first) segment
+        putToStore("k", "v1", SEGMENT_INTERVAL - 30, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v2", SEGMENT_INTERVAL - 25, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", null, SEGMENT_INTERVAL - 20, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", null, SEGMENT_INTERVAL - 15, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v3", SEGMENT_INTERVAL - 10, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+        putToStore("k", "v4", SEGMENT_INTERVAL - 5, 
PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+
+        // return null for the query with a time range prior to inserting 
values
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, 
SEGMENT_INTERVAL - 35);
+
+        // return values for the query with query time range in which values 
are still valid and there are multiple tombstones
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, 
SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+                                            Arrays.asList("v4", "v3", "v2", 
"v1"),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+                                            
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+        // return values for the query with time range (MIN, MAX)
+        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, 
Long.MAX_VALUE, ResultOrder.ANY,
+                                           Arrays.asList("v4", "v3", "v2", 
"v1"),
+                                           Arrays.asList(SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+                                           
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+        // return the latest record (retrieve only from the latestValueStore)
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, 
SEGMENT_INTERVAL, ResultOrder.ANY,
+                                            Collections.singletonList("v4"),
+                                            
Collections.singletonList(SEGMENT_INTERVAL - 5),
+                                            
Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+        // return two values for the query with time fromTimeStamp = 
toTimestamp
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+                                            Arrays.asList("v4", "v3"),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
5, SEGMENT_INTERVAL - 10),
+                                            
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5));
+
+        // return one values for the query with time fromTimeStamp = 
toTimestamp
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, 
SEGMENT_INTERVAL - 4, ResultOrder.ANY,
+                                            Collections.singletonList("v4"),
+                                            
Collections.singletonList(SEGMENT_INTERVAL - 5),
+                                            
Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+        // return values before insertion of any tombstone
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 31, 
SEGMENT_INTERVAL - 21, ResultOrder.ANY,
+                                            Arrays.asList("v2", "v1"),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
25, SEGMENT_INTERVAL - 30),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
20, SEGMENT_INTERVAL - 25));
+
+        // return values for the query with time range that covers both 
tombstones
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 24, 
SEGMENT_INTERVAL - 11, ResultOrder.ANY,
+                                           Collections.singletonList("v2"),
+                                           
Collections.singletonList(SEGMENT_INTERVAL - 25),
+                                           
Collections.singletonList(SEGMENT_INTERVAL - 20));
+
+        // return values for the query with time range that after insertion of 
tombstones
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, 
SEGMENT_INTERVAL - 4, ResultOrder.ANY,
+                                           Arrays.asList("v4", "v3"),
+                                           Arrays.asList(SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 10),
+                                           
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5));
+
+        // return all the records that are valid during the query time range 
but inserted beforehand
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 26, 
SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+                                            Arrays.asList("v4", "v3", "v2", 
"v1"),
+                                            Arrays.asList(SEGMENT_INTERVAL - 
5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+                                            
Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, 
SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+        // return the valid record that is still valid till the beginning of 
query time range (validTo = query lower time bound)
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 20, 
SEGMENT_INTERVAL - 15, ResultOrder.ANY,

Review Comment:
   This should be `-19` to `-14`
   
   I stopped looking into the rest of the cases -- seems there is some 
"off by one" question?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to