This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 3c816aa Retry `ensureTopic` on transient, retriable exceptions. (#3753) 3c816aa is described below commit 3c816aa6e326e643da5ebb8d5ad504578a597b9b Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Fri Jun 15 17:08:35 2018 +0200 Retry `ensureTopic` on transient, retriable exceptions. (#3753) Like writing and reading from Kafka, creating topics can be subject to a battery of transient errors. Retrying those errors is safe and keeps us sane. --- .../connector/kafka/KafkaMessagingProvider.scala | 30 ++++++++++++++-------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala index 7351b64..6843cbc 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -21,14 +21,14 @@ import java.util.Properties import akka.actor.ActorSystem import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} -import org.apache.kafka.common.errors.TopicExistsException +import org.apache.kafka.common.errors.{RetriableException, TopicExistsException} import pureconfig._ import whisk.common.{CausedBy, Logging} import whisk.core.{ConfigKeys, WhiskConfig} import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider} import scala.collection.JavaConverters._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} case class KafkaConfig(replicationFactor: Short) @@ -57,15 +57,23 @@ object KafkaMessagingProvider extends MessagingProvider { val partitions = 1 val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) - val result = Try(client.createTopics(List(nt).asJava).values().get(topic).get()) - .map(_ => logging.info(this, s"created topic $topic")) - .recoverWith { - case CausedBy(_: TopicExistsException) => - Success(logging.info(this, s"topic $topic already existed")) - case t => - logging.error(this, s"ensureTopic for $topic failed due to $t") - Failure(t) - } + def createTopic(retries: Int = 5): Try[Unit] = { + Try(client.createTopics(List(nt).asJava).values().get(topic).get()) + .map(_ => logging.info(this, s"created topic $topic")) + .recoverWith { + case CausedBy(_: TopicExistsException) => + Success(logging.info(this, s"topic $topic already existed")) + case CausedBy(t: RetriableException) if retries > 0 => + logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") + Thread.sleep(1.second.toMillis) + createTopic(retries - 1) + case t => + logging.error(this, s"ensureTopic for $topic failed due to $t") + Failure(t) + } + } + + val result = createTopic() client.close() result -- To stop receiving notification emails like this one, please contact cbic...@apache.org.