No, we don't normally see conflicts. Instead, we see endless attempts to
rebalance.

-Mike

On Thu, Mar 26, 2015 at 5:15 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> Did you see something like this in any of the consumer logs:
>
> "Conflict in ….. data : ……. stored data :……"?
>
> Thanks,
>
> Mayuresh
>
> On Thu, Mar 26, 2015 at 1:50 PM, Mike Axiak <m...@axiak.net> wrote:
>
> > Hi guys,
> >
> > At HubSpot we think the issue is related to slow consumers. During a
> > rebalance, one of the first things the consumer does is signal a shutdown
> > to the fetcher [1] [2], in order to relinquish ownership of the
> > partitions.
> >
> > This waits for all of the fetcher threads to shut down, which can only
> > happen once each thread's "enqueue current chunk" command finishes. If you
> > have a slow consumer or large chunk sizes, this could take a while, which
> > would make it difficult for the rebalance to complete successfully.
> >
> > We're testing out different solutions now. One idea currently under review
> > is to make the enqueue into the blocking queue time out, so that we can
> > check whether we're still running and stop processing the current chunk
> > early.
> >
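A minimal sketch of the timed-enqueue idea described above, written in Python rather than the Scala of the actual fetcher code. The names `enqueue_chunk`, `is_running`, and `poll_timeout_s` are hypothetical and are not taken from Kafka or from the patch under review. The sketch only illustrates the pattern: a bounded put that wakes up periodically so a shutdown request is noticed even when the consumer is slow to drain the queue.

```python
import queue
import threading


def enqueue_chunk(chunk_queue, chunk, is_running, poll_timeout_s=0.2):
    """Try to enqueue `chunk`, waking up periodically to check for shutdown.

    Returns True if the chunk was enqueued, False if shutdown was
    requested before space became available in the queue.
    """
    while is_running.is_set():
        try:
            # Block for at most poll_timeout_s instead of blocking forever.
            chunk_queue.put(chunk, timeout=poll_timeout_s)
            return True
        except queue.Full:
            # Queue still full (slow consumer): loop and re-check the flag.
            continue
    return False


# Demo: a queue of capacity 1 stands in for the per-stream blocking queue.
chunk_queue = queue.Queue(maxsize=1)
is_running = threading.Event()
is_running.set()

first_enqueued = enqueue_chunk(chunk_queue, "chunk-0", is_running)

# The queue is now full and nothing drains it. Simulate a rebalance
# signalling shutdown half a second later.
threading.Timer(0.5, is_running.clear).start()
second_enqueued = enqueue_chunk(chunk_queue, "chunk-1", is_running)
```

With an untimed put, the second call would block until the consumer drained the queue. Here it gives up shortly after the running flag is cleared, which is the behaviour a rebalance would need from the fetcher thread.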
> > Has anyone else noticed this? If so, are there any patches people have
> > written? Once we have a clearer picture of solutions we'll send a few
> > patches in JIRAs.
> >
> > Best,
> > Mike
> >
> > 1: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
> > 2: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712
> >
> >
> > On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede <n...@confluent.io> wrote:
> >
> > > Can you share a reproducible test case?
> > >
> > > On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:
> > >
> > > > Neha,
> > > >
> > > > The same issue recurred with just 2 consumer processes. The exception
> > > > was related to a conflict in writing the ephemeral node. Below is the
> > > > exception. The topic name is
> > > > "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin"
> > > > with 30 partitions. The 2 processes were running on 2 servers with IPs
> > > > 10.0.8.222 and 10.0.8.225.
> > > >
> > > > *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node
> > > > [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}]
> > > > at
> > > > /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
> > > > a while back in a different session, hence I will backoff for this node to
> > > > be deleted by Zookeeper and retry*
> > > >
> > > > Attached are the complete error logs. The exception occurred after the
> > > > rebalance failed even after 40 retries. The rebalance failed because the
> > > > process already owning some of the partitions did not give us ownership
> > > > due to conflicting ephemeral nodes. As you suggested, we ran the wchp
> > > > command on the 3 zookeeper nodes at this time and found that the watcher
> > > > was registered for only one of the processes. I am copying the kafka
> > > > consumer watcher registered on one of the zookeeper servers. (Attached are
> > > > the wchp outputs of all 3 zk servers.)
> > > >
> > > > *$ echo wchp | nc localhost 2181*
> > > >
> > > > */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*
> > > >
> > > > * 0x34a175e1d5d0130*
> > > >
> > > >
> > > > "0x34a175e1d5d0130" was the ephemeral node session ID. I went back to the
> > > > zookeeper shell and checked the consumers registered for this topic and
> > > > consumer group (same as the topic name). Attaching the output in
> > > > zkCommands.txt. This clearly shows that
> > > >
> > > > 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
> > > >
> > > > 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127
> > > >
> > > >
> > > > I think the issue here is that both consumers have written to
> > > > different ephemeral nodes. Watchers are registered for only one of the 2
> > > > ephemeral nodes. The root cause seems to be inconsistent state while
> > > > writing the ephemeral nodes in ZK.
> > > >
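The comparison described above (session ids holding a watch versus the ephemeralOwner of each consumer node) can be sketched in Python. This is only an illustration: the helper `sessions_with_watches` is hypothetical, the parsing assumes the two-line layout of the wchp output quoted in this thread (a znode path followed by session ids), and only the single server's output shown above is used. A real check would presumably merge the wchp output from all 3 zk servers, since each session holds its watches on the server it is connected to.

```python
def sessions_with_watches(wchp_output):
    """Map each watched znode path to the set of session ids watching it.

    Assumes the layout shown in the thread: a line starting with '/'
    names a path, and following lines starting with '0x' are session ids.
    """
    watches = {}
    current_path = None
    for raw_line in wchp_output.splitlines():
        line = raw_line.strip()
        if line.startswith("/"):
            current_path = line
            watches.setdefault(current_path, set())
        elif line.startswith("0x") and current_path is not None:
            watches[current_path].add(line)
    return watches


# Values copied from the message above.
IDS_PATH = (
    "/kafka/consumers/"
    "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids"
)
wchp_output = IDS_PATH + "\n\t0x34a175e1d5d0130\n"

ephemeral_owners = {
    "10.0.8.222": "0x34a175e1d5d0130",
    "10.0.8.225": "0x34a175e1d5d0127",
}

watching = sessions_with_watches(wchp_output).get(IDS_PATH, set())
missing_watch = sorted(
    host for host, session in ephemeral_owners.items() if session not in watching
)
print(missing_watch)
```

On the values quoted above, the only consumer without a watch on the ids path is the one on 10.0.8.225.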
> > > > Let me know if you need more details.
> > > >
> > > > -Thanks,
> > > >
> > > > Mohit
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > >
> > > >> A rebalance should trigger on all consumers when you add a new consumer
> > > >> to the group. If you don't see the zookeeper watch fire, the consumer may
> > > >> have somehow lost the watch. We have seen this behavior on older zk
> > > >> versions; I wonder if that bug got reintroduced. To verify if this is the
> > > >> case, you can run the wchp zookeeper command on the zk leader and check if
> > > >> each consumer has a watch registered.
> > > >>
> > > >> Do you have a way to try this on zk 3.3.4? I would recommend you try the
> > > >> wchp suggestion as well.
> > > >>
> > > >> On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > Can someone help here? We are getting constant rebalance failures each
> > > >> > time a consumer is added beyond a certain number. We did quite a lot of
> > > >> > debugging on this and are still not able to figure out the pattern.
> > > >> >
> > > >> > -Thanks,
> > > >> > Mohit
> > > >> >
> > > >> > On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:
> > > >> >
> > > >> > > Neha,
> > > >> > >
> > > >> > > Looks like an issue with the consumer rebalance not being able to
> > > >> > > complete successfully. We were able to reproduce the issue on a topic
> > > >> > > with 30 partitions and 3 consumer processes (p1, p2 and p3), with
> > > >> > > properties rebalance.max.retries = 40 and rebalance.backoff.ms = 10000
> > > >> > > (10s).
> > > >> > >
> > > >> > > Before the process p3 was started, partition ownership was as expected:
> > > >> > >
> > > >> > > partitions 0-14 -> owner p1
> > > >> > > partitions 15-29 -> owner p2
> > > >> > >
> > > >> > > As the process p3 started, a rebalance was triggered. Process p3 was
> > > >> > > successfully able to acquire partition ownership for partitions 20-29,
> > > >> > > as expected per the rebalance algorithm. However, process p2, while
> > > >> > > trying to acquire ownership of partitions 10-19, saw a rebalance
> > > >> > > failure after 40 retries.
> > > >> > >
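The ownership numbers above are consistent with a range-style assignment, which can be sketched in Python as follows. This is a simplified illustration and not the connector's actual code: it assumes each process counts as a single consumer (the real high-level consumer assigns per consumer thread), and the helper name `range_assignment` is hypothetical.

```python
def range_assignment(num_partitions, consumer_ids):
    """Split partitions 0..num_partitions-1 into contiguous ranges.

    Consumers are sorted by id; the first `extra` consumers each get one
    additional partition when the division is not even.
    """
    consumers = sorted(consumer_ids)
    per_consumer, extra = divmod(num_partitions, len(consumers))
    assignment = {}
    for position, consumer in enumerate(consumers):
        start = per_consumer * position + min(position, extra)
        count = per_consumer + (1 if position < extra else 0)
        assignment[consumer] = list(range(start, start + count))
    return assignment


before = range_assignment(30, ["p1", "p2"])        # p1: 0-14, p2: 15-29
after = range_assignment(30, ["p1", "p2", "p3"])   # p1: 0-9, p2: 10-19, p3: 20-29

# Partitions each existing owner has to release before others can claim them.
must_release = {
    consumer: sorted(set(before.get(consumer, [])) - set(after[consumer]))
    for consumer in after
}
print(must_release["p1"])  # partitions p2 is waiting for
```

Under these assumptions p1 has to release partitions 10-14 before p2 can own 10-19, which matches the partitions p2 was stuck waiting on.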
> > > >> > > Attaching the logs from process p2 and process p1. They show that while
> > > >> > > p2 was attempting to rebalance, it was trying to acquire ownership of
> > > >> > > partitions 10-14, which were owned by process p1. However, at the same
> > > >> > > time process p1 did not get any event for giving up the partition
> > > >> > > ownership for partitions 1-14.
> > > >> > > We were expecting a rebalance to have been triggered in p1, but it
> > > >> > > wasn't, and hence p1 did not give up ownership. Is our assumption
> > > >> > > correct?
> > > >> > > And if a rebalance does get triggered in p1, how can we find that out
> > > >> > > other than from the logs, as the logs on p1 did not have anything?
> > > >> > >
> > > >> > > *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
> > > >> > > [topic_consumerIdString], waiting for the partition ownership to be
> > > >> > > deleted: 11*
> > > >> > >
> > > >> > > During and after the rebalance failure on process p2, partition
> > > >> > > ownership was as below:
> > > >> > > 0-14 -> owner p1
> > > >> > > 15-19 -> none
> > > >> > > 20-29 -> owner p3
> > > >> > >
> > > >> > > This left the consumers in an inconsistent state, as 5 partitions were
> > > >> > > never consumed from and partition ownership was not balanced.
> > > >> > >
> > > >> > > However, there was no conflict in creating the ephemeral node, which
> > > >> > > was the case last time. Just to note, the ephemeral node conflict which
> > > >> > > we were seeing earlier also appeared after the rebalance failed. My
> > > >> > > hunch is that fixing the rebalance failure will fix that issue as well.
> > > >> > >
> > > >> > > -Thanks,
> > > >> > > Mohit
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > >> > >
> > > >> > >> Mohit,
> > > >> > >>
> > > >> > >> I wonder if it is related to
> > > >> > >> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper
> > > >> > >> expires a session, it doesn't delete the ephemeral nodes immediately.
> > > >> > >> So if you end up trying to recreate ephemeral nodes quickly, the
> > > >> > >> existing node could either belong to the valid latest session or be
> > > >> > >> left over from the previously expired session. If you hit this
> > > >> > >> problem, then waiting would resolve it. But if not, then this may be a
> > > >> > >> legitimate bug in ZK 3.4.6.
> > > >> > >>
> > > >> > >> Can you try shutting down all your consumers, waiting until the session
> > > >> > >> timeout, and restarting them?
> > > >> > >>
> > > >> > >> Thanks,
> > > >> > >> Neha
> > > >> > >>
> > > >> > >> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:
> > > >> > >>
> > > >> > >> > Dear Experts,
> > > >> > >> >
> > > >> > >> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
> > > >> > >> > topic with 30 partitions and 2 replicas. We are using the high-level
> > > >> > >> > consumer API.
> > > >> > >> > Each consumer process, which is a Storm topology, has 5 streams which
> > > >> > >> > connect to 1 or more partitions. We are not using Storm's inbuilt
> > > >> > >> > kafka spout. Everything runs fine until the 5th consumer process (25
> > > >> > >> > streams) is added for this topic.
> > > >> > >> >
> > > >> > >> > As soon as the sixth consumer process is added, the newly added
> > > >> > >> > process does not get ownership of the partitions that it requests, as
> > > >> > >> > the already existing owners have not yet given up ownership.
> > > >> > >> >
> > > >> > >> > We changed certain properties on the consumer:
> > > >> > >> >
> > > >> > >> > 1. Max rebalance attempts - 20 (rebalance.backoff.ms *
> > > >> > >> >    rebalance.max.retries >> zk connection timeout)
> > > >> > >> > 2. Backoff ms between rebalances - 10000 (10 seconds)
> > > >> > >> > 3. ZK connection timeout - 100,000 (100 seconds)
> > > >> > >> >
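As a quick check of the arithmetic behind the three settings above, here is a small Python sketch. The values come from the list; mapping "ZK connection timeout" to the consumer property `zookeeper.connection.timeout.ms` is an assumption, since the message does not name the property.

```python
# Values from the list above. The third key is an assumed mapping of
# "ZK connection timeout" to a consumer property name.
consumer_props = {
    "rebalance.max.retries": 20,
    "rebalance.backoff.ms": 10_000,
    "zookeeper.connection.timeout.ms": 100_000,
}

# Total time the consumer keeps retrying a rebalance before giving up.
total_backoff_ms = (
    consumer_props["rebalance.max.retries"] * consumer_props["rebalance.backoff.ms"]
)
zk_timeout_ms = consumer_props["zookeeper.connection.timeout.ms"]

print(total_backoff_ms, zk_timeout_ms, total_backoff_ms / zk_timeout_ms)
```

With these values the retry window is 200,000 ms against a 100,000 ms timeout, so the retries span twice the timeout.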
> > > >> > >> > When I look in the zookeeper shell while the rebalance is happening,
> > > >> > >> > the consumer is registered fine in zookeeper. It is just that the
> > > >> > >> > rebalance does not happen.
> > > >> > >> > After the 20th rebalance attempt completes, we get
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues*
> > > >> > >> > *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries*
> > > >> > >> > *kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries*
> > > >> > >> > *        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]*
> > > >> > >> > *        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]*
> > > >> > >> > *        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]*
> > > >> > >> > *        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]*
> > > >> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]*
> > > >> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]*
> > > >> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]*
> > > >> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]*
> > > >> > >> > *        at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]*
> > > >> > >> > *        at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]*
> > > >> > >> > *        at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]*
> > > >> > >> > *        at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]*
> > > >> > >> > *        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]*
> > > >> > >> > *        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]*
> > > >> > >> > *2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK*
> > > >> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}*
> > > >> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry*
> > > >> > >> >
> > > >> > >> > Due to this error, none of the consumers consume from the partitions
> > > >> > >> > in contention, which creates a sort of skewed lag on the kafka side.
> > > >> > >> > When the 6th consumer was added, the existing owner process of the
> > > >> > >> > partitions in question did not get rebalanced.
> > > >> > >> >
> > > >> > >> > Any help would be highly appreciated.
> > > >> > >> >
> > > >> > >> > -Thanks,
> > > >> > >> > Mohit
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
