aliehsaeedii commented on code in PR #16554:
URL: https://github.com/apache/kafka/pull/16554#discussion_r2816218608


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,55 +383,46 @@ private void restoreBatch(final 
Collection<ConsumerRecord<byte[], byte[]>> batch
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, Change<V>>> callback) {
-        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = 
sortedMap.entrySet().iterator();
-        int evictions = 0;
+        final Iterator<Map.Entry<BufferKey, BufferValue>> iterator = 
sortedMap.entrySet().iterator();
+        final List<Eviction<K, Change<V>>> evictions = new ArrayList<>();
+        int evictionCount = 0;
 
-        if (predicate.get()) {
-            Map.Entry<BufferKey, BufferValue> next = null;
-            if (delegate.hasNext()) {
-                next = delegate.next();
+        while (iterator.hasNext()) {
+            if (!predicate.get()) {
+                break;
             }
 
-            // predicate being true means we read one record, call the 
callback, and then remove it
-            while (next != null && predicate.get()) {
-                if (next.getKey().time() != minTimestamp) {

Review Comment:
   Is this check and throwing IllegalStateException completely removed?  Was 
this validation unnecessary, or does removing it risk data inconsistency? If 
`minTimestamp` could become out of sync with the actual minimum in sortedMap, 
this was a useful safeguard.
   



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,55 +383,46 @@ private void restoreBatch(final 
Collection<ConsumerRecord<byte[], byte[]>> batch
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, Change<V>>> callback) {
-        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = 
sortedMap.entrySet().iterator();
-        int evictions = 0;
+        final Iterator<Map.Entry<BufferKey, BufferValue>> iterator = 
sortedMap.entrySet().iterator();
+        final List<Eviction<K, Change<V>>> evictions = new ArrayList<>();
+        int evictionCount = 0;
 
-        if (predicate.get()) {
-            Map.Entry<BufferKey, BufferValue> next = null;
-            if (delegate.hasNext()) {
-                next = delegate.next();
+        while (iterator.hasNext()) {
+            if (!predicate.get()) {
+                break;
             }
 
-            // predicate being true means we read one record, call the 
callback, and then remove it
-            while (next != null && predicate.get()) {
-                if (next.getKey().time() != minTimestamp) {
-                    throw new IllegalStateException(
-                        "minTimestamp [" + minTimestamp + "] did not match the 
actual min timestamp [" +
-                            next.getKey().time() + "]"
-                    );
-                }
-                final K key = 
keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
-                final BufferValue bufferValue = next.getValue();
-                final Change<V> value = valueSerde.deserializeParts(
+            final Map.Entry<BufferKey, BufferValue> next = iterator.next();
+            final BufferValue bufferValue = next.getValue();
+
+            // Collect evictions to process them outside the iterator loop

Review Comment:
   Consider adding a comment explaining why callbacks are deferred.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##########
@@ -381,55 +383,46 @@ private void restoreBatch(final 
Collection<ConsumerRecord<byte[], byte[]>> batch
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, Change<V>>> callback) {
-        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = 
sortedMap.entrySet().iterator();
-        int evictions = 0;
+        final Iterator<Map.Entry<BufferKey, BufferValue>> iterator = 
sortedMap.entrySet().iterator();
+        final List<Eviction<K, Change<V>>> evictions = new ArrayList<>();
+        int evictionCount = 0;
 
-        if (predicate.get()) {
-            Map.Entry<BufferKey, BufferValue> next = null;
-            if (delegate.hasNext()) {
-                next = delegate.next();
+        while (iterator.hasNext()) {
+            if (!predicate.get()) {
+                break;
             }
 
-            // predicate being true means we read one record, call the 
callback, and then remove it
-            while (next != null && predicate.get()) {
-                if (next.getKey().time() != minTimestamp) {
-                    throw new IllegalStateException(
-                        "minTimestamp [" + minTimestamp + "] did not match the 
actual min timestamp [" +
-                            next.getKey().time() + "]"
-                    );
-                }
-                final K key = 
keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
-                final BufferValue bufferValue = next.getValue();
-                final Change<V> value = valueSerde.deserializeParts(
+            final Map.Entry<BufferKey, BufferValue> next = iterator.next();
+            final BufferValue bufferValue = next.getValue();
+
+            // Collect evictions to process them outside the iterator loop

Review Comment:
   // Collect evictions to process them outside the iterator loop
     // This prevents callback side effects from interfering with iteration
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to