guozhangwang commented on a change in pull request #11367:
URL: https://github.com/apache/kafka/pull/11367#discussion_r718905933



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -136,8 +138,9 @@ public void shouldGetAllNonDeletedRecords() {
         listStore.put(4, "four");
 
         // Delete some records
-        listStore.put(1, null);
-        listStore.put(3, null);
+        listStore.putIfAbsent(1, null);
+        listStore.putIfAbsent(3, null);
+        listStore.putIfAbsent(5, null);

Review comment:
       That's because we used `putIfAbsent` in non-testing code. And I want to 
test the case where a `putIfAbsent` on non-existing keys would not have side 
effects.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() {
 
     @Test
     public void shouldAllowDeleteWhileIterateRecords() {
-        listStore.put(0, "zero1");
-        listStore.put(0, "zero2");
-        listStore.put(1, "one");
+        final Random rand = new Random();

Review comment:
       Ack. And I realized that I need to test the case for in-memory kv store 
for disabling the copyOnRange flag too.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() {
 
     @Test
     public void shouldAllowDeleteWhileIterateRecords() {
-        listStore.put(0, "zero1");
-        listStore.put(0, "zero2");
-        listStore.put(1, "one");
+        final Random rand = new Random();
+        int count = 0;
+        for (int i = 0; i < 10000; i++) {
+            listStore.put(i, "zero" + i);
+            count++;
+
+            while (rand.nextBoolean()) {
+                listStore.put(i, "zero" + i);
+                count++;
+            }
+        }
 
-        final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1");
-        final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2");
-        final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+        final int size = toList(listStore.all()).size();

Review comment:
       toList would explicitly close the iterator at the end.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -311,7 +314,25 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 
         final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> builder =
             new ListValueStoreBuilder<>(
-                persistent ? Stores.persistentKeyValueStore(storeName) : 
Stores.inMemoryKeyValueStore(storeName),
+                persistent ?
+                    Stores.persistentKeyValueStore(storeName) :
+                    new KeyValueBytesStoreSupplier() {
+                    @Override
+                    public String name() {
+                        return storeName;
+                    }
+
+                    @Override
+                    public KeyValueStore<Bytes, byte[]> get() {
+                        // do not copy of range since it would not be used for 
IQ

Review comment:
       ack.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() {
 
     @Test
     public void shouldAllowDeleteWhileIterateRecords() {
-        listStore.put(0, "zero1");
-        listStore.put(0, "zero2");
-        listStore.put(1, "one");
+        final Random rand = new Random();
+        int count = 0;
+        for (int i = 0; i < 10000; i++) {
+            listStore.put(i, "zero" + i);
+            count++;
+
+            while (rand.nextBoolean()) {
+                listStore.put(i, "zero" + i);
+                count++;
+            }
+        }
 
-        final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1");
-        final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2");
-        final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+        final int size = toList(listStore.all()).size();
+        assertEquals(count, size);
 
         final KeyValueIterator<Integer, String> it = listStore.all();
-        assertEquals(zero1, it.next());
 
-        listStore.put(0, null);
+        int prev = -1;
+        int deleted = 0;
+        int dupCount = 0;
+        while (it.hasNext()) {
+            final KeyValue<Integer, String> entry = it.next();
+
+            if (prev != -1 && prev != entry.key) {
+                if (rand.nextBoolean()) {
+                    listStore.put(prev, null);
+                    deleted += dupCount;
+                }
 
-        // zero2 should still be returned from the iterator after the delete 
call
-        assertEquals(zero2, it.next());
+                dupCount = 0;
+            }
+
+            dupCount++;
+            prev = entry.key;
+        }
 
         it.close();
 
         // A new all() iterator after a previous all() iterator was closed 
should not return deleted records.
-        assertEquals(Collections.singletonList(one), toList(listStore.all()));
+        assertEquals(size - deleted, toList(listStore.all()).size());

Review comment:
       Ditto, toList would close the iterator.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to