[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-4485:

Description:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if its current log end offset is equivalent to the log end offset of the leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = {replica | replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs}
- Definition of high-watermark of a partition: high-watermark of a partition = max(current high-watermark of the partition, min(offset of replicas in the pseudo-ISR set of this partition))
- Definition of ISR set: ISR set of a partition = {replica | replica is in the pseudo-ISR set of the given partition AND log end offset of replica >= high-watermark of the given partition}

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. Thus the high-watermark will stop increasing until this follower's log end offset >= high-watermark, at which moment this follower will be added to the ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it will be removed from the ISR set.
3) The high watermark of a partition will never decrease.
4) For any replica in the ISR set, its log end offset >= high-watermark.

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch request from this follower. Let's call it lastFetchTime. In addition, the leader also maintains the log end offset of the leader at the lastFetchTime for each follower. Let's call it lastFetchLeaderLEO. Both variables will be updated after the leader has processed a FetchRequest from a follower.
2) When the leader receives a FetchRequest from a follower, if the begin offset of the FetchRequest >= the current leader's LEO, the follower's lastCatchUpTimeMs will be set to the current system time. Otherwise, if the begin offset of the FetchRequest >= lastFetchLeaderLEO, the follower's lastCatchUpTimeMs will be set to lastFetchTime. Replica's lag = current system time - lastCatchUpTimeMs.
3) The leader can update the pseudo-ISR set, high-watermark and ISR set of the partition based on the lag of the replicas of this partition, according to the definitions described above.

was:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if its current log end offset is equivalent to the log end offset of the leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = {
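The lag bookkeeping in Implementation steps 1) and 2) of the updated description can be sketched as follows. This is only an illustration in Python under stated assumptions, not the Kafka broker code: the class FollowerState and the functions on_fetch and replica_lag_ms are hypothetical names, and only lastFetchTime, lastFetchLeaderLEO and lastCatchUpTimeMs come from the description.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    """Per-follower state kept by the leader (hypothetical container)."""
    last_fetch_time_ms: int = 0       # lastFetchTime
    last_fetch_leader_leo: int = 0    # lastFetchLeaderLEO
    last_catch_up_time_ms: int = 0    # lastCatchUpTimeMs


def on_fetch(state, fetch_offset, leader_leo, now_ms):
    """Apply step 2), then refresh the step 1) variables."""
    if fetch_offset >= leader_leo:
        # The follower has everything the leader has right now.
        state.last_catch_up_time_ms = now_ms
    elif fetch_offset >= state.last_fetch_leader_leo:
        # The follower had caught up to the leader's LEO as of its previous fetch.
        state.last_catch_up_time_ms = state.last_fetch_time_ms
    # Step 1): both variables are updated after the request is processed.
    state.last_fetch_time_ms = now_ms
    state.last_fetch_leader_leo = leader_leo


def replica_lag_ms(state, now_ms):
    """Replica's lag = current system time - lastCatchUpTimeMs."""
    return now_ms - state.last_catch_up_time_ms
```

In the small-request scenario from problem 2), a follower that fetches every 100 ms and always starts at the leader's LEO from the previous fetch ends up with a lag of about one fetch interval under this rule, while a follower whose fetch offset never advances accumulates lag without bound.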
[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-4485:

Description:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if its current log end offset is equivalent to the log end offset of the leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = {replica | replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs}
- Definition of high-watermark of a partition: high-watermark of a partition = max(current high-watermark of the partition, min(offset of replicas in the pseudo-ISR set of this partition))
- Definition of ISR set: ISR set of a partition = {replica | replica is in the pseudo-ISR set of the given partition AND log end offset of replica >= high-watermark of the given partition}

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. Thus the high-watermark will stop increasing until this follower's log end offset >= high-watermark, at which moment this follower will be added to the ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it will be removed from the ISR set.
3) The high watermark of a partition will never decrease.
4) For any replica in the ISR set, its log end offset >= high-watermark.

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch request from this follower. Let's call it lastFetchTime. In addition, the leader also maintains the log end offset of the leader at the lastFetchTime for each follower. Let's call it lastFetchLeaderLEO. Both variables will be updated after the leader has processed a FetchRequest from a follower.
2) When the leader receives a FetchRequest from a follower, it will set the follower's lag to lastFetchTime if the begin offset of the FetchRequest >= lastFetchLeaderLEO.
3) The leader can update the pseudo-ISR set, high-watermark and ISR set of the partition based on the lag of the replicas of this partition, according to the definitions described above.
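The three set definitions in the Terminology section (pseudo-ISR set, high-watermark, ISR set) translate almost directly into code. The sketch below is a hypothetical Python rendering, not the Kafka implementation: the function names and the dictionaries keyed by replica name are assumptions made for illustration, and the leader is assumed to be included among the replicas with a lag of 0.

```python
def pseudo_isr(lag_ms, replica_lag_time_max_ms):
    """Replicas of the partition whose lag <= replicaLagTimeMaxMs."""
    return {r for r, lag in lag_ms.items() if lag <= replica_lag_time_max_ms}


def next_high_watermark(current_hw, log_end_offset, pseudo):
    """max(current HW, min(log end offset over the pseudo-ISR set)); never decreases."""
    if not pseudo:
        return current_hw
    return max(current_hw, min(log_end_offset[r] for r in pseudo))


def isr_set(log_end_offset, pseudo, high_watermark):
    """Pseudo-ISR members whose log end offset >= high-watermark."""
    return {r for r in pseudo if log_end_offset[r] >= high_watermark}


# Example: f2 lags too far behind in time, f1 is slightly behind in offset.
lag_ms = {"leader": 0, "f1": 200, "f2": 30000}
leo = {"leader": 100, "f1": 95, "f2": 40}
pseudo = pseudo_isr(lag_ms, 10000)           # leader and f1
hw = next_high_watermark(90, leo, pseudo)    # advances to 95, bounded by f1
members = isr_set(leo, pseudo, hw)           # leader and f1
```

If the current high-watermark were already 98, it would stay at 98 and f1 (log end offset 95) would be in the pseudo-ISR set but not yet in the ISR set, which is the situation guarantee 1) describes: the high-watermark stops increasing until f1 reaches it.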
was:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= max(high watermark of the partition, log end offset of the leader at the time the leader receives the previous FetchRequest). The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs. Note that we compare the begin offset of the FetchRequest with the log end offset of the leader at the time the leader receives the previous FetchRequest as an approximate way to
[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-4485:

Affects Version/s: 0.10.1.0
Reviewer: Jiangjie Qin
Fix Version/s: 0.10.2.0

> Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Dong Lin
> Assignee: Dong Lin
> Fix For: 0.10.2.0
>
> In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:
>
> 1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.
>
> The solution to the problem is the following:
>
> A follower should be in the ISR if the begin offset of its FetchRequest >= max(high watermark of the partition, log end offset of the leader at the time the leader receives the previous FetchRequest). The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs. Note that we compare the begin offset of the FetchRequest with the log end offset of the leader at the time the leader receives the previous FetchRequest as an approximate way to compare the end offset of the fetched data with the log end offset of the leader. This is because we cannot easily know the end offset of the fetched data at the time the broker receives the fetch request.
>
> This solution makes the following guarantees:
>
> 1) If a follower is in the ISR, then its log end offset >= the high watermark of the partition at some point in the last replicaLagTimeMaxMs.
> 2) If a follower is not in the ISR, then the end offset of its FetchRequest has not caught up with the log end offset of the leader for more than replicaLagTimeMaxMs. Either the follower is in the bootstrap phase, or the follower's average fetch rate is smaller than the average produce rate into the partition for the last replicaLagTimeMaxMs.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-4485:

Description:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate (e.g. many mirror makers), and the follower is always able to read all the available data at the time the leader receives its fetch request. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR after replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= max(high watermark of the partition, log end offset of the leader at the time the leader receives the previous FetchRequest). The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs. Note that we compare the begin offset of the FetchRequest with the log end offset of the leader at the time the leader receives the previous FetchRequest as an approximate way to compare the end offset of the fetched data with the log end offset of the leader. This is because we cannot easily know the end offset of the fetched data at the time the broker receives the fetch request.

This solution makes the following guarantees:

1) If a follower is in the ISR, then its log end offset >= the high watermark of the partition at some point in the last replicaLagTimeMaxMs.
2) If a follower is not in the ISR, then the end offset of its FetchRequest has not caught up with the log end offset of the leader for more than replicaLagTimeMaxMs. Either the follower is in the bootstrap phase, or the follower's average fetch rate is smaller than the average produce rate into the partition for the last replicaLagTimeMaxMs.

was:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate, and the fetch request is always able to read all the available data at the time the leader receives it. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= max(high watermark of the partition, log end offset of the leader at the time the leader receives the previous FetchRequest). The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantees:

1) If a follower is in the ISR, then its log end offset >= the high watermark of the partition at some point in the last replicaLagTimeMaxMs.
2) If a follower is not in the ISR, then the end offset of its FetchRequest has not caught up with the log end offset of the leader for more than replicaLagTimeMaxMs. Either the follower is in the bootstrap phase, or the follower's average fetch rate < the produce rate into the partition for more than replicaLagTimeMaxMs.

> Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Dong Lin
>
> In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we will
[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-4485:

Description:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate, and the fetch request is always able to read all the available data at the time the leader receives it. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= max(high watermark of the partition, log end offset of the leader at the time the leader receives the previous FetchRequest). The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantees:

1) If a follower is in the ISR, then its log end offset >= the high watermark of the partition at some point in the last replicaLagTimeMaxMs.
2) If a follower is not in the ISR, then the end offset of its FetchRequest has not caught up with the log end offset of the leader for more than replicaLagTimeMaxMs. Either the follower is in the bootstrap phase, or the follower's average fetch rate < the produce rate into the partition for more than replicaLagTimeMaxMs.
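The membership rule in this version of the description (begin offset >= max(high watermark, leader log end offset at the previous FetchRequest), with removal after replicaLagTimeMaxMs without meeting it) might look roughly like the Python sketch below. The class FollowerIsrState and its method names are hypothetical and chosen only for illustration; the actual broker logic lives in maybeExpandIsr() and maybeShrinkIsr() and is not reproduced here.

```python
class FollowerIsrState:
    """Tracks one follower's ISR membership under the proposed criterion (sketch)."""

    def __init__(self, replica_lag_time_max_ms, now_ms=0):
        self.replica_lag_time_max_ms = replica_lag_time_max_ms
        self.prev_leader_leo = 0       # leader LEO when the previous fetch arrived
        self.last_met_ms = now_ms      # last time the criterion was met
        self.in_isr = False

    def on_fetch(self, fetch_offset, high_watermark, leader_leo, now_ms):
        # Expand: the criterion is met, so the follower should be in the ISR.
        if fetch_offset >= max(high_watermark, self.prev_leader_leo):
            self.last_met_ms = now_ms
            self.in_isr = True
        # Remember the leader's LEO for the comparison on the next fetch.
        self.prev_leader_leo = leader_leo

    def maybe_shrink(self, now_ms):
        # Shrink: the criterion has not been met for more than replicaLagTimeMaxMs.
        if self.in_isr and now_ms - self.last_met_ms > self.replica_lag_time_max_ms:
            self.in_isr = False
        return self.in_isr
```

Because the same comparison drives both adding and removing, a follower that always fetches from the leader's previous log end offset stays in the ISR in the small-request scenario, even though its begin offset never reaches the leader's current logEndOffset.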
was:

In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
2) A follower may be removed from the ISR even if its fetch rate can keep up with the produce rate. Suppose a producer keeps sending a lot of small requests at a high request rate but a low byte rate, and the fetch request is always able to read all the available data at the time the leader receives it. However, the begin offset of the fetch request will always be smaller than the logEndOffset of the leader. Thus the follower will be removed from the ISR.

The solution to the problem is the following:

A follower should be in the ISR if the begin offset of its FetchRequest >= the high watermark of the partition AND the end offset of its FetchRequest >= the logEndOffset of the leader at the time the leader receives the FetchRequest. The follower should be removed from the ISR if this criterion is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantees:

1) If a follower is in the ISR, its log end offset >= the high watermark of the partition. Thus messages with offset < high watermark are guaranteed to be replicated on all replicas in the ISR.
2) If a follower is not in the ISR, then the end offset of its FetchRequest has not reached the log end offset of the leader. Either the follower is in the bootstrap phase, or the follower's average fetch rate < the produce rate into the partition for more than replicaLagTimeMaxMs.

> Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Dong Lin
>
> In the current implementation, we exclude a follower from the ISR if the begin offset of the FetchRequest from this follower is always smaller than the logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a follower to the ISR if the beginOffset of the FetchRequest from this follower is equal to or larger than the high watermark of this partition. This is problematic for the following reasons:
>
> 1) The criteria for ISR membership are inconsistent between maybeExpandIsr() and maybeShrinkIsr(). A follower may be repeatedly removed from and added to the ISR (e.g. in the scenario described below).
> 2) A follower may be removed from