guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r773490666
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##########
@@ -1074,9 +1083,22 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
      * The removed stream thread is gracefully shut down. This method does not specify which stream
      * thread is shut down.
      * <p>
+<<<<<<< HEAD

Review comment:
   Rebase leftovers here.

########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##########
@@ -1224,17 +1248,20 @@ private int getNextThreadIndex() {
         }
     }

-    private long getCacheSizePerThread(final int numStreamThreads) {
+    private long getMemorySizePerThread(final int numStreamThreads, final boolean resizeCache) {

Review comment:
   I think what's originally suggested is a bit more than having a single consolidated function: it is also to consolidate the caller. Since we always call `getMemorySizePerThread` and `resizeThreadCacheOrBufferMemory` for both the input buffer and the cache size at the same place, we can make a single call rather than two. For example, we can have:

   1) A single `resizeThreadCacheAndBufferMemory(final long numThreads)` in which we compute the per-thread cache size and input-buffer size from the two totals and the passed-in number of threads, and then let each thread set itself to the corresponding values directly.

   2) The `getMemorySizePerThread` function is then only needed for logging purposes, since it is otherwise encapsulated inside `resizeThreadCacheAndBufferMemory`. We can have just a single `getThreadCacheAndBufferMemoryString()` which returns the values in the format "value1/value2" and is used in the logging message alone; here we can just pick any thread and read its values.
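   A rough sketch of the suggested shape (only `resizeThreadCacheAndBufferMemory` and `getThreadCacheAndBufferMemoryString` are named in the suggestion; every other class, field, and helper here is a hypothetical stand-in for the actual KafkaStreams internals):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for KafkaStreams internals, for illustration only.
class ResizeSketch {
    // Totals for the whole instance (e.g. from the cache and input-buffer configs).
    final long totalCacheBytes;
    final long totalBufferBytes;
    final List<FakeThread> threads = new ArrayList<>();

    ResizeSketch(final long totalCacheBytes, final long totalBufferBytes) {
        this.totalCacheBytes = totalCacheBytes;
        this.totalBufferBytes = totalBufferBytes;
    }

    static class FakeThread {
        long cacheBytes;
        long bufferBytes;
    }

    // Single entry point: divide both totals by the thread count and push the
    // per-thread values to every live thread in one pass, instead of two
    // getMemorySizePerThread/resizeThreadCacheOrBufferMemory call pairs.
    void resizeThreadCacheAndBufferMemory(final long numThreads) {
        final long cachePerThread = totalCacheBytes / numThreads;
        final long bufferPerThread = totalBufferBytes / numThreads;
        for (final FakeThread t : threads) {
            t.cacheBytes = cachePerThread;
            t.bufferBytes = bufferPerThread;
        }
    }

    // Used only for logging: "value1/value2" read from any one thread.
    String getThreadCacheAndBufferMemoryString() {
        final FakeThread t = threads.get(0);
        return t.cacheBytes + "/" + t.bufferBytes;
    }
}
```

   The point of the shape is that the per-thread arithmetic lives in exactly one place, and the logging helper never recomputes it.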
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##########
@@ -1116,16 +1138,18 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
             }
         } else {
             log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
-                + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
+                    + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
         }
-        final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
-        log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
-        resizeThreadCache(cacheSizePerThread);
+        final long cacheSizePerThread = getMemorySizePerThread(getNumLiveStreamThreads(), true);
+        resizeThreadCacheOrBufferMemory(cacheSizePerThread, true);
+        final long maxBufferSizePerThread = getMemorySizePerThread(getNumLiveStreamThreads(), false);
+        resizeThreadCacheOrBufferMemory(maxBufferSizePerThread, false);
+        log.info("Resizing thread cache/max buffer size due to thread removal, new cache size/max buffer size per thread is {}/{}", cacheSizePerThread, maxBufferSizePerThread);

Review comment:
   Please see my other comment earlier: I'd suggest we also include the terminating thread's name in the log message here for better troubleshooting. This would be valuable when multiple threads are being created/terminated at the same time.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java ##########
@@ -49,6 +49,14 @@ public Headers headers() {
         return value.headers();
     }

+    public int serializedKeySize() {

Review comment:
   These functions seem to be unused now?
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##########
@@ -967,6 +980,17 @@ private long pollPhase() {
         if (!records.isEmpty()) {
             pollRecordsSensor.record(numRecords, now);
             taskManager.addRecordsToTasks(records);
+            // Check buffer size after adding records to tasks
+            final long bufferSize = taskManager.getInputBufferSizeInBytes();
+            // Pausing partitions as the buffer size now exceeds max buffer size
+            if (bufferSize > maxBufferSizeBytes.get()) {
+                log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
+                // Only non-empty partitions are paused here. Reason is that, if a task has multiple partitions with
+                // some of them empty, then in that case pausing even empty partitions would sacrifice ordered processing
+                // and even lead to temporal deadlock. More explanation can be found here:
+                // https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647

Review comment:
   nit: we do not need the `?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647` suffix in the javadoc :)

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##########
@@ -354,10 +370,22 @@ int numBuffered(final TopicPartition partition) {
         return recordQueue.size();
     }

+    Set<TopicPartition> getNonEmptyTopicPartitions() {
+        final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
+        for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
+            nonEmptyTopicPartitions.add(recordQueue.partition());
+        }
+        return nonEmptyTopicPartitions;
+    }
+
     int numBuffered() {
         return totalBuffered;
     }

+    long totalBytesBuffered() {

Review comment:
   This function seems not to have been addressed? Or am I missing something?
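   The pause decision quoted in the diff can be sketched in isolation as follows (a minimal illustration, not the actual StreamThread code: the per-partition byte map, `paused` set, and class name are all assumptions standing in for TaskManager and consumer state):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustration of the pollPhase() back-pressure check, KAFKA-13152 style.
class BufferPauseSketch {
    final Map<String, Long> bytesByPartition = new HashMap<>(); // partition -> buffered bytes
    final long maxBufferSizeBytes;
    final Set<String> paused = new HashSet<>(); // stands in for consumer.paused()

    BufferPauseSketch(final long maxBufferSizeBytes) {
        this.maxBufferSizeBytes = maxBufferSizeBytes;
    }

    long totalBufferedBytes() {
        return bytesByPartition.values().stream().mapToLong(Long::longValue).sum();
    }

    // Pause only partitions that actually have buffered records: pausing the
    // empty partitions of a multi-partition task could sacrifice ordered
    // processing and even deadlock, per the KAFKA-13152 discussion.
    void maybePause() {
        if (totalBufferedBytes() > maxBufferSizeBytes) {
            for (final Map.Entry<String, Long> e : bytesByPartition.entrySet()) {
                if (e.getValue() > 0) {
                    paused.add(e.getKey()); // stands in for consumer.pause(...)
                }
            }
        }
    }
}
```

   Note the threshold compares the total across all partitions, while the pause set is chosen per partition by emptiness.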
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##########
@@ -728,8 +733,11 @@ private void subscribeConsumer() {
         }
     }

-    public void resizeCache(final long size) {
-        cacheResizeSize.set(size);
+    public void resizeCacheOrBufferMemory(final long size, final boolean cacheResize) {

Review comment:
   See the other comment: if we always resize the two at the same time, we can just make one function call instead of two calls with a second boolean flag.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ##########
@@ -84,6 +84,9 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled "
         + "from consumer and not yet processed for this active task";

+    private static final String TOTAL_BYTES = "total-bytes";
+    private static final String TOTAL_BYTES_DESCRIPTION = "The total number of bytes accumulated by this task";

Review comment:
   This comment seems not to have been addressed.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -738,7 +739,7 @@ public boolean process(final long wallClockTime) {

         // after processing this record, if its partition queue's buffered size has been
         // decreased to the threshold, we can then resume the consumption on this partition
-        if (recordInfo.queue().size() == maxBufferedSize) {
+        if (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) {

Review comment:
   nit: could we add a TODO here that this logic should be removed once we remove the deprecated old config as well?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org