cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r564405308



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(Bytes.increment(hi), world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan("hi", new StringSerializer());

Review comment:
       ```suggestion
           final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan(hi.toString(), new StringSerializer());
   ```
   In such a way, you can reuse variable `hi` and `there`. Similar is true for 
my suggestions below.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(Bytes.increment(hi), world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan("hi", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));

Review comment:
       ```suggestion
               keys.add(next.key);
               values.add(Bytes.wrap(next.value));
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##########
@@ -434,6 +435,22 @@ public void 
shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
         verify(inner);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final StringSerializer stringSerializer = new StringSerializer();
+        expect(inner.prefixScan(KEY, stringSerializer))
+            .andReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final KeyValueIterator<String, String> iterator = 
metered.prefixScan(KEY, stringSerializer);
+        assertThat(iterator.next().value, equalTo(VALUE));
+        iterator.close();
+
+        final KafkaMetric metric = metrics.metric(new 
MetricName("prefix-scan-rate", STORE_LEVEL_GROUP, "", tags));

Review comment:
       nit:
   ```suggestion
           final KafkaMetric metric = metric("prefix-scan-rate");
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
##########
@@ -204,6 +204,39 @@ public void shouldGetRangeSensor() {
         );
     }
 
+    @Test
+    public void shouldGetPrefixScanSensor() {
+        final String metricName = "prefix-scan";
+        final String descriptionOfRate = "The average number of calls to 
prefix-scan per second";
+        final String descriptionOfAvg = "The average latency of calls to 
prefix-scan";
+        final String descriptionOfMax = "The maximum latency of calls to 
prefix-scan";
+        expect(streamsMetrics.storeLevelSensor(TASK_ID, STORE_NAME, 
metricName, RecordingLevel.DEBUG))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, 
STORE_NAME)).andReturn(storeTagMap);
+        StreamsMetricsImpl.addInvocationRateToSensor(
+            expectedSensor,
+            STORE_LEVEL_GROUP,
+            storeTagMap,
+            metricName,
+            descriptionOfRate
+        );
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
+            expectedSensor,
+            STORE_LEVEL_GROUP,
+            storeTagMap,
+            latencyMetricName(metricName),
+            descriptionOfAvg,
+            descriptionOfMax
+        );
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor =
+                StateStoreMetrics.prefixScanSensor(TASK_ID, STORE_TYPE, 
STORE_NAME, streamsMetrics);

Review comment:
       nit:
   ```suggestion
           final Sensor sensor = StateStoreMetrics.prefixScanSensor(TASK_ID, 
STORE_TYPE, STORE_NAME, streamsMetrics);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -291,6 +292,16 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
         return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, true);
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       This method needs unit testing. Try to use a mock for the cache in the 
test.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(Bytes.increment(hi), world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan("hi", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();

Review comment:
       ```suggestion
           final List<Bytes> keys = new ArrayList<>();
           final List<Bytes> values = new ArrayList<>();
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(Bytes.increment(hi), world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan("hi", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(keys, is(Collections.singletonList("hi")));
+        assertThat(values, is(Collections.singletonList("there")));

Review comment:
       ```suggestion
           assertThat(keys, is(Collections.singletonList(hi)));
           assertThat(values, is(Collections.singletonList(Bytes.wrap(there))));
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       @vamossagar12 I can still not find the unit test for this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to