mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1414568376
########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + private ListIterator<VersionedRecord<byte[]>> iterator; + private volatile boolean open = true; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner = null; + private Snapshot snapshot = null; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + } + + @Override + public void close() { + open = false; + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + if (!open) { + throw new IllegalStateException("The iterator is out of scope."); + } + final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); Review Comment: We might want to add a comment about why `ASCENDING` uses `hasPrevious()` -- it's not intuitive and only makes sense if one knows that data is stored in descending order inside a segment. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -263,6 +268,31 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + @SuppressWarnings("unchecked") + VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { + validateStoreOpen(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. 
we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order); + } else { + final List<LogicalKeyValueSegment> segments = new ArrayList<>(); + // add segment stores + // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} + // but is still valid in query specified time interval. + if (order.equals(ResultOrder.ASCENDING)) { + segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, true)); Review Comment: Given we pass in `MIN_VALUE` twice, could we omit the first parameter? ########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry 
: partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { + final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results in two steps + final Map<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = LAST_INDEX; + int iteratorSize = 0; + + // step 1: + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + if (i == 2) Review Comment: nit: we always use `{ }` for all blocks ########## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ########## @@ -27,18 +28,34 @@ public final class VersionedRecord<V> { private final V value; private final long timestamp; + private final Optional<Long> validTo; /** * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. * - * @param value the value - * @param timestamp the timestamp + * @param value The value + * @param timestamp The type of the result returned by this query. */ public VersionedRecord(final V value, final long timestamp) { + this.value = Objects.requireNonNull(value, "value cannot be null."); + this.timestamp = timestamp; + this.validTo = Optional.empty(); + } + + /** + * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. + * + * @param value The value + * @param timestamp The timestamp + * @param validTo The exclusive upper bound of the validity interval + */ + public VersionedRecord(final V value, final long timestamp, final Long validTo) { Review Comment: Should it be `long` instead of `Long`? ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecordIterator; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. 
+ * No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods. + * + * @param <K> The type of the key. + * @param <V> The type of the result returned by this query. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final ResultOrder order; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final ResultOrder order) { + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.order = order; + } + + /** Review Comment: nit: indentation should be 4, not 2 ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecordIterator; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + * No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods. + * + * @param <K> The type of the key. + * @param <V> The type of the result returned by this query. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final ResultOrder order; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final ResultOrder order) { + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.order = order; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * + * <p> + * While the query by default returns the all the record versions of the specified {@code key}, setting + * the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} method), and the {@code toTimestamp} + * (by calling the {@link #toTime(Instant)} method) makes the query to return the record versions associated + * to the specified time range. 
+ * + * @param key The specified key by the query + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null Review Comment: `if {@code key} is null` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + private ListIterator<VersionedRecord<byte[]>> iterator; + private volatile boolean open = true; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner = null; + private Snapshot snapshot = null; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + } + + @Override + public void close() { + open = false; + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + if (!open) { + throw new IllegalStateException("The iterator is out of scope."); + } + final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); + return hasStillLoad || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); Review Comment: as above (maybe add a comment) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + private ListIterator<VersionedRecord<byte[]>> iterator; + private volatile boolean open = true; + + // defined for creating/releasing the snapshot. + private LogicalKeyValueSegment snapshotOwner = null; + private Snapshot snapshot = null; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + } + + @Override + public void close() { + open = false; + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + if (!open) { + throw new IllegalStateException("The iterator is out of scope."); + } + final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); + return hasStillLoad || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); + } + throw new NoSuchElementException(); + } + + private boolean maybeFillIterator() { + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + while (segmentIterator.hasNext()) { + final LogicalKeyValueSegment segment = segmentIterator.next(); + + if (snapshot == null) { // create the snapshot (this will happen only one time). + this.snapshotOwner = segment; Review Comment: We might want to add a comment explaining that we can pick any segment as the snapshot owner, because all segments use the same physical RocksDB under the hood ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; Review Comment: Why `protected` ? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + private ListIterator<VersionedRecord<byte[]>> iterator; + private volatile boolean open = true; + + // defined for creating/releasing the snapshot. 
+ private LogicalKeyValueSegment snapshotOwner = null; + private Snapshot snapshot = null; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + } + + @Override + public void close() { + open = false; + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + if (!open) { + throw new IllegalStateException("The iterator is out of scope."); + } + final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); + return hasStillLoad || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); + } + throw new NoSuchElementException(); + } + + private boolean maybeFillIterator() { + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + while (segmentIterator.hasNext()) { + final LogicalKeyValueSegment segment = segmentIterator.next(); + + if (snapshot == null) { // create the snapshot (this will happen only one time). + this.snapshotOwner = segment; + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + this.snapshot = snapshotOwner.getSnapshot(); + } + + final byte[] rawSegmentValue = segment.get(key, snapshot); + if (rawSegmentValue != null) { // this segment contains record(s) with the specified key + if (segment.id() == -1) { // this is the latestValueStore + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); + if (recordTimestamp <= toTime) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); + } + } else { + // this segment contains records with the specified key and time range + final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults = + RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); + for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { + queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); + } + } + } + if (!queryResults.isEmpty()) { + break; + } + } + if (!queryResults.isEmpty()) { + this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); Review Comment: add comment (as above) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -286,16 +316,16 @@ public void close() { @Override public <R> QueryResult<R> query( - final Query<R> query, Review Comment: more formatting ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -181,8 +186,8 @@ public VersionedRecord<byte[]> get(final Bytes key) { final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); if (rawLatestValueAndTimestamp != null) { return new VersionedRecord<>( - LatestValueFormatter.getValue(rawLatestValueAndTimestamp), - LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp) + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp) Review Comment: unnecessary reformatting (some more below) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -242,9 +249,9 @@ private static <R> QueryResult<R> runKeyQuery(final Query<R> query, @SuppressWarnings("unchecked") private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query, - final PositionBound positionBound, - final QueryConfig config, - final StateStore store) { + final PositionBound positionBound, Review Comment: fix formatting ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { + protected final ListIterator<LogicalKeyValueSegment> segmentIterator; + private final Bytes key; + private final Long fromTime; + private final Long toTime; + private final ResultOrder order; + private ListIterator<VersionedRecord<byte[]>> iterator; + private volatile boolean open = true; + + // defined for creating/releasing the snapshot. 
+ private LogicalKeyValueSegment snapshotOwner = null; + private Snapshot snapshot = null; + + + + public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + + this.segmentIterator = segmentIterator; + this.key = key; + this.fromTime = fromTime; + this.toTime = toTime; + this.iterator = Collections.emptyListIterator(); + this.order = order; + } + + @Override + public void close() { + open = false; + // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! + releaseSnapshot(); + } + + @Override + public boolean hasNext() { + if (!open) { + throw new IllegalStateException("The iterator is out of scope."); + } + final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); + return hasStillLoad || maybeFillIterator(); + } + + @Override + public Object next() { + if (hasNext()) { + return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); + } + throw new NoSuchElementException(); + } + + private boolean maybeFillIterator() { + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + while (segmentIterator.hasNext()) { + final LogicalKeyValueSegment segment = segmentIterator.next(); + + if (snapshot == null) { // create the snapshot (this will happen only one time). + this.snapshotOwner = segment; + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + this.snapshot = snapshotOwner.getSnapshot(); + } + + final byte[] rawSegmentValue = segment.get(key, snapshot); + if (rawSegmentValue != null) { // this segment contains record(s) with the specified key + if (segment.id() == -1) { // this is the latestValueStore + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); + if (recordTimestamp <= toTime) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); + } + } else { + // this segment contains records with the specified key and time range + final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults = Review Comment: Instead of using `SegmentSearchResult` it might be better to return type `SegmentValue` (i.e., a `PartiallyDeserializedSegmentValue` object), and push the logic to get individual values out of the segment inside the iterator (i.e., being lazy again)? -- Otherwise `findAll` potentially creates a lot of `SegmentSearchResult` objects that the user never retrieves if the iterator is closed early. 
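To make the suggested laziness concrete: the idea is to hold on to the partially deserialized segment and decode one record per `next()` call, rather than buffering everything `findAll` produces up front. A minimal, self-contained sketch of that shape (class and method names here are illustrative, not the store's actual internals):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Sketch only: instead of an eager findAll() that materializes every
// SegmentSearchResult up front, keep the raw segment and decode the i-th
// matching record on demand. Decoding work for records the caller never
// requests (e.g., because the iterator is closed early) is skipped entirely.
final class LazySegmentResults<T> implements Iterator<T> {
    private final int matchCount;          // number of records in [fromTime, toTime]
    private final IntFunction<T> decodeAt; // decodes the i-th match lazily
    private int position = 0;

    LazySegmentResults(final int matchCount, final IntFunction<T> decodeAt) {
        this.matchCount = matchCount;
        this.decodeAt = decodeAt;
    }

    @Override
    public boolean hasNext() {
        return position < matchCount;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // deserialization cost is paid here, per record, not when the iterator is filled
        return decodeAt.apply(position++);
    }
}
```

In the store, `decodeAt` would wrap the byte-level lookup that the segment value already performs, so closing the `VersionedRecordIterator` after a few records leaves the remaining ones untouched.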
########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? 
i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); Review Comment: Seems the "define query" part is the same for multiple test methods? Can you unify/extract this part into helper methods (including the part about "send query and receive result")? ########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = 
order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { + final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results in two steps + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = LAST_INDEX; + int iteratorSize = 0; + + // step 1: + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + if (i == 2) + break; + } + + // update the value of the oldest record + updateRecordValue(); + + // step 2: continue reading records from through the already opened iterator + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + } + + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(RECORD_NUMBER)); + } + } + } + + private void updateRecordValue() { + // update the record value at RECORD_TIMESTAMPS[0] + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { + producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999)); Review Comment: Why do we re-use `RECORD_TIMESTAMPS[0]` here? Not sure if I can follow? -- Won't this overwrite the first/oldest value which was already returned in the first `while` loop before `updateRecordValue` is executed? 
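For context on why the already-open iterator should be unaffected even if that produce overwrites the record version at `RECORD_TIMESTAMPS[0]`: `LogicalSegmentIterator` pins its reads to a RocksDB `Snapshot`, and snapshot reads do not observe later writes. A small stand-alone RocksJava demo of that behavior (path and values are arbitrary; this is plain RocksJava, not Streams code):

```java
import java.nio.charset.StandardCharsets;

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;

public class SnapshotIsolationDemo {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final RocksDB db = RocksDB.open(options, "/tmp/snapshot-demo")) {
            db.put("k".getBytes(StandardCharsets.UTF_8), "old".getBytes(StandardCharsets.UTF_8));

            final Snapshot snapshot = db.getSnapshot(); // as LogicalSegmentIterator does
            // concurrent update, analogous to updateRecordValue() in the test
            db.put("k".getBytes(StandardCharsets.UTF_8), "new".getBytes(StandardCharsets.UTF_8));

            try (final ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot)) {
                // reads pinned to the snapshot still see the pre-update value
                final byte[] value = db.get(readOptions, "k".getBytes(StandardCharsets.UTF_8));
                System.out.println(new String(value, StandardCharsets.UTF_8)); // prints "old"
            } finally {
                db.releaseSnapshot(snapshot);
            }
        }
    }
}
```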
########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? 
i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { + final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results in two steps + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = LAST_INDEX; + int iteratorSize = 0; + + // step 1: + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + if (i == 2) + break; + } + + // update the value of the oldest record + updateRecordValue(); + + // step 2: continue reading records from through the already opened iterator + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + } + + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(RECORD_NUMBER)); + } + } + } + + private void updateRecordValue() { + // update the record value at RECORD_TIMESTAMPS[0] + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { + producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999)); + } + INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4); + assertThat(INPUT_POSITION, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4))); + + // make sure that the new value is picked up by the store + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "foo"); + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + try (final KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerProps)) { + consumer.subscribe(Collections.singletonList(INPUT_TOPIC_NAME)); + // the last record is the newly added one + assertThat(consumer.poll(Duration.ofMillis(1000)).count(), equalTo(RECORD_NUMBER + 1)); Review Comment: We don't really have a guarantee that `poll()` returns all records in the first call... Use the corresponding `waitXxx(...)` helpers from `org.apache.kafka.streams.integration.utils.IntegrationTestUtils` instead. -- Otherwise we introduce a potentially flaky test. 
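One possible shape for the fix, as a sketch: `waitForRecordCount` is a hypothetical helper name, and `TestUtils.waitForCondition` from `org.apache.kafka.test` is just one way to express the polling (the `waitXxx(...)` helpers in `IntegrationTestUtils` that the reviewer points to wrap the same pattern):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.test.TestUtils;

// Possible replacement for the single-poll assertion: keep polling until all
// expected records have been seen, failing only after a real timeout.
private static void waitForRecordCount(final Properties consumerProps,
                                       final String topic,
                                       final int expectedCount) throws InterruptedException {
    try (final KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerProps)) {
        consumer.subscribe(Collections.singletonList(topic));
        final AtomicInteger seen = new AtomicInteger();
        TestUtils.waitForCondition(
            () -> seen.addAndGet(consumer.poll(Duration.ofMillis(100)).count()) >= expectedCount,
            30_000L,
            "did not consume the expected " + expectedCount + " records in time");
    }
}
```

The `assertThat(consumer.poll(...)...)` line would then become a call like `waitForRecordCount(consumerProps, INPUT_TOPIC_NAME, RECORD_NUMBER + 1)`.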
########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? 
i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { Review Comment: Guess this test needs some comment describing the race condition (and that it verifies that using `Snapshots` works correctly).
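One possible shape for such a comment, sketched as a Javadoc block; the description of the snapshot mechanics is an assumption about the intended semantics, not wording taken from the PR:

    /**
     * Verifies that an open iterator is not affected by concurrent writes:
     * the iterator is expected to be backed by a RocksDB Snapshot taken when
     * the query is evaluated, so the record update injected between the two
     * read phases below (step 1 and step 2) must not become visible through
     * the already-open iterator, and the unmodified version history is still
     * returned in full.
     */
    private void shouldHandleRaceCondition() {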
########## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ########## @@ -471,15 +475,195 @@ public void shouldDistinguishEmptyAndNull() { verifyGetNullFromStore("k"); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8); } + @Test + public void shouldGetRecordVersionsFromOlderSegments() { + // use a different key to create three different segments + putToStore("ko", null, SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("ko", null, 2 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("ko", null, 3 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + // return null after visiting all segments (the key does not exist.) + verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL); + + // insert data to create non-empty (first) segment + putToStore("k", "v1", SEGMENT_INTERVAL - 30, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v2", SEGMENT_INTERVAL - 25, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", null, SEGMENT_INTERVAL - 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", null, SEGMENT_INTERVAL - 15, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v3", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v4", SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + + // return null for the query with a time range prior to inserting values + verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, SEGMENT_INTERVAL - 35); + + // return values for the query with query time range in which values are still valid and there are multiple tombstones + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 5, ResultOrder.ANY, + Arrays.asList("v4", "v3", "v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return values for the query with time range (MIN, MAX) + verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ANY, + Arrays.asList("v4", "v3", "v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return the latest record (retrieve only from the latestValueStore) + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, SEGMENT_INTERVAL, 
ResultOrder.ANY, + Collections.singletonList("v4"), + Collections.singletonList(SEGMENT_INTERVAL - 5), + Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED)); + + // return two values for the query with time fromTimeStamp = toTimestamp + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 5, ResultOrder.ANY, Review Comment: Should this not return only one value, `v4`, because `v3` is not valid any longer? Its upper bound is exclusive, and `[-10,-5)` does not intersect with search interval `[-5,-5]` (a `[-5,-5]` range query is basically a point-in-time query and should return the same result as `KeyQuery#asOf(-5)`), right? ########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ?
0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { + final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results in two steps + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry 
: partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = LAST_INDEX; + int iteratorSize = 0; + + // step 1: + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); Review Comment: Seems we can simplify this, given that we break if `i == 2` and thus we never reach `LAST_INDEX`? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java: ########## @@ -296,6 +300,19 @@ public void shouldThrowOnIQv2KeyQuery() { assertThrows(UnsupportedOperationException.class, () -> store.query(mock(KeyQuery.class), null, null)); } + @SuppressWarnings("unchecked") Review Comment: Can we avoid this suppression? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ########## @@ -471,15 +475,195 @@ public void shouldDistinguishEmptyAndNull() { verifyGetNullFromStore("k"); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5); - verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1); + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5); verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8); } + @Test + public void shouldGetRecordVersionsFromOlderSegments() { + // use a different key to create three different segments + putToStore("ko", null, SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("ko", null, 2 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("ko", null, 3 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + // return null after visiting all segments (the key does not exist.)
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL); + + // insert data to create non-empty (first) segment + putToStore("k", "v1", SEGMENT_INTERVAL - 30, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v2", SEGMENT_INTERVAL - 25, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", null, SEGMENT_INTERVAL - 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", null, SEGMENT_INTERVAL - 15, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v3", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + putToStore("k", "v4", SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + + // return null for the query with a time range prior to inserting values + verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, SEGMENT_INTERVAL - 35); + + // return values for the query with query time range in which values are still valid and there are multiple tombstones + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 5, ResultOrder.ANY, + Arrays.asList("v4", "v3", "v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return values for the query with time range (MIN, MAX) + verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ANY, + Arrays.asList("v4", "v3", "v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return the latest record (retrieve only from the latestValueStore) + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, SEGMENT_INTERVAL, ResultOrder.ANY, + Collections.singletonList("v4"), + Collections.singletonList(SEGMENT_INTERVAL - 5), + Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED)); + + // return two values for the query with time fromTimeStamp = toTimestamp + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 5, ResultOrder.ANY, + Arrays.asList("v4", "v3"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5)); + + // return one value for the query with time fromTimeStamp = toTimestamp + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, SEGMENT_INTERVAL - 4, ResultOrder.ANY, + Collections.singletonList("v4"), + Collections.singletonList(SEGMENT_INTERVAL - 5), + Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED)); + + // return values before insertion of any tombstone + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 31, SEGMENT_INTERVAL - 21, ResultOrder.ANY, + Arrays.asList("v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return values for the query with time range that covers both tombstones + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 24, SEGMENT_INTERVAL - 11, ResultOrder.ANY, + Collections.singletonList("v2"), + Collections.singletonList(SEGMENT_INTERVAL - 25), + Collections.singletonList(SEGMENT_INTERVAL - 20)); + + // return values for the query with time range that is after the insertion of tombstones + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, SEGMENT_INTERVAL - 4,
ResultOrder.ANY, + Arrays.asList("v4", "v3"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5)); + + // return all the records that are valid during the query time range but inserted beforehand + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 26, SEGMENT_INTERVAL - 5, ResultOrder.ANY, + Arrays.asList("v4", "v3", "v2", "v1"), + Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30), + Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25)); + + // return the valid record that is still valid till the beginning of query time range (validTo = query lower time bound) + verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 15, ResultOrder.ANY, Review Comment: This should be `-19` to `-14`. I stopped looking into the rest of the cases -- seems there is some "off by one" question?
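The last two comments both hinge on `validTo` being an exclusive upper bound. A minimal sketch of that interval reasoning, assuming a version is valid over `[from, validTo)` and a query covers the closed range `[qFrom, qTo]`; the helper name is hypothetical, purely for illustration:

    // a version valid over [from, validTo) matches a query range [qFrom, qTo]
    // only if the two intervals overlap
    static boolean isInQueryRange(final long from, final long validTo, final long qFrom, final long qTo) {
        return from <= qTo && validTo > qFrom;
    }

    // the [-5,-5] case above: v3 is valid over [SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 5),
    // so for the point query [SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 5]:
    //   from <= qTo      ->  SI - 10 <= SI - 5  ->  true
    //   validTo > qFrom  ->  SI - 5  >  SI - 5  ->  false
    // hence v3 would be excluded and only v4 returned, matching the reviewer's expectation

The same predicate is what the suspected "off by one" in the last comment would need to be checked against.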