ableegoldman commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r443877830
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ########## @@ -267,7 +283,17 @@ public void close() { private void checkForException() { if (sendException != null) { - throw sendException; + if (sendException.getCause() instanceof KafkaException + && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) { Review comment: If we revive a task, we don't recreate the record collector AFAICT. So there may still be a `sendException` hanging around even after we `close` the record collector. If this was a truly-fatal exception, we'll check and throw it. But we shouldn't rethrow this particular non-fatal exception. Therefore, we need to check for it and reset the `sendException` iff we find this exact exception ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org