This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 866da21ab4071d19aa790131149e90a62832b6e9
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Tue Jul 2 14:40:02 2019 +0200

    Reinterrupt thread
---
 .../distribution/journal/kafka/KafkaMessageSender.java | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index 1c96af9..870f444 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.sling.distribution.journal.messages.Types;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -66,12 +65,19 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
         try {
             RecordMetadata metadata = producer.send(record).get();
             LOG.info(format("Sent to %s", metadata));
-        } catch (InterruptedException | ExecutionException e) {
-            eventSender.send(e);
-            throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            handleException(topic, e);
+        } catch (Exception e) {
+            handleException(topic, e);
         }
     }
 
+    private void handleException(String topic, Exception e) {
+        eventSender.send(e);
+        throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+    }
+
     private Iterable<Header> toHeaders(int type, int version) {
         return Arrays.asList(toHeader("type", type),
                 toHeader("version",version));