[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17429067#comment-17429067 ]
A. Sophie Blee-Goldman commented on KAFKA-13295:
------------------------------------------------

1) The rebalance callbacks can take a bit to wrap your head around, hence the comment trying to explain, more or less, the flow of task management. But basically, at the end of each rebalance the following are invoked in this order:

# StreamsRebalanceListener#onPartitionsRevoked (only if there are revoked partitions)
# StreamsPartitionAssignor#onAssignment (always)
# StreamsRebalanceListener#onPartitionsAssigned (always)

Callback #1 invokes TaskManager#handleRevocation, which cleans up and commits any active tasks that were revoked. Then callback #2 invokes TaskManager#handleAssignment, which is what that comment is referring to. This is where you'll want to add the necessary logic.

2) It's probably most instructive to look at the #handleRevocation implementation, which also covers the case of committing tasks that are not revoked (because if we have to commit one active task that's revoked, we have to commit all of them) and shows how to handle exceptions there. In this case, since it's EOS by definition, the only possible exceptions are TaskCorruptedException and a general RuntimeException. For the former case you can just closeDirtyAndRevive, while for the latter you should keep track of the exception and then rethrow it at the end of #handleAssignment (see the sketch below the quoted description).

3) Yes, but only if the commit succeeded (i.e. it didn't throw either of the above).

> Long restoration times for new tasks can lead to transaction timeouts
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13295
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Sagar Rao
>            Priority: Critical
>              Labels: eos
>             Fix For: 3.1.0
>
>
> In some EOS applications with relatively long restoration times we've
> noticed a series of ProducerFencedExceptions occurring during/immediately
> after restoration. The broker logs were able to confirm these were due to
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling
> {{send}} (if there isn't already one in flight). A {{send}} often occurs
> outside of a commit during active processing (e.g. writing to the
> changelog), leaving the txn open until the next commit. And if a
> StreamThread has been actively processing when a rebalance results in a
> new stateful task without revoking any existing tasks, the thread won't
> actually commit this open txn before it goes back into the restoration
> phase while it builds up state for the new task. So the in-flight
> transaction is left open during restoration, during which the StreamThread
> only consumes from the changelog without committing, leaving it vulnerable
> to timing out when restoration times exceed the configured
> transaction.timeout.ms for the producer client.
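
To make points 1) and 2) concrete, here is a minimal sketch of the commit-and-rethrow pattern described above. This is not the actual TaskManager code: the class and the helpers commitAllActiveTasks() and closeDirtyAndRevive() are illustrative stand-ins for the real internals, and the stubbed TaskCorruptedException only mimics the shape of the real exception.

{code:java}
import java.util.Set;

// Hypothetical stand-in for the real TaskCorruptedException,
// reduced to the bits this sketch needs.
class TaskCorruptedException extends RuntimeException {
    private final Set<Integer> corruptedTaskIds;

    TaskCorruptedException(final Set<Integer> corruptedTaskIds) {
        this.corruptedTaskIds = corruptedTaskIds;
    }

    Set<Integer> corruptedTaskIds() {
        return corruptedTaskIds;
    }
}

class TaskManagerSketch {

    // Sketch of the pattern from #handleRevocation applied to #handleAssignment:
    // commit the active tasks, handle the two exceptions possible under EOS,
    // and rethrow a tracked RuntimeException only at the very end.
    void handleAssignment() {
        RuntimeException firstException = null;

        try {
            // Under EOS, committing one active task means committing all of
            // them, since they share the thread's transactional producer.
            commitAllActiveTasks();
        } catch (final TaskCorruptedException e) {
            // Corrupted tasks are closed dirty and revived so that they are
            // restored from scratch on the next iteration.
            closeDirtyAndRevive(e.corruptedTaskIds());
        } catch (final RuntimeException e) {
            // Don't throw yet; finish the rest of the task management first.
            firstException = e;
        }

        // ... the existing task creation/closing logic of #handleAssignment ...

        if (firstException != null) {
            throw firstException;
        }
    }

    private void commitAllActiveTasks() { /* stub */ }

    private void closeDirtyAndRevive(final Set<Integer> taskIds) { /* stub */ }
}
{code}

The key design point is deferring the RuntimeException, so a commit failure doesn't abort the remaining assignment handling before it is surfaced.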
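As for the quoted description itself: the exposure window is bounded by the producer's transaction.timeout.ms, so one way to reproduce, or temporarily mitigate, the problem is to tune that timeout relative to expected restoration times. A minimal sketch of such an override, assuming a plain Streams app (the application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTransactionTimeoutConfig {

    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // The open txn can only time out while restoration runs longer than
        // transaction.timeout.ms, so raising the producer-side timeout above
        // the worst expected restoration time narrows the window. 15 minutes
        // is an arbitrary example; the broker caps accepted values at
        // transaction.max.timeout.ms, which defaults to 15 minutes.
        props.put(
            StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
            (int) TimeUnit.MINUTES.toMillis(15));

        return props;
    }
}
{code}

Lowering the timeout instead should make the ProducerFencedException easy to trigger in a test with a deliberately slow restore.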