ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443877830



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch 
since transaction was aborted")) {

Review comment:
       If we revive a task, we don't recreate the record collector AFAICT. So 
there may still be a `sendException` hanging around even after we `close` the 
record collector. If this was a truly-fatal exception, we'll check and throw 
it. But we shouldn't rethrow this particular non-fatal exception. Therefore, we 
need to check for it and reset the `sendException` iff we find this exact 
exception




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to