Hi,

I still face the same issue sometimes. My Kafka consumer throws this
exception after failing to claim any partition.

java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


Regards,
Sahitya Agrawal

On Tue, May 17, 2016 at 12:22 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> To pile on a little bit, the API is designed to ensure consumer liveness so
> that partitions cannot be held indefinitely by a defunct process. Since
> heartbeating and message processing are done in the same thread, the
> consumer needs to demonstrate "progress" by calling poll() often enough not
> to get kicked out. What "often enough" means is dictated by the session
> timeout, which is 30s by default. If you fail to call poll() before the
> session timer expires, then the broker will assume that the member is dead
> and begin a rebalance. If you need more time to handle messages, increase
> session.timeout.ms in your configuration. The only downside to a higher
> timeout in general is that it will take longer to detect other kinds of
> failures (such as process crashes or network partitions).
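As a rough illustration of the advice above, the sketch below builds a configuration for the new (0.9) consumer with a longer session timeout. The class name, broker address, group id, and the 60-second value are placeholders chosen for the example, not recommendations. Note that the broker enforces its own upper bound on session timeouts (group.max.session.timeout.ms), so a larger value may also need a broker-side change.

```java
import java.util.Properties;

// Hypothetical helper for illustration. It uses only java.util.Properties,
// so it compiles without Kafka on the classpath.
class SessionTimeoutSketch {

    // Builds properties for the new consumer. sessionTimeoutMs is how long the
    // coordinator waits without a heartbeat before declaring the member dead.
    static Properties consumerProps(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // assumed group name
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(60000); // 60 s is an example value only
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
    }
}
```

The resulting Properties object is what would be handed to the KafkaConsumer constructor.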
>
> This was the initial design, but it hasn't worked out quite as well as we
> would have liked, at least not in all situations. The first problem in 0.9
> is that you don't have a direct way to control the amount of data that can
> be returned in a call to poll(), which makes it difficult to estimate the
> session timeout. You can set max.partition.fetch.bytes and, based on an
> estimate for the total number of partitions that you need to read, try to
> come up with a guess, but this is kind of hard in practice. So in 0.10
> we've introduced a new setting max.poll.records, which lets you set an
> explicit bound on the number of messages that need to be handled on each
> poll iteration. The idea is hopefully that you can set this to a reasonably
> low value so that you're never risking a session timeout.
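One way to pick that value is simple arithmetic: if handling one record takes at most t milliseconds, a poll that returns n records keeps the thread busy for roughly n × t, and that has to stay under the session timeout. The sketch below does this calculation. The method name, the percentage-based safety margin, and the example numbers are assumptions made for illustration, and max.poll.records itself is only available from 0.10 on.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client.
class MaxPollRecordsSketch {

    // Largest record count whose worst-case processing time fits within
    // budgetPercent of the session timeout. Never returns less than 1.
    static int maxPollRecords(long sessionTimeoutMs, long worstCasePerRecordMs, int budgetPercent) {
        if (sessionTimeoutMs <= 0 || worstCasePerRecordMs <= 0
                || budgetPercent <= 0 || budgetPercent > 100) {
            throw new IllegalArgumentException(
                    "arguments must be positive and budgetPercent at most 100");
        }
        long budgetMs = sessionTimeoutMs * budgetPercent / 100;
        long records = budgetMs / worstCasePerRecordMs;
        return (int) Math.max(1, Math.min(records, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Example: 30 s session timeout, 200 ms worst case per record,
        // and only half of the timeout spent on processing.
        int bound = maxPollRecords(30000, 200, 50);
        Properties props = new Properties();
        props.put("max.poll.records", Integer.toString(bound));
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
    }
}
```

If the result comes out as 1 and a single record can still exceed the budget, the record bound no longer helps and the session timeout itself has to go up.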
>
> It's also worthwhile understanding a little bit about how the rebalance
> mechanism works. After a consumer group is created, each consumer begins
> sending heartbeat messages to a special broker known as the coordinator.
> When a new consumer joins the group (or when the session timeout of an
> existing member expires), the other members find out about it through the
> error code in the heartbeat response. The group coordination protocol
> basically implements a synchronization barrier. When a rebalance begins,
> all members of the group have to join the barrier for it to complete. So if
> you want to reduce the impact from rebalancing, then you need to ensure
> that all members can join the barrier as soon as possible after it begins.
> For this, we expose heartbeat.interval.ms, but note that we can't actually
> send heartbeats any faster than the poll() interval itself because
> everything is done from the same thread. So if you want to always have fast
> rebalances, then the target for setting the processing bound should be the
> heartbeat interval instead of the session timeout.
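Put differently, the time between two poll() calls falls into one of three bands relative to these two settings. The sketch below is a simplified model of that reasoning, not of the actual protocol: the enum and method names are made up for illustration, and the 3 s and 30 s figures in main are example values to replace with your own configuration.

```java
// Hypothetical model for illustration; not part of the Kafka client.
class PollIntervalSketch {

    enum Outcome {
        JOINS_PROMPTLY,   // poll() runs at least once per heartbeat interval
        DELAYS_REBALANCE, // member stays in the group, but others wait for it at the barrier
        SESSION_EXPIRED   // coordinator treats the member as dead and rebalances without it
    }

    // Classifies the time between two poll() calls against the two settings.
    static Outcome classify(long pollIntervalMs, long heartbeatIntervalMs, long sessionTimeoutMs) {
        if (heartbeatIntervalMs <= 0 || heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "expected 0 < heartbeat interval < session timeout");
        }
        if (pollIntervalMs <= heartbeatIntervalMs) {
            return Outcome.JOINS_PROMPTLY;
        }
        if (pollIntervalMs < sessionTimeoutMs) {
            return Outcome.DELAYS_REBALANCE;
        }
        return Outcome.SESSION_EXPIRED;
    }

    public static void main(String[] args) {
        long heartbeatMs = 3000;  // example heartbeat.interval.ms
        long sessionMs = 30000;   // example session.timeout.ms
        for (long pollMs : new long[] {2000, 12000, 45000}) {
            System.out.println(pollMs + " ms -> " + classify(pollMs, heartbeatMs, sessionMs));
        }
    }
}
```

The middle band is the one the paragraph above warns about: nothing fails, but every rebalance takes as long as the slowest member's poll loop.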
>
> We've made some other small improvements to make unexpected rebalancing
> less of a problem in practice. For example, we modified the protocol
> behavior to allow offset commits to serve as effective heartbeats, which
> wasn't the case in 0.9. However, we're still encountering situations where
> there's really no clear way to estimate the session timeout other than
> somewhat exhaustive testing. Even max.poll.records doesn't help when the
> impact of a single message can vary disproportionately (as is sometimes the
> case in Kafka Streams which uses the consumer internally). You could set a
> ridiculously large session timeout in these cases, but that also guarantees
> a long time to recover from hard failures. I think this basically means
> that these use cases need a separate notion of liveness, which they have a
> bit more control over. For example, we can expose a method in the consumer
> which applications can call from any thread to signal that they're still
> around. I'm working on a KIP right now to address this problem, so look for
> it in the next few weeks.
>
> Thanks,
> Jason
>
> On Sat, May 14, 2016 at 8:05 AM, sahitya agrawal <sahitya2...@gmail.com>
> wrote:
>
> > Thanks Cees and Abhinav, I will give this trick a try and report back if
> > it helped in my case.
> >
> > Regards,
> > Sahitya Agrawal
> >
> > On Fri, May 13, 2016 at 9:36 PM, Cees de Groot <c...@pagerduty.com>
> > wrote:
> >
> > > What Abhinav said. To give some context: the common cause of frequent
> > > rebalances is that your consumer takes too long to process batches. As
> > > long as you don't call into the consumer library, heartbeats aren't
> > > sent, so if you take too long working through a batch, the broker
> > > thinks your consumer is gone and starts rebalancing. The message
> > > batch under processing never gets marked as done, so after
> > > rebalancing, things start over from the same spot.
> > >
> > > So the solution is to either make the batches smaller or the heartbeat
> > > interval longer. There are fancier solutions for when this doesn't
> > > work, but it should do the trick for most normal cases.
> > >
> > > On Fri, May 13, 2016 at 10:20 AM, Abhinav Solan <abhinav.so...@gmail.com>
> > > wrote:
> > > > Hi Sahitya,
> > > >
> > > > Try reducing max.partition.fetch.bytes in your consumer.
> > > > Then also increase heartbeat.interval.ms; this might help to delay the
> > > > consumer rebalance if your inbound process is taking more time than this.
> > > >
> > > > - Abhinav
> > > >
> > > > On Fri, May 13, 2016 at 5:42 AM sahitya agrawal <sahitya2...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > > >> I am using the new Kafka consumer API (0.9.0.0). I created a topic with
> > > > >> 100 partitions and started only one consumer to consume it. Many times I
> > > > >> see a lot of rebalancing activity in the consumer logs, and no object is
> > > > >> consumed because of it.
> > > > >>
> > > > >> Is this a known issue? Please let me know if somebody can help with
> > > > >> this.
> > > >>
> > > >> My Consumer config:
> > > >>         props.put("zookeeper.session.timeout.ms", "10000");
> > > >>         props.put("rebalance.backoff.ms","10000");
> > > >>         props.put("zookeeper.sync.time.ms","200");
> > > >>         props.put("rebalance.max.retries","10");
> > > >>         props.put("enable.auto.commit", "false");
> > > >>         props.put("consumer.timeout.ms","20000");
> > > >>         props.put("auto.offset.reset", "smallest");
> > > >>
> > > >> Thanks,
> > > >> Sahitya
> > > >>
> > >
> > >
> > >
> > > --
> > > Cees de Groot
> > > Principal Software Engineer
> > > PagerDuty, Inc.
> > >
> >
>
