I am using akka-stream-kafka 0.18. I have a flow that reads from one Kafka topic, does some processing, and then writes to a different Kafka topic. The flow has been shutting down intermittently when Kafka brokers fail.
Sometimes the brokers will fail repeatedly over a long period and the flow does not shut down and other times it shuts down as soon as the broker fails the first time. In the logs below, once the message 'Closing the Kafka producer' appears, we no longer receive any messages from the Kafka topic.

Here is the code:

    private val consumerSettings =
      ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(requestGroup)
        .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

    private val producerSettings =
      ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    private def decider(throwable: Throwable): Supervision.Directive = {
      logger.error("Received error in request consumer - restarting", throwable)
      Supervision.Restart
    }

    private implicit val materializer: Materializer = ActorMaterializer()

    private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors

    private val source =
      Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
        .mapAsync(parallelism)(messageProcessor.processMessage)
        .withAttributes(supervisionStrategy(decider))
        .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
        .map(_.message.passThrough)
        .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
        .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
        .mapAsync(parallelism)(_.commitScaladsl())

    source.runWith(Sink.ignore)

Am I missing something in the code that is necessary to keep the flow running when errors occur?
Here's the config:

    akka.kafka.consumer {
      wakeup-timeout = 10s
      max-wakeups = 8640
      kafka-clients {
        reconnect.backoff.ms = 1000
        reconnect.backoff.max.ms = 60000
        enable.auto.commit = false
      }
    }

Here are the logs from just before things stop working:

    2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group sherlock
    2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group sherlock.
    2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
    2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
    2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
    2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 60000 ms.
    2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
    2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 22991676910 ms
    2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 10016185009 ms
    2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Successfully joined group sherlock with generation 257
    2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock