cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r581735306



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +97,131 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+    }
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abc")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abcd")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abce")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = 
byteStore.prefixScan("abcd", stringSerializer);
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefixAsabcd.hasNext()) {
+            keysWithPrefixAsabcd.next().key.get();
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+    }
+
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid1)),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid2)),
+            stringSerializer.serialize(null, "b")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan(prefix, stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(valuesWithPrefix.get(0), is("a"));
+    }
+
+    @Test
+    public void shouldReturnNoKeys() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "a")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "b")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "c")),
+            stringSerializer.serialize(null, "e")));
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("d", stringSerializer);

Review comment:
       Here, I would use `bb` as the prefix to cover the case where a key is a 
prefix of the prefix.  
   
   ```suggestion
           final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("bb", stringSerializer);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -104,6 +104,9 @@
      * prefix into the format in which the keys are stored in the stores needs 
to be passed to this method.
      * The returned iterator must be safe from {@link 
java.util.ConcurrentModificationException}s
      * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not 
represent key order.
+     * For example, if the key type is Integer, and the store contains keys 
[1, 2, 11, 13],
+     * then running store.prefixScan(1, new IntegerSerializer()) would return 
[1] and not [1,11,13].

Review comment:
       ```suggestion
        * Since {@code prefixScan()} relies on byte lexicographical ordering 
and not on the ordering of the key type, results for some types might be 
unexpected.
        * For example, if the key type is {@code Integer}, and the store 
contains keys [1, 2, 11, 13],
        * then running {@code store.prefixScan(1, new IntegerSerializer())} 
will return [1] and not [1,11,13]. 
        * In contrast, if the key type is {@code String} the keys will be 
sorted [1, 11, 13, 2] in the store and {@code store.prefixScan(1, new 
StringSerializer())} will return [1,11,13]. 
        * In both cases {@code prefixScan()} starts the scan at 1 and stops at 
2.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to