[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-07 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:

Description: 
In the current implementation, the leader removes a follower from the ISR if 
the begin offset of the follower's FetchRequest has been smaller than the 
leader's logEndOffset for more than replicaLagTimeMaxMs. The leader adds a 
follower to the ISR if the begin offset of its FetchRequest is equal to or 
larger than the high watermark of the partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and 
added to the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate (e.g. many mirror makers), and the 
follower is always able to read all the data available at the time the leader 
receives its FetchRequest. The begin offset of each FetchRequest will still be 
smaller than the leader's logEndOffset, because new data arrives between 
fetches. Thus the follower will be removed from the ISR after 
replicaLagTimeMaxMs.
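
The scenario in 2) can be reproduced with a small simulation. This is only a 
sketch of the shrink rule as described in this ticket, not the broker code: 
the function name, the 1 ms clock tick and the one-message-per-produce model 
are all assumptions made for illustration.

```python
def simulate_old_rule(duration_ms, produce_every_ms, fetch_every_ms,
                      replica_lag_time_max_ms):
    """Return the time (ms) at which the follower leaves the ISR, or None.

    Hypothetical model: the leader appends one message every
    produce_every_ms, and the follower fetches every fetch_every_ms and
    always reads everything that is available at that moment.
    """
    leader_leo = 0          # leader's log end offset
    follower_leo = 0        # follower's log end offset
    last_caught_up_ms = 0   # last time the old criterion was satisfied

    for now in range(1, duration_ms + 1):
        if now % produce_every_ms == 0:
            leader_leo += 1
        if now % fetch_every_ms == 0:
            begin_offset = follower_leo
            # Old criterion: the follower counts as caught up only if the
            # fetch begins at the leader's current log end offset.
            if begin_offset >= leader_leo:
                last_caught_up_ms = now
            follower_leo = leader_leo  # the fetch reads all available data
        if now - last_caught_up_ms > replica_lag_time_max_ms:
            return now
    return None
```

With one message per ms and a fetch every 5 ms, 
simulate_old_rule(1000, 1, 5, 100) returns 101: the follower is never more 
than 5 ms behind, yet it is dropped as soon as replicaLagTimeMaxMs elapses. 
With an idle producer the same follower stays in the ISR.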

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if 
its current log end offset is equal to the log end offset the leader had X ms 
ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs }
- Definition of high-watermark of a partition: the high-watermark of a 
partition is max(current high-watermark of the partition, min(log end offset of 
the replicas in the pseudo-ISR set of this partition)).
- Definition of ISR set: ISR set of a partition = { replica | replica is in the 
pseudo-ISR set of the given partition AND log end offset of replica >= 
high-watermark of the given partition }
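
The three set definitions above can be written down almost literally. The 
following is a minimal sketch, assuming that "offset of replicas" means each 
replica's log end offset and that the leader is itself a replica with lag 0. 
The Replica class and the function names are hypothetical, and keeping the 
high-watermark unchanged when the pseudo-ISR set is empty is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    broker_id: int
    log_end_offset: int
    lag_ms: int  # replica lag as defined above


def pseudo_isr(replicas, replica_lag_time_max_ms):
    # Replicas whose lag is within replicaLagTimeMaxMs.
    return [r for r in replicas if r.lag_ms <= replica_lag_time_max_ms]


def next_high_watermark(current_hw, replicas, replica_lag_time_max_ms):
    candidates = pseudo_isr(replicas, replica_lag_time_max_ms)
    if not candidates:
        return current_hw  # assumption: nothing to advance to
    # max(...) ensures the high-watermark never decreases.
    return max(current_hw, min(r.log_end_offset for r in candidates))


def isr_set(replicas, high_watermark, replica_lag_time_max_ms):
    # Pseudo-ISR members that have reached the high-watermark.
    return [r for r in pseudo_isr(replicas, replica_lag_time_max_ms)
            if r.log_end_offset >= high_watermark]
```

For example, a follower with a small lag but a log end offset below the 
current high-watermark is in the pseudo-ISR set and holds the high-watermark 
in place, but it only joins the ISR set once its log end offset reaches the 
high-watermark.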

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica 
lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
Thus the high-watermark will stop increasing until this follower's log end 
offset >= high-watermark, at which point this follower will be added to the 
ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it 
will be removed from the ISR set.
3) The high-watermark of a partition will never decrease.
4) For any replica in the ISR set, its log end offset >= high-watermark.

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch 
request from this follower. Let's call it lastFetchTime. In addition, the 
leader also records its own log end offset at lastFetchTime for each follower. 
Let's call it lastFetchLeaderLEO. Both variables are updated after the leader 
has processed a FetchRequest from the follower.
2) When the leader receives a FetchRequest from a follower, if the begin offset 
of the FetchRequest >= the leader's current LEO, the follower's 
lastCatchUpTimeMs is set to the current system time. Otherwise, if the begin 
offset of the FetchRequest >= lastFetchLeaderLEO, the follower's 
lastCatchUpTimeMs is set to lastFetchTime. Here lastFetchTime and 
lastFetchLeaderLEO still hold the values recorded for the previous 
FetchRequest, since they are updated only after processing. The replica's 
lag = current system time - lastCatchUpTimeMs.
3) The leader can update the pseudo-ISR set, the high-watermark and the ISR set 
of the partition based on the lag of the replicas of this partition, according 
to the definitions given above.
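
Steps 1) and 2) amount to a small piece of per-follower state on the leader. 
The sketch below is one possible reading of them, not the actual patch: the 
class and method names are hypothetical, and the initial values (offset 0, 
caught up at creation time) are assumptions.

```python
class FollowerState:
    """Per-follower bookkeeping kept by the leader (illustrative only)."""

    def __init__(self, now_ms=0):
        self.last_fetch_time_ms = now_ms      # lastFetchTime
        self.last_fetch_leader_leo = 0        # lastFetchLeaderLEO
        self.last_catch_up_time_ms = now_ms   # lastCatchUpTimeMs

    def on_fetch(self, begin_offset, leader_leo, now_ms):
        # Step 2: decide when the follower was last known to be caught up.
        if begin_offset >= leader_leo:
            # Fully caught up right now.
            self.last_catch_up_time_ms = now_ms
        elif begin_offset >= self.last_fetch_leader_leo:
            # The follower has everything the leader had at the previous
            # fetch, so it was caught up as of that earlier time.
            self.last_catch_up_time_ms = self.last_fetch_time_ms
        # Step 1: both variables are updated after the fetch is processed.
        self.last_fetch_time_ms = now_ms
        self.last_fetch_leader_leo = leader_leo

    def lag_ms(self, now_ms):
        return now_ms - self.last_catch_up_time_ms
```

A follower that fetches every 5 ms and always reads all available data then 
has a lag of about one fetch interval, however fast the leader's log grows, 
so it no longer hits replicaLagTimeMaxMs in the scenario from problem 2).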





  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is smaller than logEndOffset of 
leader for more than replicaLagTimeMaxMs. Also, we will add a follower to ISR 
if the beginOffset of FetchRequest from this follower is equal or larger than 
high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and added to 
the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a producer keeps producing a lot of small requests at 
high request rate but low byte rate (e.g. many mirror makers), and the follower 
is always able to read all the available data at the time leader receives it. 
However, the begin offset of fetch request will always be smaller than 
logEndOffset of leader. Thus the follower will be removed from ISR after 
replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind leader by X ms if its 
current log end offset is equal to the log end offset of leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { 

[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-07 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:

Description: 
In the current implementation, the leader removes a follower from the ISR if 
the begin offset of the follower's FetchRequest has been smaller than the 
leader's logEndOffset for more than replicaLagTimeMaxMs. The leader adds a 
follower to the ISR if the begin offset of its FetchRequest is equal to or 
larger than the high watermark of the partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and 
added to the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate (e.g. many mirror makers), and the 
follower is always able to read all the data available at the time the leader 
receives its FetchRequest. The begin offset of each FetchRequest will still be 
smaller than the leader's logEndOffset, because new data arrives between 
fetches. Thus the follower will be removed from the ISR after 
replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if 
its current log end offset is equal to the log end offset the leader had X ms 
ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs }
- Definition of high-watermark of a partition: the high-watermark of a 
partition is max(current high-watermark of the partition, min(log end offset of 
the replicas in the pseudo-ISR set of this partition)).
- Definition of ISR set: ISR set of a partition = { replica | replica is in the 
pseudo-ISR set of the given partition AND log end offset of replica >= 
high-watermark of the given partition }

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica 
lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
Thus the high-watermark will stop increasing until this follower's log end 
offset >= high-watermark, at which point this follower will be added to the 
ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it 
will be removed from the ISR set.
3) The high-watermark of a partition will never decrease.
4) For any replica in the ISR set, its log end offset >= high-watermark.

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch 
request from this follower. Let's call it lastFetchTime. In addition, the 
leader also maintains the log end offset of the leader at the lastFetchTime for 
each follower. Let's call it lastFetchLeaderLEO. Both variables will be updated 
after leader has processed a FetchRequest from a follower.
2) When leader receives FetchRequest from a follower, it will set the 
follower's lag to lastFetchTime if begin offset of the FetchRequest >= 
lastFetchLeaderLEO.
3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
partition based on the lag of replicas of this partition, according to the 
definition described above.





  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a producer keeps producing a lot of small requests at 
high request rate but low byte rate (e.g. many mirror makers), and the follower 
is always able to read all the available data at the time leader receives it. 
However, the begin offset of fetch request will always be smaller than 
logEndOffset of leader. Thus the follower will be removed from ISR after 
replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= max(high 
watermark of partition, log end offset of leader at the time the leader 
receives the previous FetchRequest). The follower should be removed from ISR if 
this criteria is not met for more than replicaLagTimeMaxMs. Note that we are 
comparing begin offset of FetchRequest with log end offset of leader at the 
time the leader receives the previous FetchRequest as an approximate way to 

[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-06 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4485:

Affects Version/s: 0.10.1.0
 Reviewer: Jiangjie Qin
Fix Version/s: 0.10.2.0

> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Dong Lin
> Assignee: Dong Lin
> Fix For: 0.10.2.0
>
>
> As of current implementation, we will exclude follower from ISR if the begin 
> offset of FetchRequest from this follower is always smaller than logEndOffset 
> of leader for more than replicaLagTimeMaxMs.
> Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
> this follower is equal or larger than high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
> maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR 
> (e.g. in the scenario described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up 
> with produce rate. Suppose a producer keeps producing a lot of small requests 
> at high request rate but low byte rate (e.g. many mirror makers), and the 
> follower is always able to read all the available data at the time leader 
> receives it. However, the begin offset of fetch request will always be 
> smaller than logEndOffset of leader. Thus the follower will be removed from 
> ISR after replicaLagTimeMaxMs.
> The solution to the problem is the following:
> A follower should be in ISR if begin offset of its FetchRequest >= max(high 
> watermark of partition, log end offset of leader at the time the leader 
> receives the previous FetchRequest). The follower should be removed from ISR 
> if this criteria is not met for more than replicaLagTimeMaxMs. Note that we 
> are comparing begin offset of FetchRequest with log end offset of leader at 
> the time the leader receives the previous FetchRequest as an approximate way 
> to compare the end offset of fetched data with log end offset of leader. This 
> is because we can not easily know the end offset of fetched data at the time 
> broker receives fetch request.
> This solution makes the following guarantee:
> 1) If a follower is in ISR, then its log end offset >= high watermark of 
> partition at least sometime in the last replicaLagTimeMaxMs.
> 2) If a follower is not in ISR, then the end offset of its FetchRequest can 
> not catch up with log end offset of leader for more than replicaLagTimeMaxMs. 
> Either follower is in bootstrap phase, or the follower's average fetch rate 
> is smaller than average produce rate into the partition for the last 
> replicaLagTimeMaxMs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-05 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:

Description: 
In the current implementation, the leader removes a follower from the ISR if 
the begin offset of the follower's FetchRequest has always been smaller than 
the leader's logEndOffset for more than replicaLagTimeMaxMs.

The leader adds a follower to the ISR if the begin offset of its FetchRequest 
is equal to or larger than the high watermark of the partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). A follower may be repeatedly removed from and added to 
the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate (e.g. many mirror makers), and the 
follower is always able to read all the data available at the time the leader 
receives its FetchRequest. The begin offset of each FetchRequest will still be 
smaller than the leader's logEndOffset, because new data arrives between 
fetches. Thus the follower will be removed from the ISR after 
replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= 
max(high watermark of the partition, log end offset of the leader at the time 
the leader received the previous FetchRequest). The follower should be removed 
from the ISR if this criterion is not met for more than replicaLagTimeMaxMs. 
Note that we compare the begin offset of the FetchRequest with the leader's log 
end offset at the time it received the previous FetchRequest as an approximate 
way to compare the end offset of the fetched data with the leader's log end 
offset. This is because we cannot easily know the end offset of the fetched 
data at the time the broker receives the fetch request.
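
As a rough illustration of this criterion, it can be expressed as two small 
predicates. This is a sketch only: the function and parameter names are 
hypothetical, and how the leader records the time the criterion was last met 
is left out.

```python
def meets_isr_criterion(begin_offset, high_watermark,
                        leader_leo_at_previous_fetch):
    # The fetch must start at or beyond both the high watermark and the
    # leader's log end offset as of the previous FetchRequest.
    return begin_offset >= max(high_watermark, leader_leo_at_previous_fetch)


def should_remove_from_isr(last_met_ms, now_ms, replica_lag_time_max_ms):
    # Remove only after the criterion has gone unmet for longer than
    # replicaLagTimeMaxMs.
    return now_ms - last_met_ms > replica_lag_time_max_ms
```

Using the same predicate for both adding and removing a follower is what 
removes the inconsistency between maybeExpandIsr() and maybeShrinkIsr() 
described in problem 1).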

This solution makes the following guarantee:

1) If a follower is in the ISR, then its log end offset was >= the high 
watermark of the partition at some time in the last replicaLagTimeMaxMs.

2) If a follower is not in the ISR, then the end offset of its FetchRequest has 
not caught up with the log end offset of the leader for more than 
replicaLagTimeMaxMs. Either the follower is in its bootstrap phase, or the 
follower's average fetch rate has been smaller than the average produce rate 
into the partition for the last replicaLagTimeMaxMs.








  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a producer keeps producing a lot of small requests at 
high request rate but low byte rate, and the fetch request is always able to read all 
the available data at the time leader receives it. However, the begin offset of 
fetch request will always be smaller than logEndOffset of leader. Thus the 
follower will be removed from ISR.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= max(high 
watermark of partition, log end offset of leader at the time the leader 
receives the previous FetchRequest). The follower should be removed from ISR if 
this criteria is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantee:

1) If a follower is in ISR, then its log end offset >= high watermark of 
partition at least sometime in the last replicaLagTimeMaxMs.

2) If a follower is not in ISR, then the end offset of its FetchRequest can not 
catch up with log end offset of leader for more than replicaLagTimeMaxMs. 
Either follower is in bootstrap phase, or the follower's average fetch rate < 
produce rate into the partition for more than replicaLagTimeMaxMs.









> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Dong Lin
>
> As of current implementation, we will exclude follower from ISR if the begin 
> offset of FetchRequest from this follower is always smaller than logEndOffset 
> of leader for more than replicaLagTimeMaxMs.
> Also, we will 

[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-02 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:

Description: 
In the current implementation, the leader removes a follower from the ISR if 
the begin offset of the follower's FetchRequest has always been smaller than 
the leader's logEndOffset for more than replicaLagTimeMaxMs.

The leader adds a follower to the ISR if the begin offset of its FetchRequest 
is equal to or larger than the high watermark of the partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). A follower may be repeatedly removed from and added to 
the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate, and each fetch request is always able 
to read all the data available at the time the leader receives it. The begin 
offset of each fetch request will still be smaller than the leader's 
logEndOffset. Thus the follower will be removed from the ISR.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= 
max(high watermark of the partition, log end offset of the leader at the time 
the leader received the previous FetchRequest). The follower should be removed 
from the ISR if this criterion is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantee:

1) If a follower is in the ISR, then its log end offset was >= the high 
watermark of the partition at some time in the last replicaLagTimeMaxMs.

2) If a follower is not in the ISR, then the end offset of its FetchRequest has 
not caught up with the log end offset of the leader for more than 
replicaLagTimeMaxMs. Either the follower is in its bootstrap phase, or the 
follower's average fetch rate has been smaller than the produce rate into the 
partition for more than replicaLagTimeMaxMs.








  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a producer keeps producing a lot of small requests at 
high request rate but low byte rate, and the fetch request is always able to read all 
the available data at the time leader receives it. However, the begin offset of 
fetch request will always be smaller than logEndOffset of leader. Thus the 
follower will be removed from ISR.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= high 
watermark of partition AND end offset of its FetchRequest >= logEndOffset of 
leader at the time the leader receives FetchRequest. The follower should be 
removed from ISR if this criterion is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantee:

1) If a follower is in ISR, its log end offset >= high watermark of partition. 
Thus messages with offset < high watermark are guaranteed to be replicated on 
all replicas in the ISR.

2) If a follower is not in ISR, then the end offset of its FetchRequest has not 
reached log end offset of leader. Either follower is in bootstrap phase, or the 
follower's average fetch rate < produce rate into the partition for more than 
replicaLagTimeMaxMs.








> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Dong Lin
>
> As of current implementation, we will exclude follower from ISR if the begin 
> offset of FetchRequest from this follower is always smaller than logEndOffset 
> of leader for more than replicaLagTimeMaxMs.
> Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
> this follower is equal or larger than high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
> maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR 
> (e.g. in the scenario described below).
> 2) A follower may be removed from