This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 538517b  Catch kafka producer exceptions and recreate the producer. 
(#4080)
538517b is described below

commit 538517bbb5ba5aa1ffc7699864e9a2c53ee76561
Author: Vadim Raskin <raskinva...@gmail.com>
AuthorDate: Fri Oct 26 09:51:14 2018 +0200

    Catch kafka producer exceptions and recreate the producer. (#4080)
    
    * Catch producer exceptions and recreate the producer
    
    * Replace try, plain failure/success with trying, remove dup error logging
---
 .../whisk/connector/kafka/KafkaProducerConnector.scala  | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index bda2a11..7af5c18 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -55,12 +55,17 @@ class KafkaProducerConnector(
 
     Future {
       blocking {
-        producer.send(record, new Callback {
-          override def onCompletion(metadata: RecordMetadata, exception: 
Exception): Unit = {
-            if (exception == null) produced.success(metadata)
-            else produced.failure(exception)
-          }
-        })
+        try {
+          producer.send(record, new Callback {
+            override def onCompletion(metadata: RecordMetadata, exception: 
Exception): Unit = {
+              if (exception == null) produced.trySuccess(metadata)
+              else produced.tryFailure(exception)
+            }
+          })
+        } catch {
+          case e: Throwable =>
+            produced.tryFailure(e)
+        }
       }
     }
 

Reply via email to