Hi, I am trying to shut down a Kafka consumer (version 0.8.2) gracefully by calling consumer.shutdown (ConsumerConnector.shutdown) and then waiting for the executor threads to finish. However, I have noticed that on the next start some of the messages are replayed. I have auto commit enabled.
I looked at the code of kafka.consumer.ZookeeperConsumerConnector <https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala> and realised that there is no commitOffset call during shutdown. Ideally, after shutting down the fetchers, commitOffset should be called to commit all offsets fetched so far. I also noticed that zkClient is closed during shutdown, so I cannot call commitOffset from outside after shutdown either.

Is this expected behaviour, or is there anything I am missing? Is there any way, using the high-level consumer, to make sure all offsets are committed before shutting down?

Here is the code snippet from the ZookeeperConsumerConnector class:

    def shutdown() {
      val canShutdown = isShuttingDown.compareAndSet(false, true);
      if (canShutdown) {
        logger.info("ZKConsumerConnector shutting down")
        try {
          scheduler.shutdown
          fetcher match {
            case Some(f) => f.shutdown
            case None =>
          }
          sendShudownToAllQueues
          if (zkClient != null) {
            zkClient.close()
            zkClient = null
          }
        }
        catch {
          case e =>
            logger.fatal(e)
            logger.fatal(Utils.stackTrace(e))
        }
        logger.info("ZKConsumerConnector shut down completed")
      }
    }
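To make the ordering I have in mind concrete, here is a minimal sketch of what I would expect a graceful shutdown to look like from the caller's side: stop the workers, commit, and only then shut the connector down, so that the commit happens while the ZooKeeper client is still open. The Connector trait and the GracefulShutdown object are hypothetical names used only for illustration; the trait stands in for the commitOffsets and shutdown methods of ConsumerConnector. The sketch also assumes the worker threads can be stopped without calling shutdown on the connector first, which may not hold if they are blocked on a stream iterator.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical stand-in for the two ConsumerConnector methods this sketch relies on.
trait Connector {
  def commitOffsets(): Unit
  def shutdown(): Unit
}

object GracefulShutdown {
  // Stops the workers, commits offsets if they finished in time, then closes
  // the connector. Returns true when the workers drained before the timeout.
  def apply(connector: Connector, workers: ExecutorService, timeoutMs: Long): Boolean = {
    workers.shutdown() // accept no new tasks; tasks already running may finish
    val drained = workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
    try {
      // Commit only once processing has stopped, and before the connector
      // (and with it the ZooKeeper client) is closed.
      if (drained) connector.commitOffsets()
    } finally {
      connector.shutdown() // always release the connector, even if the commit throws
    }
    drained
  }
}
```

Skipping the commit when the workers do not finish in time is a deliberate choice in this sketch: it prefers replaying a few messages over committing offsets for work that may still be in progress.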
