ableegoldman commented on a change in pull request #10877: URL: https://github.com/apache/kafka/pull/10877#discussion_r670001461
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ########## @@ -845,6 +845,64 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception { for (final KeyValue<String, String> batchEntry : batch1) { assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key)); } + + final KeyValueIterator<String, Long> range = myMapStore.range("hello", "kafka"); + while (range.hasNext()) { + System.out.println(range.next()); + } + } + + @Test + public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception { + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + final StreamsBuilder builder = new StreamsBuilder(); + final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; + final Set<KeyValue<String, String>> batch1 = new HashSet<>( + Arrays.asList( + new KeyValue<>(keys[0], "1"), + new KeyValue<>(keys[1], "1"), + new KeyValue<>(keys[2], "3"), + new KeyValue<>(keys[3], "5"), + new KeyValue<>(keys[4], "2")) + ); + + final List<KeyValue<String, Long>> expectedPrefixScanResult = Arrays.asList( + new KeyValue<>(keys[3], 5L), + new KeyValue<>(keys[1], 1L) + ); + + IntegrationTestUtils.produceKeyValuesSynchronously( + streamOne, + batch1, + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + mockTime); + + final KTable<String, String> t1 = builder.table(streamOne); + t1 + .mapValues( + (ValueMapper<String, Long>) Long::valueOf, + Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long())) + .toStream() + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); + + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); + + 
waitUntilAtLeastNumRecordProcessed(outputTopic, 5); + + final ReadOnlyKeyValueStore<String, Long> myMapStore = + IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore()); + + int index = 0; + final KeyValueIterator<String, Long> range = myMapStore.prefixScan("go", Serdes.String().serializer()); + while (range.hasNext()) { Review comment: I know this is just how the other tests are doing it, but it's not really an airtight way to validate the expected results: if nothing is returned, then we never enter the `while` loop and the test passes, even if we did in fact expect there to be actual output. The important thing here was just to make sure it didn't throw an exception, and it still does that, but it would be good to fix this up, maybe in a followup PR. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java ########## @@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap)); } + @Override + public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { Review comment: 1) Could you just treat them as Bytes all the same, and just convert to/from an Integer before putting/getting them from the store? That way you're still just handling Bytes like you are in this test, it just goes through an extra layer of de/serialization. Should be able to more or less copy over the existing tests with just a bit of extra code. Can you try this in a followup PR? 2) Yes, I was just suggesting to merge them as a possible way to make things easier and do less work; if it's going to be more work, then please do file a separate ticket for it. 
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ########## @@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() { assertNull(store.get("key4")); } + @Test + public void testPrefixScanInMemoryStoreNoCachingNoLogging() { + final String storeName = "prefixScanStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) + .withCachingDisabled() + .withLoggingDisabled(); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", 
results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void testPrefixScanInMemoryStoreWithCachingNoLogging() { + final String storeName = "prefixScanStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) + .withCachingEnabled() + .withLoggingDisabled(); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void 
testPrefixScanInMemoryStoreWithCachingWithLogging() { + final String storeName = "prefixScanStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) + .withCachingEnabled() + .withLoggingEnabled(Collections.emptyMap()); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void testPrefixScanPersistentStoreNoCachingNoLogging() { + final String storeName = "prefixScanStore"; + final 
StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String()) + .withCachingDisabled() + .withLoggingDisabled(); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void testPrefixScanPersistentStoreWithCachingNoLogging() { + final String storeName = "prefixScanStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String()) + .withCachingEnabled() + .withLoggingDisabled(); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void testPrefixScanPersistentStoreWithCachingWithLogging() { + final String storeName = "prefixScanStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String()) + .withCachingEnabled() + .withLoggingEnabled(Collections.emptyMap()); + topology + .addSource("source1", 
STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addSink("counts", OUTPUT_TOPIC_1, "processor1"); + + driver = new TopologyTestDriver(topology, props); + + final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic<Integer, String> outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore"); + final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer()); + final List<KeyValue<String, String>> results = new ArrayList<>(); + while (prefixScan.hasNext()) { + final KeyValue<String, String> next = prefixScan.next(); + results.add(next); + } + + assertEquals("key1", results.get(0).key); + assertEquals("value4", results.get(0).value); + assertEquals("key2", results.get(1).key); + assertEquals("value2", results.get(1).value); + assertEquals("key3", results.get(2).key); + assertEquals("value3", results.get(2).value); + + } + + @Test + public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() { Review comment: Ah, I see, I thought it was trying to test the underlying `prefixScan` functionality itself, but what Matthias said makes sense. They do still feel very out of place here, but I'm not sure I can think of a better place to put them. Maybe just create a new file and put them in a separate test class of their own? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org