ableegoldman commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r537917953



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final 
Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no 
more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception 
instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Why is that here, we catch these three exceptions -- ProducerFenced, 
InvalidProducerEpoch, and OutOfOrderSequence -- and wrap them as 
TaskMigratedException, while in `StreamsProducer#send`, we catch  
ProducerFenced, InvalidProducerEpoch, and UnknownProducerId exceptions and wrap 
those as TaskMigrated?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final 
Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no 
more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception 
instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Just to clarify, I think my question/confusion is twofold:
   
   1) Why is it OutOfOrderSequence in one place and UnknownProducerId in 
another?
   2) What is the difference between these two code paths? Is it really 
possible for example for the ProducerFencedException to sometimes be thrown 
directly from `Producer#send`, and sometimes be passed along through the 
callback ( as in `streamsProducer.send(serializedRecord, (metadata, exception)` 
)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -184,12 +184,12 @@ public void resetProducer() {
         transactionInitialized = false;
     }
 
-    private void maybeBeginTransaction() throws ProducerFencedException {
+    private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
                 producer.beginTransaction();
                 transactionInFlight = true;
-            } catch (final ProducerFencedException error) {
+            } catch (final ProducerFencedException | 
InvalidProducerEpochException error) {

Review comment:
       ```suggestion
               } catch (final ProducerFencedException || 
InvalidProducerEpochException error) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to