This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new a91feb0 Enhance Kafka message provider for resiliency. (#3072) a91feb0 is described below commit a91feb013dde79eca2e892f70840157f9b18ccb4 Author: Sang Heon Lee <developis...@gmail.com> AuthorDate: Mon Feb 26 18:55:19 2018 +0900 Enhance Kafka message provider for resiliency. (#3072) Co-authored-by: Markus Thömmes <markusthoem...@me.com> --- common/scala/src/main/resources/application.conf | 2 + .../connector/kafka/KafkaConsumerConnector.scala | 90 +++++++++++++++++----- .../connector/kafka/KafkaMessagingProvider.scala | 12 ++- .../connector/kafka/KafkaProducerConnector.scala | 79 +++++++++++-------- .../whisk/core/connector/MessageConsumer.scala | 4 +- .../whisk/core/connector/MessagingProvider.scala | 16 ++-- .../core/database/RemoteCacheInvalidation.scala | 5 +- .../core/loadBalancer/ContainerPoolBalancer.scala | 2 +- .../ShardingContainerPoolBalancer.scala | 2 +- .../main/scala/whisk/core/invoker/Invoker.scala | 2 +- tests/src/test/scala/ha/ShootComponentsTests.scala | 70 +++++++++-------- .../test/scala/services/KafkaConnectorTests.scala | 44 ++++++++--- .../core/connector/test/MessageFeedTests.scala | 2 +- .../whisk/core/connector/test/TestConnector.scala | 6 +- 14 files changed, 219 insertions(+), 117 deletions(-) diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 3e09bba..1b9b739 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -60,6 +60,8 @@ whisk { producer { acks = 1 max-request-size = ${whisk.activation.payload.max} + request-timeout-ms = 30000 + metadata-max-age-ms = 15000 } consumer { session-timeout-ms = 30000 diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 09cd3ac..51a27ad 100644 --- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -19,43 +19,86 @@ package whisk.connector.kafka import java.util.Properties -import scala.collection.JavaConversions.iterableAsScalaIterable -import scala.collection.JavaConversions.seqAsJavaList -import scala.concurrent.duration.Duration -import scala.concurrent.duration.DurationInt -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.KafkaConsumer +import akka.actor.ActorSystem +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.errors.{RetriableException, WakeupException} import org.apache.kafka.common.serialization.ByteArrayDeserializer import pureconfig.loadConfigOrThrow import whisk.common.Logging import whisk.core.ConfigKeys import whisk.core.connector.MessageConsumer -class KafkaConsumerConnector(kafkahost: String, - groupid: String, - topic: String, - override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging) +import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} + +case class KafkaConsumerConfig(sessionTimeoutMs: Long) + +class KafkaConsumerConnector( + kafkahost: String, + groupid: String, + topic: String, + override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem) extends MessageConsumer { + implicit val ec: ExecutionContext = actorSystem.dispatcher + private val gracefulWaitTime = 100.milliseconds + + // The consumer is generally configured via getProps. This configuration only loads values necessary for "outer" + // logic, like the wakeup timer. + private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer) + /** * Long poll for messages. 
Method returns once message are available but no later than given * duration. * * @param duration the maximum duration for the long poll */ - override def peek(duration: Duration = 500.milliseconds) = { - val records = consumer.poll(duration.toMillis) - records map { r => - (r.topic, r.partition, r.offset, r.value) - } + override def peek(duration: FiniteDuration = 500.milliseconds, + retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = { + + // poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly. + val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup()) + + try { + consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value)) + } catch { + // Happens if the peek hangs. + case _: WakeupException if retry > 0 => + logging.error(this, s"poll timeout occurred. Retrying $retry more times.") + Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway + peek(duration, retry - 1) + case e: RetriableException if retry > 0 => + logging.error(this, s"$e: Retrying $retry more times") + wakeUpTask.cancel() + Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway + peek(duration, retry - 1) + // Every other error results in a restart of the consumer + case e: Throwable => + recreateConsumer() + throw e + } finally wakeUpTask.cancel() } /** * Commits offsets from last poll. 
*/ - def commit() = consumer.commitSync() + def commit(retry: Int = 3): Unit = + try { + consumer.commitSync() + } catch { + case e: RetriableException => + if (retry > 0) { + logging.error(this, s"$e: retrying $retry more times") + Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway + commit(retry - 1) + } else { + throw e + } + } - override def close() = { + override def close(): Unit = { + consumer.close() logging.info(this, s"closing '$topic' consumer") } @@ -79,9 +122,18 @@ class KafkaConsumerConnector(kafkahost: String, val keyDeserializer = new ByteArrayDeserializer val valueDeserializer = new ByteArrayDeserializer val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer) - topics foreach { consumer.subscribe(_) } + topics.foreach(consumer.subscribe(_)) consumer } - private val consumer = getConsumer(getProps, Some(List(topic))) + private def recreateConsumer(): Unit = { + val oldConsumer = consumer + Future { + oldConsumer.close() + logging.info(this, s"old consumer closed") + } + consumer = getConsumer(getProps, Some(List(topic))) + } + + @volatile private var consumer = getConsumer(getProps, Some(List(topic))) } diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala index 3c8df85..6b0fc14 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -20,8 +20,10 @@ package whisk.connector.kafka import java.util.Properties import java.util.concurrent.ExecutionException -import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorSystem + +import scala.concurrent.duration._ import scala.collection.JavaConverters._ import org.apache.kafka.clients.admin.AdminClientConfig import 
org.apache.kafka.clients.admin.AdminClient @@ -43,11 +45,12 @@ case class KafkaConfig(replicationFactor: Short) object KafkaMessagingProvider extends MessagingProvider { def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)( - implicit logging: Logging): MessageConsumer = + implicit logging: Logging, + actorSystem: ActorSystem): MessageConsumer = new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek) - def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer = - new KafkaProducerConnector(config.kafkaHosts, ec) + def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer = + new KafkaProducerConnector(config.kafkaHosts) def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = { val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka) @@ -55,6 +58,7 @@ object KafkaMessagingProvider extends MessagingProvider { loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig")) val props = new Properties props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts) + val commonConfig = KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) commonConfig.foreach { diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala index dca7cce..0c511f3 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala @@ -19,36 +19,38 @@ package whisk.connector.kafka import java.util.Properties -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.util.Failure -import scala.util.Success -import 
org.apache.kafka.clients.producer.Callback -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.clients.producer.RecordMetadata -import org.apache.kafka.common.errors.NotLeaderForPartitionException +import akka.actor.ActorSystem +import akka.pattern.after +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.errors.{ + NotEnoughReplicasAfterAppendException, + RecordTooLargeException, + RetriableException, + TimeoutException +} import org.apache.kafka.common.serialization.StringSerializer -import whisk.common.Counter -import whisk.common.Logging -import whisk.core.connector.Message -import whisk.core.connector.MessageProducer -import whisk.core.entity.UUIDs import pureconfig._ +import whisk.common.{Counter, Logging, TransactionId} import whisk.core.ConfigKeys +import whisk.core.connector.{Message, MessageProducer} +import whisk.core.entity.UUIDs + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} -class KafkaProducerConnector(kafkahosts: String, - implicit val executionContext: ExecutionContext, - id: String = UUIDs.randomUUID().toString)(implicit logging: Logging) +class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging, + actorSystem: ActorSystem) extends MessageProducer { - override def sentCount() = sentCounter.cur + implicit val ec: ExecutionContext = actorSystem.dispatcher + private val gracefulWaitTime = 100.milliseconds + + override def sentCount(): Long = sentCounter.cur /** Sends msg to topic. This is an asynchronous operation. 
*/ - override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = { - implicit val transid = msg.transid + override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = { + implicit val transid: TransactionId = msg.transid val record = new ProducerRecord[String, String](topic, "messages", msg.serialize) val produced = Promise[RecordMetadata]() @@ -66,17 +68,25 @@ class KafkaProducerConnector(kafkahosts: String, case Failure(t) => logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}") } recoverWith { - case t: NotLeaderForPartitionException => - if (retry > 0) { - logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry") - Thread.sleep(100) - send(topic, msg, retry - 1) - } else produced.future + // Do not retry on these exceptions as they may cause duplicate messages on Kafka. + case _: NotEnoughReplicasAfterAppendException | _: TimeoutException => + recreateProducer() + produced.future + case r: RetriableException if retry > 0 => + logging.info(this, s"$r: Retrying $retry more times") + after(gracefulWaitTime, actorSystem.scheduler)(send(topic, msg, retry - 1)) + // Ignore this exception as restarting the producer doesn't make sense + case e: RecordTooLargeException => + Future.failed(e) + // All unknown errors just result in a recreation of the producer. The failure is propagated. + case _: Throwable => + recreateProducer() + produced.future } } /** Closes producer. 
*/ - override def close() = { + override def close(): Unit = { logging.info(this, "closing producer") producer.close() } @@ -104,5 +114,14 @@ class KafkaProducerConnector(kafkahosts: String, new KafkaProducer(props, keySerializer, valueSerializer) } - private val producer = getProducer(getProps) + private def recreateProducer(): Unit = { + val oldProducer = producer + Future { + oldProducer.close() + logging.info(this, s"old consumer closed") + } + producer = getProducer(getProps) + } + + @volatile private var producer = getProducer(getProps) } diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala index 2dd8ba3..14af69e 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala @@ -42,13 +42,13 @@ trait MessageConsumer { * @param duration for the long poll * @return iterable collection (topic, partition, offset, bytes) */ - def peek(duration: Duration): Iterable[(String, Int, Long, Array[Byte])] + def peek(duration: FiniteDuration, retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] /** * Commits offsets from last peek operation to ensure they are removed * from the connector. */ - def commit(): Unit + def commit(retry: Int = 3): Unit /** Closes consumer. 
*/ def close(): Unit diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala index 1737624..8ec1f5a 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala @@ -17,7 +17,8 @@ package whisk.core.connector -import scala.concurrent.ExecutionContext +import akka.actor.ActorSystem + import scala.concurrent.duration.DurationInt import scala.concurrent.duration.FiniteDuration import whisk.common.Logging @@ -28,11 +29,12 @@ import whisk.spi.Spi * An Spi for providing Messaging implementations. */ trait MessagingProvider extends Spi { - def getConsumer(config: WhiskConfig, - groupId: String, - topic: String, - maxPeek: Int = Int.MaxValue, - maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer - def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer + def getConsumer( + config: WhiskConfig, + groupId: String, + topic: String, + maxPeek: Int = Int.MaxValue, + maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer + def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean } diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala index b615708..c632c83 100644 --- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala +++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala @@ -60,8 +60,9 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: private val instanceId = 
s"$component${instance.toInt}" private val msgProvider = SpiLoader.get[MessagingProvider] - private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128) - private val cacheInvalidationProducer = msgProvider.getProducer(config, ec) + private val cacheInvalidationConsumer = + msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128) + private val cacheInvalidationProducer = msgProvider.getProducer(config) def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = { cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala index a5327b0..dfa57bb 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala @@ -169,7 +169,7 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId) /** Gets a producer which can publish messages to the kafka bus. 
*/ private val messagingProvider = SpiLoader.get[MessagingProvider] - private val messageProducer = messagingProvider.getProducer(config, executionContext) + private val messageProducer = messagingProvider.getProducer(config) private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 2ec822a..607670d 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -173,7 +173,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins } private val messagingProvider = SpiLoader.get[MessagingProvider] - private val messageProducer = messagingProvider.getProducer(config, executionContext) + private val messageProducer = messagingProvider.getProducer(config) /** 3. 
Send the activation to the invoker */ private def sendActivationToInvoker(producer: MessageProducer, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 4ba2214..247b303 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -182,7 +182,7 @@ object Invoker { if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) { abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId") } - val producer = msgProvider.getProducer(config, ec) + val producer = msgProvider.getProducer(config) val invoker = try { new InvokerReactive(config, invokerInstance, producer) } catch { diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala index 6c2d00c..fa7d3a8 100644 --- a/tests/src/test/scala/ha/ShootComponentsTests.scala +++ b/tests/src/test/scala/ha/ShootComponentsTests.scala @@ -47,7 +47,8 @@ class ShootComponentsTests with WskTestHelpers with ScalaFutures with WskActorSystem - with StreamLogging { + with StreamLogging + with ShootComponentUtils { implicit val wskprops = WskProps() val wsk = new WskRest @@ -63,10 +64,8 @@ class ShootComponentsTests val allowedRequestsPerMinute = (amountOfControllers - 1.0) * limitPerController val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute - val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty( - WhiskConfig.dockerPort) - - val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" + WhiskProperties.getProperty(WhiskConfig.dockerPort) + val controller0DockerHost = WhiskProperties.getBaseControllerHost() + val couchDB0DockerHost = WhiskProperties.getBaseDBHost() val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol) val dbHostsList = WhiskProperties.getDBHosts @@ 
-76,35 +75,6 @@ class ShootComponentsTests val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix) val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths) - private def getDockerCommand(host: String, component: String, cmd: String) = { - def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption - - val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker") - - Seq(docker, "--host", host, cmd, component) - } - - def restartComponent(host: String, component: String) = { - val cmd: Seq[String] = getDockerCommand(host, component, "restart") - println(s"Running command: ${cmd.mkString(" ")}") - - TestUtils.runCmd(0, new File("."), cmd: _*) - } - - def stopComponent(host: String, component: String) = { - val cmd: Seq[String] = getDockerCommand(host, component, "stop") - println(s"Running command: ${cmd.mkString(" ")}") - - TestUtils.runCmd(0, new File("."), cmd: _*) - } - - def startComponent(host: String, component: String) = { - val cmd: Seq[String] = getDockerCommand(host, component, "start") - println(s"Running command: ${cmd.mkString(" ")}") - - TestUtils.runCmd(0, new File("."), cmd: _*) - } - def ping(host: String, port: Int, path: String = "/") = { val response = Try { Http().singleRequest(HttpRequest(uri = s"http://$host:$port$path")).futureValue @@ -317,3 +287,35 @@ class ShootComponentsTests } } } + +trait ShootComponentUtils { + private def getDockerCommand(host: String, component: String, cmd: String) = { + def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption + + val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker") + val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort) + + Seq(docker, "--host", host + ":" + dockerPort, cmd, component) + } + + def restartComponent(host: String, component: String) = { + val cmd: Seq[String] = getDockerCommand(host, component, "restart") 
+ println(s"Running command: ${cmd.mkString(" ")}") + + TestUtils.runCmd(0, new File("."), cmd: _*) + } + + def stopComponent(host: String, component: String) = { + val cmd: Seq[String] = getDockerCommand(host, component, "stop") + println(s"Running command: ${cmd.mkString(" ")}") + + TestUtils.runCmd(0, new File("."), cmd: _*) + } + + def startComponent(host: String, component: String) = { + val cmd: Seq[String] = getDockerCommand(host, component, "start") + println(s"Running command: ${cmd.mkString(" ")}") + + TestUtils.runCmd(0, new File("."), cmd: _*) + } +} diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala index d987cb7..3f91b59 100644 --- a/tests/src/test/scala/services/KafkaConnectorTests.scala +++ b/tests/src/test/scala/services/KafkaConnectorTests.scala @@ -21,23 +21,30 @@ import java.io.File import java.util.Calendar import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem} +import ha.ShootComponentUtils import org.apache.kafka.clients.consumer.CommitFailedException import org.junit.runner.RunWith -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import whisk.common.TransactionId import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector} import whisk.core.WhiskConfig import whisk.core.connector.Message import whisk.utils.{retry, ExecutionContextFactory} -import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.{Await, ExecutionContext} import scala.language.postfixOps import scala.util.Try @RunWith(classOf[JUnitRunner]) -class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging { +class KafkaConnectorTests + extends FlatSpec + with Matchers + with WskActorSystem + with 
BeforeAndAfterAll + with StreamLogging + with ShootComponentUtils { implicit val transid: TransactionId = TransactionId.testing implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() @@ -46,7 +53,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit val groupid = "kafkatest" val topic = "KafkaConnectorTestTopic" - val maxPollInterval = 10 seconds + val maxPollInterval = 10.seconds // Need to overwrite replication factor for tests that shut down and start // Kafka instances intentionally. These tests will fail if there is more than @@ -57,10 +64,10 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit println(s"Create test topic '$topic' with replicationFactor=$replicationFactor") assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed") - println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}") - assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed") + println(s"Create test topic '$topic' with replicationFactor=$replicationFactor") + assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed") - val producer = new KafkaProducerConnector(config.kafkaHosts, ec) + val producer = new KafkaProducerConnector(config.kafkaHosts) val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic) override def afterAll(): Unit = { @@ -143,18 +150,31 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit val startLog = s", started" val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length - commandComponent(kafkaHost, "stop", s"kafka$i") - sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1 + // 1. stop one of kafka node + stopComponent(kafkaHost, s"kafka$i") + + // 2. 
kafka cluster should be ok at least after three retries + retry({ + val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds) + received.size should be >= 1 + }, 3, Some(100.milliseconds)) consumer.commit() - commandComponent(kafkaHost, "start", s"kafka$i") + // 3. recover stopped node + startComponent(kafkaHost, s"kafka$i") + + // 4. wait until kafka is up retry({ startLog.r .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout) .length shouldBe prevCount + 1 - }, 20, Some(1.second)) // wait until kafka is up + }, 20, Some(1.second)) - sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1 + // 5. kafka cluster should be ok at least after three retries + retry({ + val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds) + received.size should be >= 1 + }, 3, Some(100.milliseconds)) consumer.commit() } } diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala index b240edd..c6743aa 100644 --- a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala +++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala @@ -65,7 +65,7 @@ class MessageFeedTests val peekCount = new AtomicInteger() val consumer = new TestConnector("feedtest", 4, true) { - override def peek(duration: Duration) = { + override def peek(duration: FiniteDuration, retry: Int = 0) = { peekCount.incrementAndGet() super.peek(duration) } diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala index e3b9597..8b428e5 100644 --- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala +++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala @@ -21,7 +21,7 @@ import java.util.ArrayList import java.util.concurrent.LinkedBlockingQueue import scala.concurrent.Future -import scala.concurrent.duration.Duration 
+import scala.concurrent.duration._ import scala.collection.JavaConversions._ import org.apache.kafka.clients.producer.RecordMetadata @@ -37,7 +37,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: extends MessageConsumer with StreamLogging { - override def peek(duration: Duration) = { + override def peek(duration: FiniteDuration, retry: Int = 0) = { val msgs = new ArrayList[Message] queue.synchronized { queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek) @@ -48,7 +48,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: } } - override def commit() = { + override def commit(retry: Int = 0) = { if (throwCommitException) { throw new Exception("commit failed") } else { -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.