muralibasani commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3351207766


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1163,6 +1174,28 @@ public void resizeCache(final long size) {
         cacheResizeSize.set(size);
     }
 
+    public void resizeCacheAndBufferMemory(final long cacheSize, final long maxBufferSize) {
+        cacheResizeSize.set(cacheSize);
+        maxBufferSizeInBytes.set(maxBufferSize);
+    }
+
+    private void maybeResumePartitionsPausedForBufferOverflow() {
+        if (maxBufferSizeInBytes.get() == UNDEFINED_INPUT_BUFFER_MAX_BYTES) {
+            return;
+        }
+        if (partitionsPausedForBufferOverflow.isEmpty()) {
+            return;
+        }
+        if (taskManager.getInputBufferSizeInBytes() <= maxBufferSizeInBytes.get()) {
+            // defensive copy — we clear the tracking set right after.
+            final Set<TopicPartition> toResume = new HashSet<>(partitionsPausedForBufferOverflow);

Review Comment:
   As raised in the previous PR 20292, this creates another copy so that the 
consumer works on an independent set of partitions, and the tracking set is 
cleared after the resume call.
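
To illustrate the point about the independent set, here is a minimal sketch of what can go wrong if the tracking set is cleared while another component still holds a reference to it. `RecordingConsumer` is a hypothetical stand-in that retains the collection it is given; it is not the Kafka consumer API, and whether the real consumer retains the collection is not claimed here. Partition names are plain strings for brevity.

```java
import java.util.HashSet;
import java.util.Set;

public class DefensiveCopySketch {

    // Hypothetical stand-in (assumption): keeps a reference to whatever it is handed.
    static final class RecordingConsumer {
        private Set<String> lastResumed = Set.of();

        void resume(final Set<String> partitions) {
            lastResumed = partitions; // stores the reference, not a copy
        }

        Set<String> lastResumed() {
            return lastResumed;
        }
    }

    // Passes the tracking set itself, then clears it: the consumer's view is emptied too.
    static Set<String> resumeWithoutCopy(final Set<String> paused) {
        final RecordingConsumer consumer = new RecordingConsumer();
        consumer.resume(paused);
        paused.clear();
        return consumer.lastResumed();
    }

    // Passes a defensive copy, then clears the tracking set: the consumer's view is unaffected.
    static Set<String> resumeWithCopy(final Set<String> paused) {
        final RecordingConsumer consumer = new RecordingConsumer();
        final Set<String> toResume = new HashSet<>(paused);
        consumer.resume(toResume);
        paused.clear();
        return consumer.lastResumed();
    }

    public static void main(final String[] args) {
        final Set<String> first = new HashSet<>(Set.of("topic-0", "topic-1"));
        final Set<String> second = new HashSet<>(Set.of("topic-0", "topic-1"));

        System.out.println("without copy: " + resumeWithoutCopy(first).size()); // prints "without copy: 0"
        System.out.println("with copy: " + resumeWithCopy(second).size());      // prints "with copy: 2"
    }
}
```

In both variants the tracking set ends up empty; the difference is only in what the receiver of the `resume` call still sees afterwards.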



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.