ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r670001461



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
##########
@@ -845,6 +845,64 @@ public void shouldBeAbleToQueryMapValuesState() throws 
Exception {
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), 
myMapStore.get(batchEntry.key));
         }
+
+        final KeyValueIterator<String, Long> range = myMapStore.range("hello", 
"kafka");
+        while (range.hasNext()) {
+            System.out.println(range.next());
+        }
+    }
+
+    @Test
+    public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+
+        final List<KeyValue<String, Long>> expectedPrefixScanResult = 
Arrays.asList(
+            new KeyValue<>(keys[3], 5L),
+            new KeyValue<>(keys[1], 1L)
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final KTable<String, String> t1 = builder.table(streamOne);
+        t1
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
+
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, 
keyValueStore());
+
+        int index = 0;
+        final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", Serdes.String().serializer());
+        while (range.hasNext()) {

Review comment:
       I know this is just how the other tests are doing it, but it's not 
really an airtight way to validate the expected results...if nothing is 
returned then we never enter the `while` loop and the test passes, even if we 
did in fact expect there to be actual output.
   
   The important thing here was just to make sure it didn't throw an exception 
so it still does that, but it would be good to fix this up maybe in a followup 
PR

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int 
maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), 
treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       1) Could you just treat them as Bytes all the same, and just convert 
to/from an Integer before putting/getting them from the store? That way you're 
still just handling Bytes like you are in this test, it just goes through an 
extra layer of de/serialization. Should be able to more or less copy over the 
existing tests with just a bit of extra code. Can you try this, in a followup 
PR?
   2) Yes, I was just suggesting to merge them as a possible way to make things 
easier and do less work, if it's going to be more then please do file a 
separate ticket for it.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       Ah, I see, I thought it was trying to test the underlying `prefixScan` 
functionality itself, but what Matthias said makes sense. They do still feel 
very out of place here, but I'm not sure I can think of a better place to put 
them. Maybe just create a new file and put them in a separate test class of 
their own? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to