guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872913253
########## streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java: ########## @@ -0,0 +1,1238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import 
org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofHours; +import static java.time.Duration.ofMinutes; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static 
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues; +import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; +import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TimeOrderedCachingPersistentWindowStoreTest { + + private static final int MAX_CACHE_SIZE_BYTES = 300; + private static final long DEFAULT_TIMESTAMP = 10L; + private static final Long WINDOW_SIZE = 10L; + private static final long SEGMENT_INTERVAL = 100L; + private final static String TOPIC = "topic"; + private static final String CACHE_NAMESPACE = "0_0-store-name"; + + private ThreadCache cache; + private InternalMockProcessorContext context; + private TimeFirstWindowKeySchema baseKeySchema; + private WindowStore<Bytes, byte[]> underlyingStore; + private TimeOrderedCachingWindowStore cachingStore; + private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore; + private CacheFlushListenerStub<Windowed<String>, String> cacheListener; + + @Parameter + public boolean hasIndex; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return asList(new Object[][] { + {true}, + {false} + }); + } + + @Before + public void setUp() { + baseKeySchema = new TimeFirstWindowKeySchema(); + bytesStore = new 
RocksDBTimeOrderedWindowSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, hasIndex); + underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, WINDOW_SIZE); + final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); + keyDeserializer.setIsChangelogTopic(true); + cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); + cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); + cachingStore.setFlushListener(cacheListener, false); + cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); + context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders())); + cachingStore.init((StateStoreContext) context, cachingStore); + } + + @After + public void closeStore() { + cachingStore.close(); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); + EasyMock.expect(inner.hasIndex()).andReturn(hasIndex); + EasyMock.replay(inner); + final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + + EasyMock.reset(inner); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); + 
EasyMock.expect(inner.hasIndex()).andReturn(hasIndex); + EasyMock.replay(inner); + final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + + EasyMock.reset(inner); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldThrowIfWrongStore() { + final RocksDBTimestampedWindowStore innerWrong = EasyMock.mock(RocksDBTimestampedWindowStore.class); + final Exception e = assertThrows(IllegalArgumentException.class, + () -> new TimeOrderedCachingWindowStore(innerWrong, WINDOW_SIZE, SEGMENT_INTERVAL)); + assertThat(e.getMessage(), + containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store")); + + final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); + // Nothing happens + new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + } + + @Test + public void shouldNotReturnDuplicatesInRanges() { + final StreamsBuilder builder = new StreamsBuilder(); + + final StoreBuilder<TimestampedWindowStore<String, String>> storeBuilder = Stores.timestampedWindowStoreBuilder( + RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( + "store-name", + ofHours(1L), + ofMinutes(1), + false, + hasIndex + ), Serdes.String(), Serdes.String()) + .withCachingEnabled(); + + builder.addStateStore(storeBuilder); + + builder.stream(TOPIC, + Consumed.with(Serdes.String(), Serdes.String())) + .process(() -> new Processor<String, String, String, String>() { + private int numRecordsProcessed; + private WindowStore<String, ValueAndTimestamp<String>> store; + + @Override + public void init(final ProcessorContext<String, String> processorContext) { + this.store = processorContext.getStateStore("store-name"); + int count = 0; + + 
try (final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> all = store.all()) { + while (all.hasNext()) { + count++; + all.next(); + } + } + + assertThat(count, equalTo(0)); + } + + @Override + public void process(final Record<String, String> record) { + int count = 0; + + try (final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> all = store.all()) { + while (all.hasNext()) { + count++; + all.next(); + } + } + + assertThat(count, equalTo(numRecordsProcessed)); + + store.put(record.value(), ValueAndTimestamp.make(record.value(), record.timestamp()), record.timestamp()); + + numRecordsProcessed++; + } + + }, "store-name"); + + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000L); + + final Instant initialWallClockTime = Instant.ofEpochMilli(0L); + final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(TOPIC, + Serdes.String().serializer(), + Serdes.String().serializer(), + initialWallClockTime, + Duration.ZERO); + + for (int i = 0; i < 5; i++) { + inputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + driver.advanceWallClockTime(Duration.ofSeconds(10)); + inputTopic.advanceTime(Duration.ofSeconds(10)); + for (int i = 0; i < 5; i++) { + inputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + driver.advanceWallClockTime(Duration.ofSeconds(10)); + 
inputTopic.advanceTime(Duration.ofSeconds(10)); + for (int i = 0; i < 5; i++) { + inputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + driver.advanceWallClockTime(Duration.ofSeconds(10)); + inputTopic.advanceTime(Duration.ofSeconds(10)); + for (int i = 0; i < 5; i++) { + inputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + driver.close(); + } + + @Test + public void shouldPutFetchFromCache() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + + assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a"))); + assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b"))); + assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null)); + assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null)); + + try (final WindowStoreIterator<byte[]> a = cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10)); + final WindowStoreIterator<byte[]> b = cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) { + verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a"); + verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b"); + assertFalse(a.hasNext()); + assertFalse(b.hasNext()); + final int expectedSize = hasIndex ? 
4 : 2; + assertEquals(expectedSize, cache.size()); + } + } + + @Test + public void shouldMatchPositionAfterPutWithFlushListener() { + cachingStore.setFlushListener(record -> { }, false); + shouldMatchPositionAfterPut(); + } + + @Test + public void shouldMatchPositionAfterPutWithoutFlushListener() { + cachingStore.setFlushListener(null, false); + shouldMatchPositionAfterPut(); + } + + private void shouldMatchPositionAfterPut() { + context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); + cachingStore.put(bytesKey("key1"), bytesValue("value1"), DEFAULT_TIMESTAMP); + context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders())); + cachingStore.put(bytesKey("key2"), bytesValue("value2"), DEFAULT_TIMESTAMP); + + // Position should correspond to the last record's context, not the current context. + context.setRecordContext( + new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()) + ); + + // the caching window store doesn't maintain a separate + // position because it never serves queries from the cache + assertEquals(Position.emptyPosition(), cachingStore.getPosition()); + assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); + + cachingStore.flush(); + + assertEquals( + Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), + cachingStore.getPosition() + ); + assertEquals( + Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), + underlyingStore.getPosition() + ); + } + + private void verifyKeyValue(final KeyValue<Long, byte[]> next, + final long expectedKey, + final String expectedValue) { + assertThat(next.key, equalTo(expectedKey)); + assertThat(next.value, equalTo(bytesValue(expectedValue))); + } + + private static byte[] bytesValue(final String value) { + return value.getBytes(); + } + + private static Bytes bytesKey(final String key) { + return Bytes.wrap(key.getBytes()); + } + + private String stringFrom(final byte[] from) { + return 
Serdes.String().deserializer().deserialize("", from); + } + + @Test + public void shouldPutFetchRangeFromCache() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("a", "b"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + final int expectedSize = hasIndex ? 4 : 2; + assertEquals(expectedSize, cache.size()); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new 
TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("a", "b", "c", "d"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final 
KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("a", "b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List<String> expectedValues = 
Arrays.asList("c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("e", "d", "c"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List<Windowed<Bytes>> expectedKeys = 
Arrays.asList( + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List<String> expectedValues = Arrays.asList("e", "d", "c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldGetAllFromCache() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all()) { + final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"}; + for (final String s : array) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + s); + } + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldGetAllBackwardFromCache() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP); + 
cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP); + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardAll()) { + final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"}; + for (final String s : array) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + s); + } + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldFetchAllWithinTimestampRange() { + final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"}; + for (int i = 0; i < array.length; i++) { + cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7))) { + for (int i = 0; i < array.length; i++) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator.hasNext()); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = + cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4))) { + for (int i = 2; i <= 4; i++) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator1.next(), + new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator1.hasNext()); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = + cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7))) { + for (int i = 5; i <= 7; i++) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator2.next(), + new Windowed<>(bytesKey(str), new 
TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator2.hasNext()); + } + } + + @Test + public void shouldFetchAllBackwardWithinTimestampRange() { + final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"}; + for (int i = 0; i < array.length; i++) { + cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.backwardFetchAll(ofEpochMilli(0), ofEpochMilli(7))) { + for (int i = array.length - 1; i >= 0; i--) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator.hasNext()); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = + cachingStore.backwardFetchAll(ofEpochMilli(2), ofEpochMilli(4))) { + for (int i = 4; i >= 2; i--) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator1.next(), + new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator1.hasNext()); + } + + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = + cachingStore.backwardFetchAll(ofEpochMilli(5), ofEpochMilli(7))) { + for (int i = 7; i >= 5; i--) { + final String str = array[i]; + verifyWindowedKeyValue( + iterator2.next(), + new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), + str); + } + assertFalse(iterator2.hasNext()); + } + } + + @Test + public void shouldFlushEvictedItemsIntoUnderlyingStore() { + final int added = addItemsToCache(); + // all dirty entries should have been flushed + try (final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch( + Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)), + DEFAULT_TIMESTAMP, + DEFAULT_TIMESTAMP)) { + final KeyValue<Bytes, byte[]> next = iter.next(); + assertEquals(DEFAULT_TIMESTAMP, baseKeySchema.segmentTimestamp(next.key)); + assertArrayEquals("0".getBytes(), next.value); + assertFalse(iter.hasNext()); + 
assertEquals(added - 1, cache.size()); + } + } + + @Test + public void shouldForwardDirtyItemsWhenFlushCalled() { + final Windowed<String> windowedKey = + new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); + cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.flush(); + assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); + assertNull(cacheListener.forwarded.get(windowedKey).oldValue); + } + + @Test + public void shouldSetFlushListener() { + assertTrue(cachingStore.setFlushListener(null, true)); + assertTrue(cachingStore.setFlushListener(null, false)); + } + + @Test + public void shouldForwardOldValuesWhenEnabled() { + cachingStore.setFlushListener(cacheListener, true); + final Windowed<String> windowedKey = + new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); + cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.flush(); + assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); + assertNull(cacheListener.forwarded.get(windowedKey).oldValue); + cacheListener.forwarded.clear(); + cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); + cachingStore.flush(); + assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); + assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); + cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); + cachingStore.flush(); + assertNull(cacheListener.forwarded.get(windowedKey).newValue); + assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); + cacheListener.forwarded.clear(); + cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); + cachingStore.flush(); + 
assertNull(cacheListener.forwarded.get(windowedKey)); + cacheListener.forwarded.clear(); + } + + @Test + public void shouldForwardOldValuesWhenDisabled() { Review Comment: The test name should be `shouldNot...`; I will update it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
