coltmcnealy-lh commented on code in PR #20833:
URL: https://github.com/apache/kafka/pull/20833#discussion_r2525646040
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
+ final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+ .values()
+ .stream()
+ .filter(t -> t.commitNeeded())
Review Comment:
@mjsax and @lianetm, thanks for the comments here. I did test this in our
soak test, and the `commit()` did not fail. In fact, it helped: deploying the
fix took the soak test from an unrecoverable state to finishing restoration
and becoming healthy again.
My reading of `StreamThread.java` led me to the same conclusion Lianet
reached: the exception would propagate out of the call to `poll()`. The
reason is that the `poll()` call happens within `pollPhase()`, which is called
from `runOnceWithProcessingThreads()`, which in turn runs inside all of the
exception handling in `runLoop()`.
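To make the propagation argument concrete, here is a heavily simplified sketch. It assumes, as the comment argues, that nothing between `onAssignment()` and `runLoop()` catches the exception; every method body is a hypothetical stand-in, not the real `StreamThread` or consumer code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the call chain discussed above:
 * runLoop() -> runOnceWithProcessingThreads() -> pollPhase() -> poll() -> onAssignment().
 * None of these are the real Kafka Streams implementations.
 */
public class ExceptionPropagationSketch {

    // Records which frame observed the failure, so the behavior can be checked.
    static final List<String> log = new ArrayList<>();

    // Stand-in for the assignor callback: a commit inside it fails.
    static void onAssignment() {
        throw new IllegalStateException("commit failed during onAssignment");
    }

    // Stand-in for the consumer's poll(), which invokes the callback synchronously.
    static void poll() {
        onAssignment();
    }

    static void pollPhase() {
        poll();
    }

    static void runOnceWithProcessingThreads() {
        pollPhase();
    }

    // Stand-in for runLoop(): the only frame with exception handling, so the
    // failure raised in onAssignment() surfaces here rather than being swallowed.
    static void runLoop() {
        try {
            runOnceWithProcessingThreads();
            log.add("iteration completed");
        } catch (final IllegalStateException e) {
            log.add("runLoop caught: " + e.getMessage());
        }
    }

    public static void main(final String[] args) {
        runLoop();
        System.out.println(log); // prints [runLoop caught: commit failed during onAssignment]
    }
}
```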
I see what Matthias is saying: without further exception handling, a failed
commit will crash the `StreamThread`. But if a commit fails, we have to close
all tasks dirty anyway, which is already extremely disruptive under EOS. Does
losing and recreating a thread (perhaps a couple hundred milliseconds) matter
compared to wiping hundreds of GBs of RocksDB state?
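A minimal sketch of the trade-off, assuming a hypothetical `SimpleTask` and `Committer` (neither is a Kafka Streams type). Tasks with uncommitted work are committed, mirroring the `commitNeeded()` filter in the diff; if that commit fails, every owned task is marked closed dirty and the exception is rethrown toward the run loop, as described above. The hunk does not show what the PR does after the filter, so this is illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CommitBeforeRebalanceSketch {

    /** Hypothetical minimal task; not org.apache.kafka.streams.processor.internals.Task. */
    static final class SimpleTask {
        final String id;
        final boolean commitNeeded;
        boolean closedDirty = false;

        SimpleTask(final String id, final boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }

        boolean commitNeeded() {
            return commitNeeded;
        }
    }

    /** Hypothetical commit hook; a real EOS commit would flush state and commit the transaction. */
    interface Committer {
        void commit(Set<SimpleTask> tasks);
    }

    /**
     * Commits only tasks with pending work. On failure, marks every owned task
     * closed dirty and rethrows so the caller (e.g. the thread's run loop) sees it.
     */
    static Set<SimpleTask> commitOpenTasks(final Map<String, SimpleTask> ownedTasks,
                                           final Committer committer) {
        final Set<SimpleTask> tasksWithOpenTransactions = ownedTasks.values()
            .stream()
            .filter(SimpleTask::commitNeeded)
            .collect(Collectors.toSet());
        try {
            committer.commit(tasksWithOpenTransactions);
        } catch (final RuntimeException e) {
            ownedTasks.values().forEach(t -> t.closedDirty = true);
            throw e;
        }
        return tasksWithOpenTransactions;
    }

    public static void main(final String[] args) {
        final Map<String, SimpleTask> owned = new LinkedHashMap<>();
        owned.put("0_0", new SimpleTask("0_0", true));
        owned.put("0_1", new SimpleTask("0_1", false));

        // Successful commit: only the task with pending work is committed.
        final Set<SimpleTask> committed = commitOpenTasks(owned, tasks -> { });
        System.out.println("committed " + committed.size() + " task(s)");

        // Failed commit: all owned tasks are closed dirty and the error propagates.
        try {
            commitOpenTasks(owned, tasks -> {
                throw new IllegalStateException("simulated commit failure");
            });
        } catch (final IllegalStateException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        owned.values().forEach(t -> System.out.println(t.id + " closedDirty=" + t.closedDirty));
    }
}
```

The rethrow is the point of contention in the thread: it keeps the failure visible to `runLoop()` at the cost of the thread, while the dirty close is what discards local state.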