[ 
https://issues.apache.org/jira/browse/KAFKA-7284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581707#comment-16581707
 ] 

ASF GitHub Bot commented on KAFKA-7284:
---------------------------------------

guozhangwang closed pull request #5513: KAFKA-7284: streams should unwrap 
fenced exception
URL: https://github.com/apache/kafka/pull/5513
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index d2a84c66abe..a72714bb3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -205,7 +206,7 @@ public void abortTransaction() throws 
ProducerFencedException {
         this.transactionInFlight = false;
     }
 
-    private void verifyProducerState() {
+    private synchronized void verifyProducerState() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
         }
@@ -243,7 +244,12 @@ private void verifyNoTransactionInFlight() {
      */
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> 
record, Callback callback) {
-        verifyProducerState();
+        if (this.closed) {
+            throw new IllegalStateException("MockProducer is already closed.");
+        }
+        if (this.producerFenced) {
+            throw new KafkaException("MockProducer is fenced.", new 
ProducerFencedException("Fenced"));
+        }
         int partition = 0;
         if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
             partition = partition(record, this.cluster);
@@ -313,7 +319,7 @@ public boolean closed() {
         return this.closed;
     }
 
-    public void fenceProducer() {
+    public synchronized void fenceProducer() {
         verifyProducerState();
         verifyTransactionsInitialized();
         this.producerFenced = true;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac280afc..ee4803f635b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -267,18 +268,9 @@ public void shouldThrowOnSendIfProducerGotFenced() {
         try {
             producer.send(null);
             fail("Should have thrown as producer is fenced off");
-        } catch (ProducerFencedException e) { }
-    }
-
-    @Test
-    public void shouldThrowOnFlushIfProducerGotFenced() {
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.fenceProducer();
-        try {
-            producer.flush();
-            fail("Should have thrown as producer is fenced off");
-        } catch (ProducerFencedException e) { }
+        } catch (KafkaException e) {
+            assertTrue("The root cause of the exception should be 
ProducerFenced", e.getCause() instanceof ProducerFencedException);
+        }
     }
 
     @Test
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index afdadf2dec3..44b2089f6ab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -23,14 +23,14 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.serialization.Serializer;
@@ -193,16 +193,26 @@ public void onCompletion(final RecordMetadata metadata,
                 "You can increase producer parameter `max.block.ms` to 
increase this timeout.", topic);
             throw new StreamsException(String.format("%sFailed to send record 
to topic %s due to timeout.", logPrefix, topic));
         } catch (final Exception uncaughtException) {
-            throw new StreamsException(
-                String.format(EXCEPTION_MESSAGE,
-                              logPrefix,
-                              "an error caught",
-                              key,
-                              value,
-                              timestamp,
-                              topic,
-                              uncaughtException.getMessage()),
-                uncaughtException);
+            if (uncaughtException instanceof KafkaException &&
+                uncaughtException.getCause() instanceof 
ProducerFencedException) {
+                final KafkaException kafkaException = (KafkaException) 
uncaughtException;
+                // producer.send() call may throw a KafkaException which wraps 
a FencedException,
+                // in this case we should throw its wrapped inner cause so 
that it can be captured and re-wrapped as TaskMigrationException
+                throw (ProducerFencedException) kafkaException.getCause();
+            } else {
+                throw new StreamsException(
+                    String.format(
+                        EXCEPTION_MESSAGE,
+                        logPrefix,
+                        "an error caught",
+                        key,
+                        value,
+                        timestamp,
+                        topic,
+                        uncaughtException.getMessage()
+                    ),
+                    uncaughtException);
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 39fe7ab20c3..68c779b50db 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -299,4 +300,27 @@ public void 
shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
             new AlwaysContinueProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void shouldUnwrapAndThrowProducerFencedExceptionFromCallToSend() {
+        final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(cluster, true, new DefaultPartitioner(), 
byteArraySerializer, byteArraySerializer);
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            producer,
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler()
+        );
+
+        producer.initTransactions();
+        producer.fenceProducer();
+
+        try {
+            collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+            fail("expected a ProducerFencedException");
+        } catch (ProducerFencedException pfe) {
+            // expected
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Producer getting fenced may cause Streams to shut down
> ------------------------------------------------------
>
>                 Key: KAFKA-7284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7284
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> As part of the investigation, I will determine which other versions are 
> affected.
>  
> In StreamTask, we catch a `ProducerFencedException` and throw a 
> `TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is 
> throwing a `StreamsException` whose cause is a `KafkaException`, which is in 
> turn caused by the `ProducerFencedException`.
> In response to a `TaskMigratedException` we would rebalance, but when we get a 
> `StreamsException`, Streams shuts itself down.
> In other words, we intended to do a rebalance in response to a producer 
> fence, but actually, we shut down (when the fence happens inside the record 
> collector).
> Coincidentally, Guozhang noticed and fixed this in a recent PR: 
> [https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6]
>  
> The scope of this ticket is to extract that fix and its associated tests into 
> a separate PR against trunk and 2.0, and to determine which other versions, 
> if any, are affected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to