cadonna commented on a change in pull request #10052: URL: https://github.com/apache/kafka/pull/10052#discussion_r581735306
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java ########## @@ -60,4 +97,131 @@ public void shouldRemoveKeysWithNullValues() { assertThat(store.get(0), nullValue()); } + + + @Test + public void shouldReturnKeysWithGivenPrefix() { + + final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k1")), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_3")), + stringSerializer.serialize(null, "b"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k2")), + stringSerializer.serialize(null, "c"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_2")), + stringSerializer.serialize(null, "d"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k3")), + stringSerializer.serialize(null, "e"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_1")), + stringSerializer.serialize(null, "f"))); + + byteStore.putAll(entries); + byteStore.flush(); + + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("prefix", stringSerializer); + final List<String> valuesWithPrefix = new ArrayList<>(); + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + final KeyValue<Bytes, byte[]> next = keysWithPrefix.next(); + valuesWithPrefix.add(new String(next.value)); + numberOfKeysReturned++; + } + assertThat(numberOfKeysReturned, is(3)); + assertThat(valuesWithPrefix.get(0), is("f")); + assertThat(valuesWithPrefix.get(1), is("d")); + assertThat(valuesWithPrefix.get(2), is("b")); + } + + @Test + public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { + final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "abc")), 
+ stringSerializer.serialize(null, "f"))); + + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "abcd")), + stringSerializer.serialize(null, "f"))); + + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "abce")), + stringSerializer.serialize(null, "f"))); + + byteStore.putAll(entries); + byteStore.flush(); + + final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer); + int numberOfKeysReturned = 0; + + while (keysWithPrefixAsabcd.hasNext()) { + keysWithPrefixAsabcd.next().key.get(); + numberOfKeysReturned++; + } + + assertThat(numberOfKeysReturned, is(1)); + } + + @Test + public void shouldReturnUUIDsWithStringPrefix() { + final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>(); + final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer(); + final UUID uuid1 = UUID.randomUUID(); + final UUID uuid2 = UUID.randomUUID(); + final String prefix = uuid1.toString().substring(0, 4); + entries.add(new KeyValue<>( + new Bytes(uuidSerializer.serialize(null, uuid1)), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(uuidSerializer.serialize(null, uuid2)), + stringSerializer.serialize(null, "b"))); + + byteStore.putAll(entries); + byteStore.flush(); + + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan(prefix, stringSerializer); + final List<String> valuesWithPrefix = new ArrayList<>(); + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + final KeyValue<Bytes, byte[]> next = keysWithPrefix.next(); + valuesWithPrefix.add(new String(next.value)); + numberOfKeysReturned++; + } + + assertThat(numberOfKeysReturned, is(1)); + assertThat(valuesWithPrefix.get(0), is("a")); + } + + @Test + public void shouldReturnNoKeys() { + final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "a")), + 
stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "b")), + stringSerializer.serialize(null, "c"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "c")), + stringSerializer.serialize(null, "e"))); + byteStore.putAll(entries); + byteStore.flush(); + + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("d", stringSerializer); Review comment: Here, I would use `bb` as the prefix to cover the case where a key is a prefix of the prefix. ```suggestion final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("bb", stringSerializer); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java ########## @@ -104,6 +104,9 @@ * prefix into the format in which the keys are stored in the stores needs to be passed to this method. * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * For example, if the key type is Integer, and the store contains keys [1, 2, 11, 13], + * then running store.prefixScan(1, new IntegerSerializer()) would return [1] and not [1,11,13]. Review comment: ```suggestion * Since {@code prefixScan()} relies on byte lexicographical ordering and not on the ordering of the key type, results for some types might be unexpected. * For example, if the key type is {@code Integer}, and the store contains keys [1, 2, 11, 13], * then running {@code store.prefixScan(1, new IntegerSerializer())} will return [1] and not [1,11,13]. * In contrast, if the key type is {@code String}, the keys will be sorted [1, 11, 13, 2] in the store and {@code store.prefixScan("1", new StringSerializer())} will return [1,11,13]. * In both cases {@code prefixScan()} starts the scan at 1 and stops at 2.
``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org