This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 538517b Catch kafka producer exceptions and recreate the producer. (#4080) 538517b is described below commit 538517bbb5ba5aa1ffc7699864e9a2c53ee76561 Author: Vadim Raskin <raskinva...@gmail.com> AuthorDate: Fri Oct 26 09:51:14 2018 +0200 Catch kafka producer exceptions and recreate the producer. (#4080) * Catch producer exceptions and recreate the producer * Replace try, plain failure/success with trying, remove dup error logging --- .../whisk/connector/kafka/KafkaProducerConnector.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala index bda2a11..7af5c18 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala @@ -55,12 +55,17 @@ class KafkaProducerConnector( Future { blocking { - producer.send(record, new Callback { - override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { - if (exception == null) produced.success(metadata) - else produced.failure(exception) - } - }) + try { + producer.send(record, new Callback { + override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { + if (exception == null) produced.trySuccess(metadata) + else produced.tryFailure(exception) + } + }) + } catch { + case e: Throwable => + produced.tryFailure(e) + } } }