I think the root problem is twofold: replicas are falling behind under
normal load and hence are effectively "failed", and you have unclean
leader election enabled, which "solves" that catastrophic failure by
electing new leaders without complete data.

Starting in 0.8.2 you will be able to selectively disable unclean
leader election.
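
For anyone following along, a one-line sketch of what disabling it will
look like in server.properties (assuming the 0.8.2 property name, which
should also be settable as a per-topic override):

  unclean.leader.election.enable=false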

The root problem for the spuriously failing replicas is the
configuration replica.lag.max.messages. This configuration defaults to
4000. But throughput can be really high, like a million messages per
second. At a million messages per second, 4k messages of lag is only
4ms behind, which can happen for all kinds of reasons (e.g. just
normal Linux I/O latency jitter).
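
To make the arithmetic explicit (a rough back-of-the-envelope at a
steady one million messages per second into a single partition):

  4000 messages / 1,000,000 messages per second = 0.004 s = 4 ms

So a follower that stalls for more than about 4 ms of fetching is judged
out of sync and dropped from the ISR.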

Jiang, I suspect you can resolve your issue by just making this higher.
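
Something along these lines in the broker config would be the quick fix
(the value here is purely illustrative; size it to your peak
per-partition throughput):

  replica.lag.max.messages=50000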

However, raising this setting is not a panacea. The higher you raise
it the longer it will take to detect a partition that is actually
falling behind.

We have been discussing this setting, and if you think about it the
setting is actually somewhat impossible to set right in a cluster
which has both low volume and high volume topics/partitions. For the
low-volume topic it will take a very long time to detect a lagging
replica, and for the high-volume topic it will have false-positives.
One approach to making this easier would be to have the configuration
be something like replica.lag.max.ms and translate this into a number
of messages dynamically based on the throughput of the partition.
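
A minimal sketch of that translation, purely illustrative and not actual
broker code (it assumes the broker can observe each partition's recent
append rate):

  // Hypothetical helper: derive an effective message-count lag threshold
  // from a time-based setting and the partition's observed throughput.
  def effectiveMaxLagMessages(replicaLagMaxMs: Long, observedMsgsPerSec: Double): Long =
    math.max(1L, math.round(observedMsgsPerSec * replicaLagMaxMs / 1000.0))

  // e.g. a 10s replica.lag.max.ms at 1M msgs/sec -> threshold of 10,000,000 messages,
  //      while the same 10s at 10 msgs/sec       -> threshold of 100 messages.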

-Jay


On Fri, Jul 11, 2014 at 2:55 PM, Jiang Wu (Pricehistory) (BLOOMBERG/
731 LEX -) <jwu...@bloomberg.net> wrote:
> Hi Guozhang,
>
> KAFKA-1537 is created. https://issues.apache.org/jira/i#browse/KAFKA-1537
>
> I'll try to see if I'm able to submit a patch for this, but cannot commit a 
> date, so please feel free to assign it to others.
>
> Regards,
> Jiang
> ----- Original Message -----
> From: wangg...@gmail.com
> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
> At: Jul 11 2014 16:42:55
>
> Hello Jiang,
>
> That is a valid point. The reason we designed ack=-1 to mean "receive acks from
> replicas in ISR" is basically trading consistency for availability. I think that
> instead of changing its meaning, we could add another ack value, -2 for instance,
> to specify "receive acks from all replicas" in favor of consistency.
>
> Since you already did this much investigation, would you like to file a JIRA
> and submit a patch for this?
>
> Guozhang
>
>
> On Fri, Jul 11, 2014 at 11:49 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -) <jwu...@bloomberg.net> wrote:
>
>> Hi,
>> I'm doing stress and failover tests on a 3-node 0.8.1.1 Kafka cluster and
>> have the following observations.
>>
>> A topic is created with 1 partition and a replication factor of 3.
>> request.required.acks is set to -1 for a sync producer. When the publishing
>> rate is high (3M messages, each 2000 bytes, published in lists of 2000), the
>> two followers fall out of sync and only the leader remains in the ISR, but
>> the producer can keep sending. If the leader is then killed with Ctrl-C, one
>> follower becomes the leader, but message loss happens because of the unclean
>> leader election.
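>>
>> For concreteness, the relevant settings for the 0.8 sync producer look
>> roughly like this (the broker list and serializer here are placeholders):
>>
>>   metadata.broker.list=broker1:9092,broker2:9092,broker3:9092
>>   producer.type=sync
>>   serializer.class=kafka.serializer.DefaultEncoder
>>   # -1 in this test; 3 in the second test described below
>>   request.required.acks=-1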
>>
>> In the same test, request.required.acks=3 gives the desired result.
>> Followers still fall out of sync, but the producer is blocked until all
>> followers are back in the ISR. No data loss is observed in this case.
>>
>> From the code, this turns out to be how it's designed:
>> if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
>>     (requiredAcks > 0 && numAcks >= requiredAcks)) {
>>   /*
>>    * requiredAcks < 0 means acknowledge after all replicas in ISR
>>    * are fully caught up to the (local) leader's offset
>>    * corresponding to this produce request.
>>    */
>>   (true, ErrorMapping.NoError)
>> }
>>
>> I'm wondering if it would be more reasonable to let request.required.acks=-1
>> mean "receive acks from all replicas" instead of "receive acks from replicas
>> in ISR". As the above test shows, followers fall out of sync under high
>> publishing volume, which makes request.required.acks=-1 equivalent to
>> request.required.acks=1. Since the Kafka documentation states that
>> request.required.acks=-1 provides the best durability, one would expect it
>> to be equivalent to request.required.acks=number_of_replicas.
>>
>> Regards,
>> Jiang
>
>
>
>
> --
> -- Guozhang
