ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int
maxCacheSize) {
.subMap(from, true, to, true).descendingKeySet().iterator(),
treeMap));
}
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
Review comment:
Can you add a test or two for this in `InMemoryLRUCacheStoreTest`?
Alternatively, I think you should be able to just collect all the tests in
`InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move
them over to
`AbstractKeyValueStoreTest` instead. That way you get test coverage for both
of those plus a handful of other store classes at once, without having to copy
the same test over and over across a bunch of different files.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -26,13 +26,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.List;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.TreeMap;
+import java.util.List;
Review comment:
Can you revert the changes in this file?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void
testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
assertNull(store.get("key4"));
}
+ @Test
+ public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingDisabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingDisabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
Review comment:
I'm not sure this is really the right place to test the `prefixScan`
functionality for all of these different store types, this test class is really
more for making sure the topology itself is all wired up correctly. If you're
just trying to test a method on a specific store type, that generally makes
sense to do in the test class for that store itself. In other words you don't
need to have a separate test here for each underlying store type (eg
`PersistentTimestampedStore` or `LruMap`, etc), there are dedicated test
classes for that (like `RocksDBTimestampedStoreTest` or
`InMemoryLRUCacheStoreTest`)
That said, it sounds like the original bug report uncovered the missing
implementations "when accessing the state stores through the processor context"
-- which does sound like it could/would be reproduced through a test here.
Maybe you can just pick a store type and write a single test that reproduces
the issue when run without this patch, and I would consider that sufficient for
this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]