guozhangwang commented on a change in pull request #11367: URL: https://github.com/apache/kafka/pull/11367#discussion_r718905933
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ########## @@ -136,8 +138,9 @@ public void shouldGetAllNonDeletedRecords() { listStore.put(4, "four"); // Delete some records - listStore.put(1, null); - listStore.put(3, null); + listStore.putIfAbsent(1, null); + listStore.putIfAbsent(3, null); + listStore.putIfAbsent(5, null); Review comment: That's because we use `putIfAbsent` in the non-testing code, and I want to test the case where a `putIfAbsent` on a non-existing key does not have side effects. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ########## @@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() { @Test public void shouldAllowDeleteWhileIterateRecords() { - listStore.put(0, "zero1"); - listStore.put(0, "zero2"); - listStore.put(1, "one"); + final Random rand = new Random(); Review comment: Ack. And I realized that I also need to test the in-memory kv store with the copyOnRange flag disabled. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ########## @@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() { @Test public void shouldAllowDeleteWhileIterateRecords() { - listStore.put(0, "zero1"); - listStore.put(0, "zero2"); - listStore.put(1, "one"); + final Random rand = new Random(); + int count = 0; + for (int i = 0; i < 10000; i++) { + listStore.put(i, "zero" + i); + count++; + + while (rand.nextBoolean()) { + listStore.put(i, "zero" + i); + count++; + } + } - final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1"); - final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2"); - final KeyValue<Integer, String> one = KeyValue.pair(1, "one"); + final int size = toList(listStore.all()).size(); Review comment: toList would explicitly close the iterator at the end. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -311,7 +314,25 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new ListValueStoreBuilder<>( - persistent ? Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName), + persistent ? + Stores.persistentKeyValueStore(storeName) : + new KeyValueBytesStoreSupplier() { + @Override + public String name() { + return storeName; + } + + @Override + public KeyValueStore<Bytes, byte[]> get() { + // do not copy of range since it would not be used for IQ Review comment: ack. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ########## @@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() { @Test public void shouldAllowDeleteWhileIterateRecords() { - listStore.put(0, "zero1"); - listStore.put(0, "zero2"); - listStore.put(1, "one"); + final Random rand = new Random(); + int count = 0; + for (int i = 0; i < 10000; i++) { + listStore.put(i, "zero" + i); + count++; + + while (rand.nextBoolean()) { + listStore.put(i, "zero" + i); + count++; + } + } - final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1"); - final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2"); - final KeyValue<Integer, String> one = KeyValue.pair(1, "one"); + final int size = toList(listStore.all()).size(); + assertEquals(count, size); final KeyValueIterator<Integer, String> it = listStore.all(); - assertEquals(zero1, it.next()); - listStore.put(0, null); + int prev = -1; + int deleted = 0; + int dupCount = 0; + while (it.hasNext()) { + final KeyValue<Integer, String> entry = it.next(); + + if (prev != -1 && prev != entry.key) { + if (rand.nextBoolean()) { + listStore.put(prev, null); + deleted += dupCount; + } - // zero2 should still be 
returned from the iterator after the delete call - assertEquals(zero2, it.next()); + dupCount = 0; + } + + dupCount++; + prev = entry.key; + } it.close(); // A new all() iterator after a previous all() iterator was closed should not return deleted records. - assertEquals(Collections.singletonList(one), toList(listStore.all())); + assertEquals(size - deleted, toList(listStore.all()).size()); Review comment: Ditto, toList would close the iterator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org