ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       Can you add a test or two for this in `InMemoryLRUCacheStoreTest`? 
Alternatively, I think you should be able to just collect all the tests in 
`InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move 
them over to `AbstractKeyValueStoreTest` instead. That way you get test 
coverage for both of those plus a handful of other store classes at once, 
without having to copy the same test over and over across a bunch of 
different files.
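
A minimal, dependency-free sketch of the shared-base-test idea follows. Every 
name in it (`SimpleStore`, `AbstractStoreCheck`, `TreeMapStore`, 
`TreeMapStoreCheck`) is hypothetical and only stands in for the real Kafka 
Streams classes; it is not the code of `AbstractKeyValueStoreTest`. The check 
is written once in the abstract class, and each store only supplies a factory 
method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SharedPrefixScanCheck {

    // Hypothetical minimal store interface, standing in for a key-value store.
    public interface SimpleStore {
        void put(String key, String value);
        List<String> prefixScan(String prefix);
    }

    // Hypothetical shared base: the check lives here once, and each
    // concrete store only says how to create itself.
    public abstract static class AbstractStoreCheck {
        protected abstract SimpleStore createStore();

        public List<String> prefixScanKeys() {
            final SimpleStore store = createStore();
            store.put("key2", "b");
            store.put("other", "x");
            store.put("key1", "a");
            return store.prefixScan("key");
        }
    }

    // One illustrative store backed by a sorted map.
    public static class TreeMapStore implements SimpleStore {
        private final TreeMap<String, String> map = new TreeMap<>();

        @Override
        public void put(final String key, final String value) {
            map.put(key, value);
        }

        @Override
        public List<String> prefixScan(final String prefix) {
            final List<String> keys = new ArrayList<>();
            // Keys sharing a prefix are contiguous in sorted order, so start at
            // the first key >= prefix and stop at the first non-matching key.
            for (final String key : map.tailMap(prefix, true).keySet()) {
                if (!key.startsWith(prefix)) {
                    break;
                }
                keys.add(key);
            }
            return keys;
        }
    }

    // Adding coverage for another store would only need another subclass like this.
    public static class TreeMapStoreCheck extends AbstractStoreCheck {
        @Override
        protected SimpleStore createStore() {
            return new TreeMapStore();
        }
    }

    public static void main(final String[] args) {
        System.out.println(new TreeMapStoreCheck().prefixScanKeys());
    }
}
```

With this shape, a new store type picks up the prefix-scan check by adding one 
small subclass rather than a copied test method.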

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -26,13 +26,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
+import java.util.List;

Review comment:
       Can you revert the changes in this file?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       I'm not sure this is really the right place to test the `prefixScan` 
functionality for all of these different store types. This test class is 
really more for making sure the topology itself is wired up correctly. If 
you're just trying to test a method on a specific store type, it generally 
makes sense to do that in the test class for that store itself. In other 
words, you don't need a separate test here for each underlying store type 
(e.g. `PersistentTimestampedStore` or `LruMap`); there are dedicated test 
classes for that (like `RocksDBTimestampedStoreTest` or 
`InMemoryLRUCacheStoreTest`).
   
   That said, it sounds like the original bug report uncovered the missing 
implementations "when accessing the state stores through the processor 
context", which does sound like it could be reproduced through a test here. 
Maybe you can just pick a store type and write a single test that reproduces 
the issue when run without this patch; I would consider that sufficient.
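
As a rough, dependency-free illustration of what "a single test that 
reproduces the issue when run without this patch" could look like, the sketch 
below models a wrapper store that forgets to forward `prefixScan`. All names 
(`Store`, `InnerStore`, `UnpatchedWrapper`, `PatchedWrapper`, `scanWorks`) are 
hypothetical, and the default method that throws 
`UnsupportedOperationException` is an assumption about how the missing 
implementations surfaced, not something taken from the bug report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class WrapperRegressionSketch {

    // Hypothetical store interface; the default method models an operation
    // that every wrapper has to override explicitly (an assumption).
    public interface Store {
        void put(String key, String value);

        default List<String> prefixScan(final String prefix) {
            throw new UnsupportedOperationException("prefixScan not implemented");
        }
    }

    // The innermost store implements the scan correctly.
    public static class InnerStore implements Store {
        private final TreeMap<String, String> map = new TreeMap<>();

        @Override
        public void put(final String key, final String value) {
            map.put(key, value);
        }

        @Override
        public List<String> prefixScan(final String prefix) {
            final List<String> keys = new ArrayList<>();
            for (final String key : map.tailMap(prefix, true).keySet()) {
                if (!key.startsWith(prefix)) {
                    break;
                }
                keys.add(key);
            }
            return keys;
        }
    }

    // Models the state before the patch: prefixScan is not forwarded.
    public static class UnpatchedWrapper implements Store {
        private final Store inner;

        public UnpatchedWrapper(final Store inner) {
            this.inner = inner;
        }

        @Override
        public void put(final String key, final String value) {
            inner.put(key, value);
        }
    }

    // Models the state after the patch: prefixScan delegates to the inner store.
    public static class PatchedWrapper implements Store {
        private final Store inner;

        public PatchedWrapper(final Store inner) {
            this.inner = inner;
        }

        @Override
        public void put(final String key, final String value) {
            inner.put(key, value);
        }

        @Override
        public List<String> prefixScan(final String prefix) {
            return inner.prefixScan(prefix);
        }
    }

    // Uses the store only through the wrapper, the way a processor would see it.
    public static boolean scanWorks(final Store wrapped) {
        wrapped.put("key1", "value1");
        wrapped.put("key2", "value2");
        try {
            return wrapped.prefixScan("key").size() == 2;
        } catch (final UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(scanWorks(new UnpatchedWrapper(new InnerStore())));
        System.out.println(scanWorks(new PatchedWrapper(new InnerStore())));
    }
}
```

The point of the sketch is that one check exercised through the wrapper is 
enough to fail before the fix and pass after it, regardless of which inner 
store type is chosen.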




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


