cadonna commented on a change in pull request #9508: URL: https://github.com/apache/kafka/pull/9508#discussion_r564405308
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ########## @@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() { assertThat(store.get(hello), equalTo(world)); } + @Test + public void shouldGetRecordsWithPrefixKey() { + store.put(hi, there); + store.put(Bytes.increment(hi), world); + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("hi", new StringSerializer()); Review comment: ```suggestion final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan(hi.toString(), new StringSerializer()); ``` In such a way, you can reuse variable `hi` and `there`. Similar is true for my suggestions below. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ########## @@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() { assertThat(store.get(hello), equalTo(world)); } + @Test + public void shouldGetRecordsWithPrefixKey() { + store.put(hi, there); + store.put(Bytes.increment(hi), world); + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("hi", new StringSerializer()); + final List<String> keys = new ArrayList<>(); + final List<String> values = new ArrayList<>(); + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + final KeyValue<Bytes, byte[]> next = keysWithPrefix.next(); + keys.add(next.key.toString()); + values.add(new String(next.value)); Review comment: ```suggestion keys.add(next.key); values.add(Bytes.wrap(next.value)); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java ########## @@ -434,6 +435,22 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { verify(inner); } + @Test + public void shouldGetRecordsWithPrefixKey() { + final StringSerializer stringSerializer = new StringSerializer(); + expect(inner.prefixScan(KEY, stringSerializer)) + .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator())); + init(); + + final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer); + assertThat(iterator.next().value, equalTo(VALUE)); + iterator.close(); + + final KafkaMetric metric = metrics.metric(new MetricName("prefix-scan-rate", STORE_LEVEL_GROUP, "", tags)); Review comment: nit: ```suggestion final KafkaMetric metric = metric("prefix-scan-rate"); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java ########## @@ -204,6 +204,39 @@ public void shouldGetRangeSensor() { ); } + @Test + public void shouldGetPrefixScanSensor() { + final String metricName = "prefix-scan"; + final String descriptionOfRate = "The average number of calls to prefix-scan per second"; + final String descriptionOfAvg = "The average latency of calls to prefix-scan"; + final String descriptionOfMax = "The maximum latency of calls to prefix-scan"; + expect(streamsMetrics.storeLevelSensor(TASK_ID, STORE_NAME, metricName, RecordingLevel.DEBUG)) + .andReturn(expectedSensor); + expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap); + StreamsMetricsImpl.addInvocationRateToSensor( + expectedSensor, + STORE_LEVEL_GROUP, + storeTagMap, + metricName, + descriptionOfRate + ); + StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + STORE_LEVEL_GROUP, + storeTagMap, + latencyMetricName(metricName), + descriptionOfAvg, + descriptionOfMax + ); + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = + StateStoreMetrics.prefixScanSensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics); Review comment: nit: ```suggestion final Sensor sensor = StateStoreMetrics.prefixScanSensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ########## @@ -291,6 +292,16 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); } + @Override + public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { Review comment: This method needs unit testing. Try to use a mock for the cache in the test. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ########## @@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() { assertThat(store.get(hello), equalTo(world)); } + @Test + public void shouldGetRecordsWithPrefixKey() { + store.put(hi, there); + store.put(Bytes.increment(hi), world); + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("hi", new StringSerializer()); + final List<String> keys = new ArrayList<>(); + final List<String> values = new ArrayList<>(); Review comment: ```suggestion final List<Bytes> keys = new ArrayList<>(); final List<Bytes> values = new ArrayList<>(); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ########## @@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() { assertThat(store.get(hello), equalTo(world)); } + @Test + public void shouldGetRecordsWithPrefixKey() { + store.put(hi, there); + store.put(Bytes.increment(hi), world); + final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("hi", new StringSerializer()); + final List<String> keys = new ArrayList<>(); + final List<String> values = new ArrayList<>(); + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + final KeyValue<Bytes, byte[]> next = keysWithPrefix.next(); + keys.add(next.key.toString()); + values.add(new String(next.value)); + numberOfKeysReturned++; + } + assertThat(numberOfKeysReturned, is(1)); + assertThat(keys, is(Collections.singletonList("hi"))); + assertThat(values, is(Collections.singletonList("there"))); Review comment: ```suggestion assertThat(keys, is(Collections.singletonList(hi))); assertThat(values, is(Collections.singletonList(Bytes.wrap(there)))); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ########## @@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { } } + @Override + public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { Review comment: @vamossagar12 I can still not find the unit test for this method. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org