Hi, I still face the same issue sometimes. My Kafka consumer throws this exception after failing to claim any partition:
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Regards,
Sahitya Agrawal

On Tue, May 17, 2016 at 12:22 AM, Jason Gustafson <ja...@confluent.io> wrote:

> To pile on a little bit, the API is designed to ensure consumer liveness so that partitions cannot be held indefinitely by a defunct process. Since heartbeating and message processing are done in the same thread, the consumer needs to demonstrate "progress" by calling poll() often enough not to get kicked out. What "often enough" means is dictated by the session timeout, which is 30s by default. If you fail to call poll() before the session timer expires, then the broker will assume that the member is dead and begin a rebalance. If you need more time to handle messages, increase session.timeout.ms in your configuration. The only downside to a higher timeout is that it will take longer to detect other kinds of failures (such as process crashes or network partitions).
>
> This was the initial design, but it hasn't worked out quite as well as we would have liked, at least not in all situations. The first problem in 0.9 is that you don't have a direct way to control the amount of data that can be returned in a call to poll(), which makes it difficult to estimate the session timeout. You can set max.partition.fetch.bytes and, based on an estimate of the total number of partitions that you need to read, try to come up with a guess, but this is hard in practice. So in 0.10 we've introduced a new setting, max.poll.records, which lets you set an explicit bound on the number of messages that need to be handled on each poll iteration. The hope is that you can set this to a reasonably low value so that you're never risking a session timeout.
>
> It's also worth understanding a little bit about how the rebalance mechanism works. After a consumer group is created, each consumer begins sending heartbeat messages to a special broker known as the coordinator. When a new consumer joins the group (or when the session timeout of an existing member expires), the other members find out about it through the error code in the heartbeat response. The group coordination protocol basically implements a synchronization barrier: when a rebalance begins, all members of the group have to join the barrier for it to complete. So if you want to reduce the impact of rebalancing, you need to ensure that all members can join the barrier as soon as possible after it begins. For this we expose heartbeat.interval.ms, but note that we can't actually send heartbeats any faster than the poll() interval itself, because everything is done from the same thread. So if you always want fast rebalances, the target for the processing bound should be the heartbeat interval instead of the session timeout.
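A minimal sketch (not from the thread) of the kind of poll loop described above, using the new consumer API with the liveness settings mentioned so far set explicitly; max.poll.records only exists from 0.10 onward, and the broker address, group id, topic name, and process() method are placeholders rather than recommendations:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            props.put("group.id", "my-group");                   // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Liveness knobs discussed above: the session timeout bounds how long the
            // consumer may go without calling poll(); the heartbeat interval controls
            // how quickly it can react when a rebalance begins.
            props.put("session.timeout.ms", "30000");
            props.put("heartbeat.interval.ms", "3000");
            // 0.10+ only: cap the number of records handed back per poll() so that
            // per-batch processing time stays well under the session timeout.
            props.put("max.poll.records", "100");
            props.put("enable.auto.commit", "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
                while (true) {
                    // poll() both fetches records and keeps the group membership alive.
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // the whole batch must finish before the session timer expires
                    }
                    consumer.commitSync(); // on 0.10+ the commit also counts as an effective heartbeat
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }

With the batch size bounded this way, the time between successive poll() calls stays predictable, which is exactly the "progress" guarantee described above.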
> We've made some other small improvements to make unexpected rebalancing less of a problem in practice. For example, we modified the protocol behavior to allow offset commits to serve as effective heartbeats, which wasn't the case in 0.9. However, we're still encountering situations where there's really no clear way to estimate the session timeout other than somewhat exhaustive testing. Even max.poll.records doesn't help when the impact of a single message can vary disproportionately (as is sometimes the case in Kafka Streams, which uses the consumer internally). You could set a ridiculously large session timeout in these cases, but that also guarantees a long time to recover from hard failures. I think this basically means that these use cases need a separate notion of liveness, one they have a bit more control over. For example, we could expose a method in the consumer which applications can call from any thread to signal that they're still alive. I'm working on a KIP right now to address this problem, so look for it in the next few weeks.
>
> Thanks,
> Jason
>
> On Sat, May 14, 2016 at 8:05 AM, sahitya agrawal <sahitya2...@gmail.com> wrote:
>
> > Thanks Cees and Abhinav, I will give this trick a try and update if it helps in my case.
> >
> > Regards,
> > Sahitya Agrawal
> >
> > On Fri, May 13, 2016 at 9:36 PM, Cees de Groot <c...@pagerduty.com> wrote:
> >
> > > What Abhinav said. To give some context: the common cause of frequent rebalances is that your consumer takes too long to process batches. As long as you don't call into the consumer library, heartbeats aren't sent, so if you take too long working through a batch, the broker thinks your consumer is gone and starts rebalancing. The message batch under processing never gets marked as done, so after rebalancing, things start over from the same spot.
> > >
> > > So the solution is to either make the batches smaller or the heartbeat interval longer. There are fancier solutions for when this doesn't work, but it should do the trick for most normal cases.
> > >
> > > On Fri, May 13, 2016 at 10:20 AM, Abhinav Solan <abhinav.so...@gmail.com> wrote:
> > >
> > > > Hi Sahitya,
> > > >
> > > > Try reducing max.partition.fetch.bytes in your consumer. Then also increase heartbeat.interval.ms; this might help delay the consumer rebalance if your inbound processing is taking more time than this.
> > > >
> > > > - Abhinav
> > > >
> > > > On Fri, May 13, 2016 at 5:42 AM sahitya agrawal <sahitya2...@gmail.com> wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I am using the new Kafka consumer API (0.9.0.0). I created 100 partitions of a topic and started only one consumer to consume. Many times, in the consumer logs, I see a lot of rebalancing activity, and no messages are consumed because of that.
> > > >>
> > > >> Is this a known issue? Please let me know if somebody can help with regard to this.
> > > >>
> > > >> My Consumer config:
> > > >> props.put("zookeeper.session.timeout.ms", "10000");
> > > >> props.put("rebalance.backoff.ms", "10000");
> > > >> props.put("zookeeper.sync.time.ms", "200");
> > > >> props.put("rebalance.max.retries", "10");
> > > >> props.put("enable.auto.commit", "false");
> > > >> props.put("consumer.timeout.ms", "20000");
> > > >> props.put("auto.offset.reset", "smallest");
> > > >>
> > > >> Thanks,
> > > >> Sahitya
> > > >>
> > >
> > > --
> > > Cees de Groot
> > > Principal Software Engineer
> > > PagerDuty, Inc.
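The properties in the original question (zookeeper.session.timeout.ms, rebalance.backoff.ms, and so on) are settings for the older ZooKeeper-based consumer. A rough sketch, not from the thread, of comparable configuration on the new consumer API that the advice above targets might look like the following; the broker address and group id are placeholders and the numeric values are illustrative only:

    import java.util.Properties;

    public class NewConsumerConfigSketch {
        static Properties newConsumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // the new consumer talks to brokers, not ZooKeeper
            props.put("group.id", "my-group");                  // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("session.timeout.ms", "30000");            // the liveness bound discussed by Jason
            props.put("heartbeat.interval.ms", "3000");          // the rebalance-reaction knob mentioned by Abhinav and Jason
            props.put("max.partition.fetch.bytes", "262144");    // smaller fetches per Abhinav, so each poll() batch is cheaper
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");          // the new consumer's spelling of "smallest"
            // rebalance.backoff.ms, rebalance.max.retries, zookeeper.sync.time.ms and
            // consumer.timeout.ms are old-consumer settings with no direct counterpart here.
            return props;
        }
    }

With a single consumer reading 100 partitions, a smaller max.partition.fetch.bytes keeps each poll() batch small enough to finish inside the session timeout, which is the rebalance loop Cees describes above.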