This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 15c24da KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537) 15c24da is described below commit 15c24da888bc0a2582fd84d89fa1b84354fede64 Author: Sergio Peña <ser...@confluent.io> AuthorDate: Sun Apr 18 13:18:09 2021 -0500 KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537) This PR changes the TimeOrderedKeySchema composite key from time-seq-key -> time-key-seq to allow deletion of duplicated time-key records using the RocksDB deleteRange API. It also removes all duplicates when put(key, null) is called. Currently, the put(key, null) was a no-op, which was causing problems because there was no way to delete any keys when duplicates are allowed. The RocksDB deleteRange(keyFrom, keyTo) deletes a range of keys from keyFrom (inclusive) to keyTo (exclusive). To make keyTo inclusive, I incremented the end key by one when calling the RocksDBAccessor. 
Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../AbstractRocksDBSegmentedBytesStore.java | 9 + .../streams/state/internals/KeyValueSegment.java | 6 + .../streams/state/internals/RocksDBStore.java | 26 +++ .../internals/RocksDBTimeOrderedWindowStore.java | 29 ++- .../state/internals/RocksDBTimestampedStore.java | 16 ++ .../kafka/streams/state/internals/Segment.java | 1 + .../state/internals/SegmentedBytesStore.java | 20 ++ .../state/internals/TimeOrderedKeySchema.java | 106 +++++------ .../state/internals/TimestampedSegment.java | 6 + .../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +- .../RocksDBTimeOrderedWindowStoreTest.java | 205 +++++++++++++++++++-- .../state/internals/TimeOrderedKeySchemaTest.java | 93 ---------- 12 files changed, 325 insertions(+), 194 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 4c545a5..51fb3ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -197,6 +197,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se } @Override + public void remove(final Bytes key, final long timestamp) { + final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp); + final S segment = segments.getSegmentForTimestamp(timestamp); + if (segment != null) { + segment.deleteRange(keyBytes, keyBytes); + } + } + + @Override public void put(final Bytes key, final byte[] value) { final long timestamp = keySchema.segmentTimestamp(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java index b6d6504..66c55fc 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; @@ -41,6 +42,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment } @Override + public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) { + super.deleteRange(keyFrom, keyTo); + } + + @Override public int compareTo(final KeyValueSegment segment) { return Long.compare(id, segment.id); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 12ba4eb..c03f86d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -349,6 +349,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS return oldValue; } + void deleteRange(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + + validateStoreOpen(); + + // End of key is exclusive, so we increment it by 1 byte to make keyTo inclusive + dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get()); + } + @Override public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { @@ -524,6 +534,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS final Bytes to, final boolean forward); + /** + * Deletes all key entries in the range ['from', 'to'), including 'from' and excluding 'to'. 
+ */ + void deleteRange(final byte[] from, + final byte[] to); + KeyValueIterator<Bytes, byte[]> all(final boolean forward); KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix); @@ -604,6 +620,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS } @Override + public void deleteRange(final byte[] from, final byte[] to) { + try { + db.deleteRange(columnFamily, wOptions, from, to); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + } + + @Override public KeyValueIterator<Bytes, byte[]> all(final boolean forward) { final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily); if (forward) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java index 3efeb32..f8ba883 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java @@ -52,29 +52,29 @@ public class RocksDBTimeOrderedWindowStore @Override public void put(final Bytes key, final byte[] value, final long timestamp) { - // Skip if value is null and duplicates are allowed since this delete is a no-op if (!(value == null && retainDuplicates)) { maybeUpdateSeqnumForDups(); wrapped().put(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum), value); + } else { + // Delete all duplicates for the specified key and timestamp + wrapped().remove(key, timestamp); } } @Override public byte[] fetch(final Bytes key, final long timestamp) { - return wrapped().get(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum)); + throw new UnsupportedOperationException(); } - 
@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed + @SuppressWarnings("deprecation") @Override public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator(); + throw new UnsupportedOperationException(); } @Override public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(key, timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator(); + throw new UnsupportedOperationException(); } @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @@ -83,8 +83,7 @@ public class RocksDBTimeOrderedWindowStore final Bytes keyTo, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); + throw new UnsupportedOperationException(); } @Override @@ -92,8 +91,7 @@ public class RocksDBTimeOrderedWindowStore final Bytes keyTo, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); + throw new UnsupportedOperationException(); } @Override @@ -104,21 +102,18 @@ public class RocksDBTimeOrderedWindowStore @Override public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardAll(); - return new 
TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); + throw new UnsupportedOperationException(); } @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); + throw new UnsupportedOperationException(); } @Override public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetchAll(timeFrom, timeTo); - return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); + throw new UnsupportedOperationException(); } private void maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index db3d175..dafbc8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -206,6 +206,22 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped } @Override + public void deleteRange(final byte[] from, final byte[] to) { + try { + db.deleteRange(oldColumnFamily, wOptions, from, to); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. 
+ throw new ProcessorStateException("Error while removing key from store " + name, e); + } + try { + db.deleteRange(newColumnFamily, wOptions, from, to); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + } + + @Override public KeyValueIterator<Bytes, byte[]> all(final boolean forward) { final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily); final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java index 9fddc16..ea3b89a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java @@ -25,4 +25,5 @@ public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore void destroy() throws IOException; + void deleteRange(Bytes keyFrom, Bytes keyTo); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 79ada1f..4519929 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -110,6 +110,14 @@ public interface SegmentedBytesStore extends StateStore { void remove(Bytes key); /** + * Remove all duplicated records with the provided key at the specified timestamp. + * + * @param key the segmented key to remove + * @param timestamp the timestamp to match + */ + void remove(Bytes key, long timestamp); + + /** * Write a new value to the store with the provided key. 
The key * should be a composite of the record key, and the timestamp information etc * as described by the {@link KeySchema} @@ -152,6 +160,18 @@ public interface SegmentedBytesStore extends StateStore { Bytes lowerRange(final Bytes key, final long from); /** + * Given a record key and a time, construct a Segmented key to search when performing + * prefixed queries. + * + * @param key the record key + * @param timestamp the timestamp to use as the key prefix + * @return The key that represents the prefixed Segmented key in bytes. + */ + default Bytes toStoreBinaryKeyPrefix(final Bytes key, long timestamp) { + throw new UnsupportedOperationException(); + } + + /** * Given a range of fixed size record keys and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. * @see SessionKeySchema#upperRange diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java index b834b3a..f191d2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java @@ -30,62 +30,40 @@ import java.util.List; /** * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store - * key into a schema combined of (time,seq,key). This key schema is more efficient when doing - * range queries between a time interval. For key range queries better use {@link WindowKeySchema}. + * key into a schema combined of (time,key,seq). Since key is variable length while time/seq is + * fixed length, when formatting in this order, varying time range query would be very inefficient + * since we'd need to be very conservative in picking the from / to boundaries; however for now + * we do not expect any varying time range access at all, only fixed time ranges. 
*/ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema { private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class); private static final int TIMESTAMP_SIZE = 8; private static final int SEQNUM_SIZE = 4; - private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; - /** - * {@inheritdoc} - * - * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key - * range queries may be slower. If better performance on key range queries are necessary, then - * use the {@link WindowKeySchema}. - */ @Override public Bytes upperRange(final Bytes key, final long to) { - return toStoreKeyBinary(key.get(), to, Integer.MAX_VALUE); + throw new UnsupportedOperationException(); } - /** - * {@inheritdoc} - * - * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key - * range queries may be slower. If better performance on key range queries are necessary, then - * use the {@link WindowKeySchema}. - */ @Override public Bytes lowerRange(final Bytes key, final long from) { - return toStoreKeyBinary(key.get(), from, 0); + throw new UnsupportedOperationException(); + } + + @Override + public Bytes toStoreBinaryKeyPrefix(final Bytes key, final long timestamp) { + return toStoreKeyBinaryPrefix(key, timestamp); } - /** - * {@inheritdoc} - * - * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key - * range queries may be slower. If better performance on key range queries are necessary, then - * use the {@link WindowKeySchema}. - */ @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { - return toStoreKeyBinary(key, to, Integer.MAX_VALUE); + throw new UnsupportedOperationException(); } - /** - * {@inheritdoc} - * - * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key - * range queries may be slower. 
If better performance on key range queries are necessary, then - * use the {@link WindowKeySchema}. - */ @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { - return toStoreKeyBinary(key, Math.max(0, from), 0); + throw new UnsupportedOperationException(); } @Override @@ -96,31 +74,36 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem /** * {@inheritdoc} * - * This method is not optimized for {@link TimeOrderedKeySchema}. The method may do unnecessary - * checks to find the next record. + * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time + * range queries are not supported. */ @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { - return iterator -> { - while (iterator.hasNext()) { - final Bytes bytes = iterator.peekNextKey(); - final Bytes keyBytes = Bytes.wrap(extractStoreKeyBytes(bytes.get())); - final long time = extractStoreTimestamp(bytes.get()); - if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0) - && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0) - && time >= from - && time <= to) { - return true; - } - iterator.next(); - } - return false; - }; + if (binaryKeyFrom != null || binaryKeyTo != null) { + throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported."); + } + + if (from != 0 && to != Long.MAX_VALUE) { + throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. 
Key and time range queries are not supported."); + } + + return iterator -> iterator.hasNext(); } @Override public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) { - return segments.segments(from, to, forward); + throw new UnsupportedOperationException(); + } + + public static Bytes toStoreKeyBinaryPrefix(final Bytes key, + final long timestamp) { + final byte[] serializedKey = key.get(); + + final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length); + buf.putLong(timestamp); + buf.put(serializedKey); + + return Bytes.wrap(buf.array()); } public static Bytes toStoreKeyBinary(final Bytes key, @@ -149,30 +132,23 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem final int seqnum) { final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length + SEQNUM_SIZE); buf.putLong(timestamp); - buf.putInt(seqnum); buf.put(serializedKey); + buf.putInt(seqnum); return Bytes.wrap(buf.array()); } static byte[] extractStoreKeyBytes(final byte[] binaryKey) { final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE]; - System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length); + System.arraycopy(binaryKey, TIMESTAMP_SIZE, bytes, 0, bytes.length); return bytes; } - static <K> K extractStoreKey(final byte[] binaryKey, - final StateSerdes<K, ?> serdes) { - final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE]; - System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length); - return serdes.keyFrom(bytes); - } - static long extractStoreTimestamp(final byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getLong(0); } static int extractStoreSequence(final byte[] binaryKey) { - return ByteBuffer.wrap(binaryKey).getInt(TIMESTAMP_SIZE); + return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE); } static <K> Windowed<K> fromStoreKey(final byte[] binaryKey, @@ -202,8 
+178,8 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem * Safely construct a time window of the given size, * taking care of bounding endMs to Long.MAX_VALUE if necessary */ - static TimeWindow timeWindowForSize(final long startMs, - final long windowSize) { + private static TimeWindow timeWindowForSize(final long startMs, + final long windowSize) { long endMs = startMs + windowSize; if (endMs < 0) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java index 36fe0e5..f0e4cf6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; @@ -41,6 +42,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T } @Override + public void deleteRange(final Bytes keyFrom, final Bytes keyTo) { + throw new UnsupportedOperationException(); + } + + @Override public int compareTo(final TimestampedSegment segment) { return Long.compare(id, segment.id); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index a1344f6..53d1846 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -93,7 +93,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends 
Segment> @Parameters(name = "{0}") public static Object[] getKeySchemas() { - return new Object[] {new SessionKeySchema(), new WindowKeySchema(), new TimeOrderedKeySchema()}; + return new Object[] {new SessionKeySchema(), new WindowKeySchema()}; } @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java index 37f98f1..2646c4c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java @@ -16,18 +16,68 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.io.File; + +import static java.util.Arrays.asList; import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static 
org.junit.Assert.assertEquals; + +public class RocksDBTimeOrderedWindowStoreTest { + private static final long WINDOW_SIZE = 3L; + private static final long SEGMENT_INTERVAL = 60_000L; + private static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL; + + private static final String STORE_NAME = "rocksDB time-ordered window store"; + + WindowStore<Integer, String> windowStore; + InternalMockProcessorContext context; + MockRecordCollector recordCollector; + + final File baseDir = TestUtils.tempDirectory("test"); -public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest { - private static final String STORE_NAME = "rocksDB window store"; + @Before + public void setup() { + windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String()); + + recordCollector = new MockRecordCollector(); + context = new InternalMockProcessorContext( + baseDir, + Serdes.String(), + Serdes.Integer(), + recordCollector, + new ThreadCache( + new LogContext("testCache"), + 0, + new MockStreamsMetrics(new Metrics()))); + context.setTime(1L); + + windowStore.init((StateStoreContext) context, windowStore); + } + + @After + public void after() { + windowStore.close(); + } - @Override <K, V> WindowStore<K, V> buildWindowStore(final long retentionPeriod, final long windowSize, final boolean retainDuplicates, @@ -47,24 +97,143 @@ public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest { .build(); } - @Override - String getMetricsScope() { - return new RocksDbWindowBytesStoreSupplier(null, 0, 0, 0, false, WindowStoreTypes.TIME_ORDERED_WINDOW_STORE).metricsScope(); + @Test + public void shouldGetAll() { + final long startTime = SEGMENT_INTERVAL - 4L; + + windowStore.put(0, "zero", startTime + 0); + windowStore.put(1, "one", startTime + 1); + windowStore.put(2, "two", startTime + 2); + + final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0); + final KeyValue<Windowed<Integer>, String> 
one = windowedPair(1, "one", startTime + 1); + final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2); + + assertEquals( + asList(zero, one, two), + toList(windowStore.all()) + ); + } + + @Test + public void shouldGetAllDuplicates() { + final long startTime = SEGMENT_INTERVAL - 4L; + + windowStore.put(0, "zero1", startTime + 0); + windowStore.put(0, "zero2", startTime + 0); + windowStore.put(0, "zero3", startTime + 0); + + final KeyValue<Windowed<Integer>, String> zero1 = windowedPair(0, "zero1", startTime + 0); + final KeyValue<Windowed<Integer>, String> zero2 = windowedPair(0, "zero2", startTime + 0); + final KeyValue<Windowed<Integer>, String> zero3 = windowedPair(0, "zero3", startTime + 0); + + assertEquals( + asList(zero1, zero2, zero3), + toList(windowStore.all()) + ); + } + + @Test + public void shouldGetAllNonDeletedRecords() { + final long startTime = SEGMENT_INTERVAL - 4L; + + // Add some records + windowStore.put(0, "zero", startTime + 0); + windowStore.put(1, "one", startTime + 1); + windowStore.put(2, "two", startTime + 2); + windowStore.put(3, "three", startTime + 3); + windowStore.put(4, "four", startTime + 4); + + // Delete some records + windowStore.put(1, null, startTime + 1); + windowStore.put(3, null, startTime + 3); + + // Only non-deleted records should appear in the all() iterator + final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0); + final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2); + final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4); + + assertEquals( + asList(zero, two, four), + toList(windowStore.all()) + ); + } + + @Test + public void shouldDeleteAllDuplicates() { + final long startTime = SEGMENT_INTERVAL - 4L; + + windowStore.put(0, "zero1", startTime + 0); + windowStore.put(0, "zero2", startTime + 0); + windowStore.put(0, "zero3", startTime + 0); + windowStore.put(1, "one1", startTime + 1); + 
windowStore.put(1, "one2", startTime + 1); + + windowStore.put(0, null, startTime + 0); + + final KeyValue<Windowed<Integer>, String> one1 = windowedPair(1, "one1", startTime + 1); + final KeyValue<Windowed<Integer>, String> one2 = windowedPair(1, "one2", startTime + 1); + + assertEquals( + asList(one1, one2), + toList(windowStore.all()) + ); } - @Override - void setClassLoggerToDebug() { - LogCaptureAppender.setClassLoggerToDebug(AbstractRocksDBSegmentedBytesStore.class); + @Test + public void shouldGetAllReturnTimestampOrderedRecords() { + final long startTime = SEGMENT_INTERVAL - 4L; + + // Add some records in different order + windowStore.put(4, "four", startTime + 4); + windowStore.put(0, "zero", startTime + 0); + windowStore.put(2, "two1", startTime + 2); + windowStore.put(3, "three", startTime + 3); + windowStore.put(1, "one", startTime + 1); + + // Add duplicates + windowStore.put(2, "two2", startTime + 2); + + // Only non-deleted records should appear in the all() iterator + final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0); + final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1); + final KeyValue<Windowed<Integer>, String> two1 = windowedPair(2, "two1", startTime + 2); + final KeyValue<Windowed<Integer>, String> two2 = windowedPair(2, "two2", startTime + 2); + final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", startTime + 3); + final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4); + + assertEquals( + asList(zero, one, two1, two2, three, four), + toList(windowStore.all()) + ); + } + + @Test + public void shouldEarlyClosedIteratorStillGetAllRecords() { + final long startTime = SEGMENT_INTERVAL - 4L; + + windowStore.put(0, "zero", startTime + 0); + windowStore.put(1, "one", startTime + 1); + + final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0); + final KeyValue<Windowed<Integer>, String> one = 
windowedPair(1, "one", startTime + 1); + + final KeyValueIterator<Windowed<Integer>, String> it = windowStore.all(); + assertEquals(zero, it.next()); + it.close(); + + // A new all() iterator after a previous all() iterator was closed should return all elements. + assertEquals( + asList(zero, one), + toList(windowStore.all()) + ); } - @Override - long extractStoreTimestamp(final byte[] binaryKey) { - return TimeOrderedKeySchema.extractStoreTimestamp(binaryKey); + private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp) { + return windowedPair(key, value, timestamp, WINDOW_SIZE); } - @Override - <K> K extractStoreKey(final byte[] binaryKey, - final StateSerdes<K, ?> serdes) { - return TimeOrderedKeySchema.extractStoreKey(binaryKey, serdes); + private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp, final long windowSize) { + return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java index 5a5c4fc..03f81e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java @@ -19,20 +19,12 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.StateSerdes; -import 
org.apache.kafka.test.KeyValueIteratorStub; import org.junit.Test; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -44,94 +36,9 @@ public class TimeOrderedKeySchemaTest { final private Window window = new TimeWindow(startTime, endTime); final private Windowed<String> windowedKey = new Windowed<>(key, window); - final private TimeOrderedKeySchema timeOrderedKeySchema = new TimeOrderedKeySchema(); final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray()); @Test - public void testHasNextConditionUsingNullKeys() { - final List<KeyValue<Bytes, Integer>> keys = Arrays.asList( - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(0, 1)), 0), 1), - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(0, 1)), 0), 2), - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(0, 1)), 0), 3), - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(10, 20)), 4), 4), - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(10, 20)), 5), 5), - KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(10, 20)), 6), 6)); - final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator())); - - final HasNextCondition hasNextCondition = timeOrderedKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE); - final List<Integer> results = new ArrayList<>(); - while 
(hasNextCondition.hasNext(iterator)) { - results.add(iterator.next().value); - } - assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6))); - } - - @Test - public void testUpperBoundWithLargeTimestamps() { - final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE); - - assertThat( - "shorter key with max timestamp should be in range", - upper.compareTo( - TimeOrderedKeySchema.toStoreKeyBinary( - new byte[] {0xA}, - Long.MAX_VALUE, - Integer.MAX_VALUE - ) - ) >= 0 - ); - - assertThat( - "shorter key with max timestamp should be in range", - upper.compareTo( - TimeOrderedKeySchema.toStoreKeyBinary( - new byte[] {0xA, 0xB}, - Long.MAX_VALUE, - Integer.MAX_VALUE - ) - ) >= 0 - ); - - assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE, Integer.MAX_VALUE))); - } - - @Test - public void testUpperBoundWithZeroTimestamp() { - final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0); - assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE))); - } - - @Test - public void testLowerBoundWithZeroTimestamp() { - final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0); - assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, 0))); - } - - @Test - public void testLowerBoundWithNonZeroTimestamp() { - final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 42); - assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 42, 0))); - } - - @Test - public void testLowerBoundMatchesTrailingZeros() { - final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE - 1); - - assertThat( - "appending zeros to key should still be in range", - lower.compareTo( - 
TimeOrderedKeySchema.toStoreKeyBinary( - new byte[] {0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0}, - Long.MAX_VALUE - 1, - 0 - ) - ) < 0 - ); - - assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE - 1, 0))); - } - - @Test public void shouldConvertToBinaryAndBack() { final Bytes serialized = TimeOrderedKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes); final Windowed<String> result = TimeOrderedKeySchema.fromStoreKey(serialized.get(), endTime - startTime, stateSerdes.keyDeserializer(), stateSerdes.topic());