Re: Can Kafka 0.9 guarantee no data loss?

2016-09-23 Thread Kafka
Oh, please ignore my last reply.
I see now that if leaderReplica.highWatermark.messageOffset >= requiredOffset, this
ensures that the LEO of every replica in curInSyncReplicas is >= requiredOffset.
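
To convince myself, here is a minimal sketch of that argument. It assumes the invariant
that the leader's high watermark is advanced to the minimum LEO across the current ISR
(as Partition.maybeIncrementLeaderHW does in 0.9); FakeReplica and the offsets are made
up for illustration, not Kafka's real types.

// Illustration only: HW = min LEO over the ISR, so HW >= requiredOffset
// implies every ISR member's LEO >= requiredOffset.
case class FakeReplica(brokerId: Int, logEndOffset: Long)

val curInSyncReplicas = Seq(FakeReplica(1, 105L), FakeReplica(2, 103L), FakeReplica(3, 107L))
val highWatermark = curInSyncReplicas.map(_.logEndOffset).min  // 103

val requiredOffset = 102L
if (highWatermark >= requiredOffset)
  assert(curInSyncReplicas.forall(_.logEndOffset >= requiredOffset))  // always holds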


Re: Can Kafka 0.9 guarantee no data loss?

2016-09-23 Thread Kafka
OK, the earlier example was not enough to expose the problem.
What happens when numAcks is 1 and curInSyncReplicas.size >= minIsr, but in fact only
one replica in curInSyncReplicas has caught up to the leader, and that replica is the
leader itself? This is not safe when the machine hosting the leader partition's broker
is restarted.

The current code is as follows:

if (minIsr <= curInSyncReplicas.size) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

Why not write it like this instead?

if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

It seems that this single condition in the Kafka broker's code is not enough to ensure
safety, because the replicas in curInSyncReplicas are not strongly synchronized.
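
To make the worry concrete, here is a small self-contained sketch (FakeReplica and the
offsets are illustrative, not Kafka's real types) where the ISR-size check alone passes
even though only the leader holds the required offset:

// Illustration: size(ISR) >= minIsr does not by itself mean that minIsr
// replicas have the required offset, because ISR shrinking lags behind.
case class FakeReplica(brokerId: Int, logEndOffset: Long, isLocal: Boolean)

val requiredOffset = 100L
val minIsr = 2

val curInSyncReplicas = Seq(
  FakeReplica(1, logEndOffset = 100L, isLocal = true),  // leader, has the message
  FakeReplica(2, logEndOffset = 90L, isLocal = false)   // still in ISR, lagging
)

val numAcks = curInSyncReplicas.count(r => r.isLocal || r.logEndOffset >= requiredOffset)

println(minIsr <= curInSyncReplicas.size)  // true  -- size check alone passes
println(minIsr <= numAcks)                 // false -- only the leader caught up

(As the reply below explains, the separate high-watermark check is what rules this
state out.)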


Re: Can Kafka 0.9 guarantee no data loss?

2016-09-22 Thread Becket Qin
In order to satisfy a produce response, there are two conditions:
A. The leader's high watermark should be at least the requiredOffset
(the max offset in that produce request for that partition).
B. The number of in-sync replicas is at least min.isr.

The ultimate goal here is to make sure at least min.isr replicas
have caught up to requiredOffset. So the check is not only whether we have
enough replicas in the ISR, but also whether the replicas in
the ISR have caught up to the required offset.

In your example, if numAcks is 0 and curInSyncReplicas.size >= minIsr, the
produce response won't return if min.isr > 0, because
leaderReplica.highWatermark must be less than requiredOffset given that
numAcks is 0, i.e. condition A is not met.

We are actually doing an even stronger-than-necessary check here.
Theoretically, as long as min.isr replicas have caught up to
requiredOffset we should be able to return the response, but we also
require those replicas to be in the ISR.
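
A compact way to see how A and B compose, under the 0.9 invariant that the high
watermark is the minimum LEO over the ISR (simplified stand-in types, not the real
Partition code):

// Sketch: condition A forces every ISR member past requiredOffset, and
// condition B forces the ISR to contain at least minIsr such members.
case class FakeReplica(logEndOffset: Long)

def checkSatisfied(isr: Seq[FakeReplica], requiredOffset: Long, minIsr: Int): (Boolean, String) = {
  val highWatermark = isr.map(_.logEndOffset).min   // HW = min LEO over ISR
  if (highWatermark >= requiredOffset) {            // condition A
    if (minIsr <= isr.size) (true, "NoError")       // condition B
    else (true, "NotEnoughReplicasAfterAppend")
  } else (false, "NoError")                         // not satisfied yet; stay in purgatory
}

// Satisfied only once both replicas reach offset 10:
println(checkSatisfied(Seq(FakeReplica(10L), FakeReplica(10L)), requiredOffset = 10L, minIsr = 2))  // (true,NoError)
println(checkSatisfied(Seq(FakeReplica(10L), FakeReplica(9L)), requiredOffset = 10L, minIsr = 2))   // (false,NoError)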


Re: Can Kafka 0.9 guarantee no data loss?

2016-09-22 Thread Kafka
@wangguozhang, could you give me some advice?


Can Kafka 0.9 guarantee no data loss?

2016-09-22 Thread Kafka
Hi all,
In terms of the topic: we create a topic with 6 partitions, each with 3 replicas.
In terms of the producer: we send messages with acks=-1 using the sync interface.
In terms of the brokers: we set min.insync.replicas to 2.
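
For reference, a producer configured this way might look roughly like the following;
the broker address and topic name are placeholders, and it uses the 0.9 Java client
from Scala:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")  // placeholder address
props.put("acks", "-1")  // wait for the full ISR (equivalent to "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Sync interface: block on the returned future so each record is acked before continuing.
producer.send(new ProducerRecord[String, String]("my-topic", "key", "value")).get()
producer.close()

The topic itself would be created with 6 partitions and replication factor 3, with
min.insync.replicas=2 set in the broker configuration.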

After reviewing the Kafka broker's code, we understand that when we send a message to
the broker with acks=-1, we get a response once the size of this partition's ISR is
greater than or equal to min.insync.replicas. What confuses me is that the replicas in
the ISR are not strongly consistent: in Kafka 0.9 the replica.lag.time.max.ms parameter
is used to judge whether to shrink the ISR (the default is 10000 ms), so a replica in
the ISR can lag behind the leader by up to that interval. When we restart the broker
that owns this partition's leader, the controller will start a new leader election,
which will choose the first replica in the ISR that is not the current leader as the
new leader, and this can then lose data.
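
The shrink logic referred to here looks roughly like the following sketch (paraphrased
from 0.9's Partition.maybeShrinkIsr; FollowerState and the field names are simplified
stand-ins, so treat this as an illustration rather than the exact source):

// Sketch of the time-based lag rule: a follower leaves the ISR only after it
// has not been caught up for more than replica.lag.time.max.ms milliseconds.
case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

def outOfSyncReplicas(followers: Seq[FollowerState], nowMs: Long, maxLagMs: Long): Seq[FollowerState] =
  followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

val now = 100000L
// With the 10 s default, broker 2 (12 s behind) is shrunk out of the ISR; broker 3
// (2 s behind) stays in, even though it may not have the latest messages yet.
println(outOfSyncReplicas(Seq(FollowerState(2, now - 12000L), FollowerState(3, now - 2000L)), now, maxLagMs = 10000L))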


The main produce-handling code is shown below:
val numAcks = curInSyncReplicas.count(r => {
  if (!r.isLocal)
    if (r.logEndOffset.messageOffset >= requiredOffset) {
      trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
      true
    }
    else
      false
  else
    true /* also count the local (leader) replica */
})

trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
  /*
   * The topic may be configured not to accept messages if there are not enough replicas in ISR
   * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
   */
  if (minIsr <= curInSyncReplicas.size) {
    (true, ErrorMapping.NoError)
  } else {
    (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
  }
} else
  (false, ErrorMapping.NoError)


Why is numAcks only logged and never compared with minIsr? If numAcks is 0 but
curInSyncReplicas.size >= minIsr, this will still return success; since the ISR shrink
procedure is not real-time, can this lose data after a leader election?

Feedback is greatly appreciated. Thanks.
meituan.inf