unknowntpo commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3378708350
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1159,8 +1170,26 @@ private void subscribeConsumer() {
}
}
- public void resizeCache(final long size) {
- cacheResizeSize.set(size);
+ public void resizeCacheAndBufferMemory(final long cacheSize, final long maxBufferSize) {
+ cacheResizeSize.set(cacheSize);
+ maxBufferSizeInBytes.set(maxBufferSize);
+ }
+
+ private void maybeResumePartitionsPausedForBufferOverflow() {
Review Comment:
`partitionsPausedForBufferOverflow` gains entries when partitions are paused
by the byte cap, but it is never pruned when partitions are revoked or lost.
After a rebalance, the set may therefore still contain partitions that are no
longer assigned to the consumer. When the buffer later drains, `StreamThread`
calls `resume()` with that stale set, and `KafkaConsumer.resume()` throws
`IllegalStateException` if any of the given partitions is not currently
assigned.
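
One possible way to address this is to drop revoked or lost partitions from the set and, as a second safeguard, intersect the set with the current assignment before resuming. The sketch below is only an illustration under stated assumptions: `PausedPartitionTracker` and its methods are hypothetical names that do not exist in the PR, and plain `String` values stand in for `TopicPartition` so the example runs without the Kafka client on the classpath.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams. Strings such as "orders-0"
// stand in for TopicPartition instances.
public class PausedPartitionTracker {
    private final Set<String> pausedForBufferOverflow = new HashSet<>();

    // Record partitions that were paused because the byte cap was exceeded.
    public void markPaused(final Set<String> partitions) {
        pausedForBufferOverflow.addAll(partitions);
    }

    // Call from the revoked/lost path so stale entries do not survive a rebalance.
    public void onPartitionsRevokedOrLost(final Set<String> partitions) {
        pausedForBufferOverflow.removeAll(partitions);
    }

    // Return only partitions that are still assigned, then clear the tracked set.
    // The caller would pass the result to the consumer's resume() call.
    public Set<String> drainResumable(final Set<String> currentAssignment) {
        final Set<String> resumable = new HashSet<>(pausedForBufferOverflow);
        resumable.retainAll(currentAssignment);
        pausedForBufferOverflow.clear();
        return resumable;
    }

    public static void main(final String[] args) {
        final PausedPartitionTracker tracker = new PausedPartitionTracker();
        tracker.markPaused(Set.of("orders-0", "orders-1", "orders-2"));
        // A rebalance revokes orders-1.
        tracker.onPartitionsRevokedOrLost(Set.of("orders-1"));
        // orders-2 is also no longer assigned, so the intersection filters it out.
        System.out.println(tracker.drainResumable(Set.of("orders-0")));
    }
}
```

Either safeguard alone would avoid passing unassigned partitions to `resume()`; doing both keeps the set small and protects against a callback path that was missed.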