ableegoldman commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602947595
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -972,42 +1006,53 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { /** * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) + * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS) + * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS) * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit */ int commit(final Collection<Task> tasksToCommit) { + int committed = 0; if (rebalanceInProgress) { - return -1; + committed = -1; } else { - int committed = 0; final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasksToCommit) { - if (task.commitNeeded()) { - final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(); - if (task.isActive()) { - consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata); - } - } - } - try { - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - - for (final Task task : tasksToCommit) { - if (task.commitNeeded()) { - task.clearTaskTimeout(); - ++committed; - task.postCommit(false); - } - } + committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask); } catch (final TimeoutException timeoutException) { consumedOffsetsAndMetadataPerTask .keySet() .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException)); } + } + return committed; + } + /** + * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets + */ + private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit, Review comment: Just pulled all the actual contents, excluding the TimeoutException + `maybeInitTaskTimeoutOrThrow` handling, so we could use it in `handleCorruption` without that stuff -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org