This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b03c0b8  Handle kafka exception thrown when creating the admin client. (#4187)
b03c0b8 is described below

commit b03c0b8790a48ca21ba7bee781aa8bb595bc8025
Author: Martin Henke <martin.he...@web.de>
AuthorDate: Wed Jan 9 15:10:44 2019 +0100

    Handle kafka exception thrown when creating the admin client. (#4187)
---
 .../connector/kafka/KafkaMessagingProvider.scala   | 51 ++++++++++++----------
 1 file changed, 29 insertions(+), 22 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index 5cd719d..86d258e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -60,30 +60,37 @@ object KafkaMessagingProvider extends MessagingProvider {
       } getOrElse Map.empty)
 
     val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    val client = AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts))
-    val partitions = 1
-    val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
-
-    def createTopic(retries: Int = 5): Try[Unit] = {
-      Try(client.createTopics(List(nt).asJava).values().get(topic).get())
-        .map(_ => logging.info(this, s"created topic $topic"))
-        .recoverWith {
-          case CausedBy(_: TopicExistsException) =>
-            Success(logging.info(this, s"topic $topic already existed"))
-          case CausedBy(t: RetriableException) if retries > 0 =>
-            logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
-            Thread.sleep(1.second.toMillis)
-            createTopic(retries - 1)
-          case t =>
-            logging.error(this, s"ensureTopic for $topic failed due to $t")
-            Failure(t)
-        }
-    }
 
-    val result = createTopic()
+    Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
+      .flatMap(client => {
+        val partitions = 1
+        val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+
+        def createTopic(retries: Int = 5): Try[Unit] = {
+          Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+            .map(_ => logging.info(this, s"created topic $topic"))
+            .recoverWith {
+              case CausedBy(_: TopicExistsException) =>
+                Success(logging.info(this, s"topic $topic already existed"))
+              case CausedBy(t: RetriableException) if retries > 0 =>
+                logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
+                Thread.sleep(1.second.toMillis)
+                createTopic(retries - 1)
+              case t =>
+                logging.error(this, s"ensureTopic for $topic failed due to $t")
+                Failure(t)
+            }
+        }
 
-    client.close()
-    result
+        val result = createTopic()
+        client.close()
+        result
+      })
+      .recoverWith {
+        case e =>
+          logging.error(this, s"ensureTopic for $topic failed due to $e")
+          Failure(e)
+      }
   }
 }
 

Reply via email to