This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 866da21ab4071d19aa790131149e90a62832b6e9
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Tue Jul 2 14:40:02 2019 +0200

    Reinterrupt thread
---
 .../distribution/journal/kafka/KafkaMessageSender.java     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index 1c96af9..870f444 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.sling.distribution.journal.messages.Types;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -66,12 +65,19 @@ public class KafkaMessageSender<T extends GeneratedMessage> 
implements MessageSe
         try {
             RecordMetadata metadata = producer.send(record).get();
             LOG.info(format("Sent to %s", metadata));
-        } catch (InterruptedException | ExecutionException e) {
-            eventSender.send(e);
-            throw new MessagingException(format("Failed to send message on 
topic %s", topic), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            handleException(topic, e);
+        } catch (Exception e) {
+            handleException(topic, e);
         }
     }
 
+    private void handleException(String topic, Exception e) {
+        eventSender.send(e);
+        throw new MessagingException(format("Failed to send message on topic 
%s", topic), e);
+    }
+
     private Iterable<Header> toHeaders(int type, int version) {
         return Arrays.asList(toHeader("type", type),
                 toHeader("version",version));

Reply via email to