I am using akka-stream-kafka 0.18. I have a flow that reads from one kafka 
topic, does some processing and then writes to a different kafka topic. The 
flow has been shutting down intermittently when kafka brokers fail. 

Sometimes the brokers will fail repeatedly over a long period and the flow 
does not shut down and other times it shuts down as soon as the broker 
fails the first time. In the logs below, once the message 'Closing the 
Kafka producer' appears, we no longer receive any messages from the Kafka 
topic.

Here is the code:

  private val consumerSettings = ConsumerSettings(actorSystem, new 
ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(requestGroup)
    .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

  private val producerSettings = ProducerSettings(actorSystem, new 
ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  private def decider(throwable: Throwable): Supervision.Directive = {
    logger.error("Received error in request consumer - restarting", 
throwable)
    Supervision.Restart
  }

  private implicit val materializer: Materializer = ActorMaterializer()

  private val parallelism: Int = parallelismFactor * 
getRuntime.availableProcessors

  private val source = Consumer.committableSource(consumerSettings, 
Subscriptions.topics(requestTopic))
    .mapAsync(parallelism)(messageProcessor.processMessage)
    .withAttributes(supervisionStrategy(decider))
    
.via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
    .map(_.message.passThrough)
    .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
    .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, 
elem) => batch.updated(elem) })
    .mapAsync(parallelism)(_.commitScaladsl())
  source.runWith(Sink.ignore)

Am I missing something in the code that is necessary to keep the flow 
running when errors occur?

Here's the config:

akka.kafka.consumer {
  wakeup-timeout = 10s
  max-wakeups = 8640
  kafka-clients {
    reconnect.backoff.ms = 1000
    reconnect.backoff.max.ms = 60000
    enable.auto.commit = false
  }
}

Here's the logs just before things stop working:

2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the 
coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group 
sherlock
2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered 
coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group 
sherlock.
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking 
previously assigned partitions [dlp_request-9, dlp_request-11, 
dlp_request-10] for group sherlock
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
(Re-)joining group sherlock
2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
(Re-)joining group sherlock
2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 
INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka 
producer with timeoutMillis = 60000 ms.
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 
WARN akka.kafka.KafkaConsumerActor Consumer interrupted with 
WakeupException after timeout. Message: null. Current value of 
akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 
WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than 
`commit-time-warning`: 22991676910 ms
2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 
WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than 
`commit-time-warning`: 10016185009 ms
2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
Successfully joined group sherlock with generation 257
2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting 
newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] 
for group sherlock

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to