This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b03c0b8 Handle kafka exception thrown when creating the admin client. (#4187) b03c0b8 is described below commit b03c0b8790a48ca21ba7bee781aa8bb595bc8025 Author: Martin Henke <martin.he...@web.de> AuthorDate: Wed Jan 9 15:10:44 2019 +0100 Handle kafka exception thrown when creating the admin client. (#4187) --- .../connector/kafka/KafkaMessagingProvider.scala | 51 ++++++++++++---------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala index 5cd719d..86d258e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala @@ -60,30 +60,37 @@ object KafkaMessagingProvider extends MessagingProvider { } getOrElse Map.empty) val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) - val client = AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)) - val partitions = 1 - val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) - - def createTopic(retries: Int = 5): Try[Unit] = { - Try(client.createTopics(List(nt).asJava).values().get(topic).get()) - .map(_ => logging.info(this, s"created topic $topic")) - .recoverWith { - case CausedBy(_: TopicExistsException) => - Success(logging.info(this, s"topic $topic already existed")) - case CausedBy(t: RetriableException) if retries > 0 => - logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") - Thread.sleep(1.second.toMillis) - createTopic(retries - 1) - case t => - logging.error(this, s"ensureTopic for $topic failed due to $t") - Failure(t) - } - } - val result = createTopic() + Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts))) + .flatMap(client => { + val partitions = 1 + val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) + + def createTopic(retries: Int = 5): Try[Unit] = { + Try(client.createTopics(List(nt).asJava).values().get(topic).get()) + .map(_ => logging.info(this, s"created topic $topic")) + .recoverWith { + case CausedBy(_: TopicExistsException) => + Success(logging.info(this, s"topic $topic already existed")) + case CausedBy(t: RetriableException) if retries > 0 => + logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") + Thread.sleep(1.second.toMillis) + createTopic(retries - 1) + case t => + logging.error(this, s"ensureTopic for $topic failed due to $t") + Failure(t) + } + } - client.close() - result + val result = createTopic() + client.close() + result + }) + .recoverWith { + case e => + logging.error(this, s"ensureTopic for $topic failed due to $e") + Failure(e) + } } }