[ https://issues.apache.org/jira/browse/KAFKA-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy updated KAFKA-19779:
-----------------------------------
    Description: 
h2. Fencing offset commits

In the Kafka protocol, when a consumer commits offsets or a producer adds 
offsets to a transaction, the request includes the client's epoch/generation in 
the consumer group. This allows the group coordinator to fence zombie commit 
requests, that is, commit requests that include an offset for a partition that 
has since been reassigned to a different member. Without such a guard, a zombie 
offset commit could overwrite the offsets of the new owner, or its offsets could 
be committed to the consumer offsets topic but not be included in the result of 
the new owner's offset fetch request.

In KIP-848, when receiving an offset commit request that includes the 
client-side member epoch and a member ID, the group coordinator performs the 
check

{{Client-Side Member Epoch == Broker-Side Member Epoch}}

and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular 
offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset 
commits. If the member epoch sent in the request is the current broker-side 
member epoch, KIP-848 guarantees that the partition cannot also be owned by a 
different member at the same or a larger epoch. Therefore, this check is 
sufficient for fencing zombie commits. Note that we assume zombie commits always 
contain offsets for partitions that were owned by the member at the member 
epoch sent in the request. Commit requests that commit offsets for partitions 
that are _not_ owned by the member in that epoch are not possible in a correct 
client-side implementation of the protocol.
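The current rule can be sketched as follows. This is a minimal illustration, not the group coordinator's actual code; the class and the {{CommitError}} enum are hypothetical stand-ins for the two error codes named above.

```java
final class StrictEpochCheck {
    private StrictEpochCheck() {}

    /**
     * Current KIP-848 rule: the client-side member epoch must equal the
     * broker-side member epoch exactly.
     */
    static CommitError validate(int clientEpoch, int brokerEpoch, boolean transactional) {
        if (clientEpoch == brokerEpoch) {
            return CommitError.NONE;
        }
        // Regular commits fail with STALE_MEMBER_EPOCH, transactional ones with ILLEGAL_GENERATION.
        return transactional ? CommitError.ILLEGAL_GENERATION : CommitError.STALE_MEMBER_EPOCH;
    }

    public static void main(String[] args) {
        // A member still owns P1, but its broker-side epoch moved from 1 to 2.
        System.out.println(validate(1, 2, false)); // STALE_MEMBER_EPOCH
        System.out.println(validate(1, 2, true));  // ILLEGAL_GENERATION
        System.out.println(validate(2, 2, true));  // NONE
    }
}

// Hypothetical stand-in for the error codes named in the text; not Kafka's Errors enum.
enum CommitError { NONE, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }
```

The first two calls show that any epoch difference is rejected, even when the member still owns the partition.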

Note that the broker-side member epoch is not the group epoch or the target 
assignment epoch. For details, see KIP-848. Note also that commits can be 
fenced because a member falls out of the group (e.g. because it does not revoke 
partitions within the rebalance timeout). In that case, its commits are fenced 
solely based on the member ID, which is no longer part of the group. We 
therefore ignore this case in this document and only consider zombie commits 
from members that are still part of the group.
h2. Downsides of the current approach

This fencing is, however, unnecessarily strict. Assume, for example, a member 
owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the member 
still has P1 assigned at epoch 2. The member may not learn about the new 
broker-side member epoch in time and submit an offset commit for P1 with 
epoch 1. This is not a zombie commit request as defined above (because P1 was 
not reassigned to a different member), but it will still be rejected by a 
KIP-848 group coordinator.

The trouble with this fencing mechanism is that it is very difficult to avoid 
the broker-side member epoch being bumped concurrently with an offset commit. 
Seen from the client side, the broker-side member epoch may be bumped at any 
time while a heartbeat to the group coordinator is in flight. Ensuring that the 
member epoch sent in an offset commit request is up to date would require 
ensuring that no consumer group or streams group heartbeat request is in flight 
at the same time.
h2. Why a broker-side fix is warranted

This problem is particularly challenging to solve on the client side for 
transactional offset commits, since they are performed by the producer, not the 
consumer, and the producer has no way of knowing when a consumer group 
heartbeat or streams group heartbeat is in flight. The member epoch is passed 
from the Java consumer to the Java producer using the {{ConsumerGroupMetadata}} 
object, which is passed into {{sendOffsetsToTransaction}}. By the time the 
transactional offset commit is sent, the member epoch may be stale, in which 
case the broker returns an {{ILLEGAL_GENERATION}} error. This forces the Java 
producer into an abortable error state and surfaces the error to the user as a 
{{CommitFailedException}}; the user's only way to recover is to abort the 
transaction.

This may hurt in any Kafka client application, since aborting transactions 
means throwing away work and restarting from an earlier point. But it is a 
particularly big problem in Kafka Streams with exactly-once semantics, where 
aborting a transaction usually means wiping and restoring the state store, so 
each aborted transaction means some downtime for apps using state stores of 
non-negligible size. Furthermore, since Kafka Streams commits every 100ms by 
default in EOS, this is likely to happen fairly often.
h1. Conceptual Design

In this design document, we therefore propose to relax the condition for offset 
commit fencing.
h2. Identifying zombies using the last epoch a partition was assigned to the member

To derive a more relaxed check, we need to identify an epoch that separates 
zombie commits from commits of the current owner. As mentioned above, zombie 
commit requests are commit requests that include a combination of partition, 
member ID and member epoch such that the member owned the partition at that 
epoch, but the partition has since been reassigned to a different member.

More precisely, on the level of a single partition, a relaxed offset commit 
check can be defined using an *assignment epoch* for each assigned partition and 
each member, which is the epoch at which the partition was assigned to that 
member. To fence zombie commit requests, we can reject all offset commit 
requests from a member that either does not have the partition assigned, or 
that includes a member epoch that is strictly smaller than the assignment epoch 
for that member and that partition:

{{Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch}}
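A minimal sketch of this per-partition predicate, assuming plain {{int}} epochs and a boolean that says whether the member currently owns the partition (both are simplifications, not the coordinator's data model):

```java
final class AssignmentEpochCheck {
    private AssignmentEpochCheck() {}

    /**
     * Relaxed per-partition rule:
     * Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch.
     * Commits for partitions the member does not own are always rejected.
     */
    static boolean accept(boolean assigned, int assignmentEpoch, int clientEpoch, int brokerEpoch) {
        if (!assigned) {
            return false;
        }
        return assignmentEpoch <= clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        // P1 assigned at epoch 1; the broker-side epoch moved to 2 without reassigning P1.
        System.out.println(accept(true, 1, 1, 2)); // true: accepted despite the stale epoch
        // P1 (re)assigned to this member at epoch 5; a request carrying epoch 3 is a zombie.
        System.out.println(accept(true, 5, 3, 6)); // false
        // A client epoch ahead of the broker-side epoch is never valid.
        System.out.println(accept(true, 1, 3, 2)); // false
    }
}
```

The first call is exactly the case from "Downsides of the current approach" that the strict equality check rejects.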

The correctness of this check is straightforward: all commits of the current 
partition owner will be accepted, since the Client-Side Member Epoch of the 
current owner is always larger than or equal to the assignment epoch (a 
partition that is revoked in one epoch is never reassigned in the same epoch). 
The check also correctly rejects any zombie commits from that member, because if 
a partition was owned by member A at Client-Side Member Epoch (which we assume 
for zombie commits) but has since been reassigned to member B, there are two 
possible cases:
 # Member A currently does not have the partition assigned, so the commit is 
rejected.
 # Member A does currently have the partition assigned, but then the partition 
must have been reassigned to member A after being assigned to member B. By 
KIP-848 this cannot all happen in the same epoch, so we must have 
Assignment Epoch > Client-Side Member Epoch.
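To make the two cases concrete, the following sketch replays the ownership history of a single partition. It is a deliberate simplification: epochs are taken from one shared number line, the history is a hypothetical list of ownership changes, and a member that does not currently own the partition gets an assignment epoch of {{Integer.MAX_VALUE}}.

```java
import java.util.List;

final class ZombieReplay {
    private ZombieReplay() {}

    /** The partition is owned by 'owner' from 'fromEpoch' until the next entry (owners alternate). */
    record Ownership(int fromEpoch, String owner) {}

    /** Epoch at which the partition was last assigned to 'member', or MAX_VALUE if not owned now. */
    static int assignmentEpoch(List<Ownership> history, String member) {
        Ownership current = history.get(history.size() - 1);
        return current.owner().equals(member) ? current.fromEpoch() : Integer.MAX_VALUE;
    }

    static boolean accept(List<Ownership> history, String member, int clientEpoch, int brokerEpoch) {
        int assignmentEpoch = assignmentEpoch(history, member);
        return assignmentEpoch <= clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        // Case 1: A owned the partition at epoch 3, B took it over at epoch 4.
        List<Ownership> movedAway = List.of(new Ownership(3, "A"), new Ownership(4, "B"));
        System.out.println(accept(movedAway, "A", 3, 5)); // false: A no longer owns it

        // Case 2: A at epoch 3, B at epoch 4, back to A at epoch 6.
        List<Ownership> movedBack = List.of(
                new Ownership(3, "A"), new Ownership(4, "B"), new Ownership(6, "A"));
        System.out.println(accept(movedBack, "A", 3, 7)); // false: 6 > 3, zombie fenced
        System.out.println(accept(movedBack, "A", 6, 7)); // true: current ownership
    }
}
```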

h3. Differences from the RevocationEpoch design

This design does not need to disable commits on the client side. The need to 
disable commits came from the fact that the RevocationEpoch design tracks 
epochs “imprecisely”, on the member level and not on the partition level. In 
the RevocationEpoch design, when we have just revoked P2 on the client, we may 
attempt to commit a partition P1 and trigger the race condition, because the 
broker can concurrently bump the revocation epoch for that member due to the 
revocation of P2. We prevent this by disabling commits while a partition is 
being revoked, that is, in the time frame between executing the revocation on 
the client and observing the following epoch bump on the client. In the 
partition-level AssignmentEpoch design, if we are committing P1, we are still 
convinced that we own P1, so we must also still own it on the broker. At the 
same time, the broker may remove the assignment epoch for P2, but this does not 
matter, since it does not affect whether we can commit P1, and we are not going 
to try to commit P2 after having revoked it on the client side.
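The race can be replayed with both checks side by side. In this hypothetical scenario (the epoch values are illustrative), a member owns P1, assigned at epoch 2, and P2 at member epoch 4. It revokes P2, and the broker bumps the member epoch to 5, sets the member-level {{RevocationEpoch}} to 4 and drops P2's assignment epoch. A commit for P1 that still carries epoch 4 is then evaluated under each design:

```java
import java.util.Map;

final class DesignComparison {
    private DesignComparison() {}

    // Member-level RevocationEpoch design: RevocationEpoch < client <= broker.
    static boolean revocationEpochCheck(int revocationEpoch, int clientEpoch, int brokerEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }

    // Partition-level AssignmentEpoch design: AssignmentEpoch <= client <= broker.
    static boolean assignmentEpochCheck(Map<String, Integer> assignmentEpochs, String partition,
                                        int clientEpoch, int brokerEpoch) {
        int assignmentEpoch = assignmentEpochs.getOrDefault(partition, Integer.MAX_VALUE);
        return assignmentEpoch <= clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        int brokerEpoch = 5;                                  // after revoking P2
        int revocationEpoch = 4;                              // previous member epoch
        Map<String, Integer> assignmentEpochs = Map.of("P1", 2); // P2's entry was removed

        // Racing commit for P1 still carries the old client-side epoch 4.
        System.out.println(revocationEpochCheck(revocationEpoch, 4, brokerEpoch));        // false: fenced
        System.out.println(assignmentEpochCheck(assignmentEpochs, "P1", 4, brokerEpoch)); // true
    }
}
```

Only the member-level check needs the client-side commit blackout; the partition-level check accepts the P1 commit on its own.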
h1. Proposed Changes
h3. Introducing Per-Member and Per-Partition Assignment Epoch

We extend the model of a consumer group / streams group member with one integer 
per assigned partition for each member of a group. This covers both partitions 
currently assigned to the member and partitions pending revocation. The 
assignment epoch is set to the epoch in which the partition was assigned to 
the member, and we have the invariant 
{{AssignmentEpoch <= MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
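A minimal sketch of how the per-partition assignment epochs could be maintained, assuming a hypothetical {{MemberState}} class with partitions identified by plain strings (the real coordinator state and record schemas differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class MemberState {
    int memberEpoch;
    // Assignment epoch per partition, for assigned partitions and partitions pending revocation.
    final Map<String, Integer> assignmentEpochs = new HashMap<>();

    MemberState(int memberEpoch) {
        this.memberEpoch = memberEpoch;
    }

    /** Moves the member to a new epoch; partitions it did not own yet get that epoch. */
    void advanceTo(int newEpoch, Set<String> ownedPartitions) {
        memberEpoch = newEpoch;
        for (String partition : ownedPartitions) {
            assignmentEpochs.putIfAbsent(partition, newEpoch); // already-owned partitions keep their epoch
        }
    }

    /** Drops the entry once the client has confirmed revoking the partition. */
    void revoked(String partition) {
        assignmentEpochs.remove(partition);
    }

    /** AssignmentEpoch <= MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch. */
    boolean invariantHolds(int targetAssignmentEpoch, int groupEpoch) {
        boolean perPartition = assignmentEpochs.values().stream().allMatch(e -> e <= memberEpoch);
        return perPartition && memberEpoch <= targetAssignmentEpoch && targetAssignmentEpoch <= groupEpoch;
    }

    public static void main(String[] args) {
        MemberState member = new MemberState(0);
        member.advanceTo(1, Set.of("P1"));
        member.advanceTo(3, Set.of("P1", "P2"));
        System.out.println(member.assignmentEpochs.get("P1")); // 1: P1 kept its assignment epoch
        System.out.println(member.assignmentEpochs.get("P2")); // 3
        member.revoked("P1");
        System.out.println(member.invariantHolds(3, 4));       // true
    }
}
```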

The {{AssignmentEpoch}} is added as a field to {{TopicPartitions}} in 
{{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored and 
replayed from the committed offsets topic.

For streams groups, we will use the same logic but add assignment epochs only 
for active tasks in {{StreamsGroupCurrentMemberAssignmentValue}}, since only 
active tasks commit offsets in Kafka Streams.
h3. Relaxing the offset commit validation

We replace the current offset commit validation check

{{Client-Side Member Epoch == Broker-Side Member Epoch}}

by

{{Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch}}

where, for simplicity, we can assume the assignment epoch of a partition that 
is not assigned to that member to be {{Integer.MAX_VALUE}}.
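Put together, the broker-side validation of a whole commit request might look like the sketch below. It assumes a hypothetical map from partition name to assignment epoch and reuses today's error codes on failure. It also rejects the whole request if any partition fails the check; the text does not say whether errors would be reported per partition, so that choice is an assumption.

```java
import java.util.List;
import java.util.Map;

final class RelaxedCommitValidator {
    private RelaxedCommitValidator() {}

    static CommitError validate(Map<String, Integer> assignmentEpochs, int brokerEpoch,
                                int clientEpoch, List<String> partitions, boolean transactional) {
        for (String partition : partitions) {
            // Partitions not assigned to the member count as assignment epoch Integer.MAX_VALUE.
            int assignmentEpoch = assignmentEpochs.getOrDefault(partition, Integer.MAX_VALUE);
            if (assignmentEpoch > clientEpoch || clientEpoch > brokerEpoch) {
                return transactional ? CommitError.ILLEGAL_GENERATION : CommitError.STALE_MEMBER_EPOCH;
            }
        }
        return CommitError.NONE;
    }

    public static void main(String[] args) {
        Map<String, Integer> assignmentEpochs = Map.of("P1", 2); // P1 assigned at epoch 2
        System.out.println(validate(assignmentEpochs, 5, 3, List.of("P1"), false));      // NONE
        System.out.println(validate(assignmentEpochs, 5, 3, List.of("P1", "P9"), true)); // ILLEGAL_GENERATION
        System.out.println(validate(assignmentEpochs, 5, 1, List.of("P1"), false));      // STALE_MEMBER_EPOCH
    }
}

// Hypothetical stand-in for the error codes named in the text; not Kafka's Errors enum.
enum CommitError { NONE, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }
```

Using {{Integer.MAX_VALUE}} as the default lets the unassigned-partition case fall out of the same comparison instead of needing a separate branch.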

  was:
h2. Fencing offset commits

In the Kafka protocol, when a consumer commits offsets or a producer adds 
offsets to a transaction, the request includes the client's epoch/generation in 
the consumer group. This allows the group coordinator to fence zombie commit 
requests, that is, commit requests that include an offset for a partition that 
has since been reassigned to a different member. Without such a guard, a zombie 
offset commit could overwrite the offsets of the new owner, or its offsets could 
be committed to the consumer offsets topic but not be included in the result of 
the new owner's offset fetch request.

In KIP-848, when receiving an offset commit request that includes the 
client-side member epoch and a member ID, the group coordinator performs the 
check

{{Client-Side Member Epoch == Broker-Side Member Epoch}}

and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular 
offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset 
commits. If the member epoch sent in the request is the current broker-side 
member epoch, KIP-848 guarantees that the partition cannot also be owned by a 
different member at the same or a larger epoch. Therefore, this check is 
sufficient for fencing zombie commits. Note that we assume zombie commits always 
contain offsets for partitions that were owned by the member at the member 
epoch sent in the request. Commit requests that commit offsets for partitions 
that are _not_ owned by the member in that epoch are not possible in a correct 
client-side implementation of the protocol.

Note that the broker-side member epoch is not the group epoch or the target 
assignment epoch. For details, see KIP-848. Note also that commits can be 
fenced because a member falls out of the group (e.g. because it does not revoke 
partitions within the rebalance timeout). In that case, its commits are fenced 
solely based on the member ID, which is no longer part of the group. We 
therefore ignore this case in this document and only consider zombie commits 
from members that are still part of the group.
h2. Downsides of the current approach

This fencing is, however, unnecessarily strict. Assume, for example, a member 
owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the member 
still has P1 assigned at epoch 2. The member may not learn about the new 
broker-side member epoch in time and submit an offset commit for P1 with 
epoch 1. This is not a zombie commit request as defined above (because P1 was 
not reassigned to a different member), but it will still be rejected by a 
KIP-848 group coordinator.

The trouble with this fencing mechanism is that it is very difficult to avoid 
the broker-side member epoch being bumped concurrently with an offset commit. 
Seen from the client side, the broker-side member epoch may be bumped at any 
time while a heartbeat to the group coordinator is in flight. Ensuring that the 
member epoch sent in an offset commit request is up to date would require 
ensuring that no consumer group or streams group heartbeat request is in flight 
at the same time.
h2. Why a broker-side fix is warranted

This problem is particularly challenging to solve on the client side for 
transactional offset commits, since they are performed by the producer, not the 
consumer, and the producer has no way of knowing when a consumer group 
heartbeat or streams group heartbeat is in flight. The member epoch is passed 
from the Java consumer to the Java producer using the {{ConsumerGroupMetadata}} 
object, which is passed into {{sendOffsetsToTransaction}}. By the time the 
transactional offset commit is sent, the member epoch may be stale, in which 
case the broker returns an {{ILLEGAL_GENERATION}} error. This forces the Java 
producer into an abortable error state and surfaces the error to the user as a 
{{CommitFailedException}}; the user's only way to recover is to abort the 
transaction.

This may hurt in any Kafka client application, since aborting transactions 
means throwing away work and restarting from an earlier point. But it is a 
particularly big problem in Kafka Streams with exactly-once semantics, where 
aborting a transaction usually means wiping and restoring the state store, so 
each aborted transaction means some downtime for apps using state stores of 
non-negligible size. Furthermore, since Kafka Streams commits every 100ms by 
default in EOS, this is likely to happen fairly often.
h1. Conceptual Design
h2. Identifying zombies using the largest epoch in which a partition was revoked

To derive a more relaxed check, we need to identify an epoch that separates 
zombie commits from commits of the current owner. As mentioned above, zombie 
commit requests are commit requests that include a combination of partition, 
member ID and member epoch such that the member owned the partition at that 
epoch, but the partition has since been reassigned to a different member.

More precisely, on the level of a single partition, a relaxed offset commit 
check can be defined using a *revocation epoch* for each partition and each 
member, which is the largest epoch at which the partition was revoked from that 
member. To fence zombie commit requests, we can reject all offset commit 
requests from a member at any member epoch that is smaller than or equal to the 
revocation epoch for that member and that partition:

{{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
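A minimal sketch of the partition-level rule, assuming a hypothetical per-member map from partition name to its largest revocation epoch, with 0 for partitions that were never revoked (the text only fixes an initial value of 0 for the member-level epoch, so this default is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionRevocationCheck {
    // Largest epoch at which each partition was revoked from this member.
    private final Map<String, Integer> revocationEpochs = new HashMap<>();

    void recordRevocation(String partition, int epoch) {
        revocationEpochs.merge(partition, epoch, Integer::max);
    }

    /** Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch. */
    boolean accept(String partition, int clientEpoch, int brokerEpoch) {
        int revocationEpoch = revocationEpochs.getOrDefault(partition, 0); // 0: never revoked (assumed)
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        PartitionRevocationCheck member = new PartitionRevocationCheck();
        member.recordRevocation("P1", 3); // P1 revoked at epoch 3, later assigned back
        System.out.println(member.accept("P1", 3, 6)); // false: zombie from before the revocation
        System.out.println(member.accept("P1", 5, 6)); // true
        System.out.println(member.accept("P2", 3, 6)); // true: P2 was never revoked
    }
}
```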

The correctness of this check is straightforward: all commits of the current 
partition owner will be accepted, since the {{Client-Side Member Epoch}} of the 
current owner is always larger than the revocation epoch (a partition that is 
revoked in one epoch is never reassigned in the same epoch). The check also 
correctly rejects any zombie commits from that member, because KIP-848 
guarantees that if a partition was owned by the member at 
{{Client-Side Member Epoch}} (which we assume for zombie commits) but has since 
been reassigned, then this partition must have been revoked from the member in 
an epoch greater than or equal to {{Client-Side Member Epoch}}, so we must have 
{{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} for zombie 
commits.
h2. From partition-and-member-level to member-level

However, tracking such a revocation epoch for every partition and every member 
would be tedious. Instead, we can use an upper bound on the partition-level 
revocation epochs of each member: there must be an epoch that is larger than or 
equal to all partition-level revocation epochs, namely the largest epoch at 
which _any_ partition was revoked from the member and which is smaller than the 
current {{Broker-Side Member Epoch}}. We call it the 
*member-level revocation epoch*, or {{RevocationEpoch}}.

The offset commit request validation can then be defined as

{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}

Since {{RevocationEpoch}} is an upper bound on 
{{Partition-Level RevocationEpoch}}, this check correctly fences all zombie 
commits by an extension of the argument above. However, this formulation is 
less precise than the partition-level revocation epoch, and there may still be 
cases where commits of the current owner are fenced as well. In particular, 
this can happen when a partition P1 is revoked from the client, causing a 
broker-side bump of {{RevocationEpoch}} and {{Broker-Side Member Epoch}}, but 
the client attempts to commit offsets for a different partition P2 before 
learning about the updated {{Broker-Side Member Epoch}}.
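The loss of precision can be seen in a small hypothetical example: P1 is revoked at epoch 4, completing the bump to broker-side epoch 5, and the client commits P2 (never revoked) with epoch 4 before learning about epoch 5. The sketch computes the member-level {{RevocationEpoch}} as the largest partition-level revocation epoch below the broker-side member epoch and compares both checks; partition names, helper names, and the default of 0 for never-revoked partitions are assumptions.

```java
import java.util.Map;

final class MemberRevocationEpoch {
    private MemberRevocationEpoch() {}

    /** Largest partition-level revocation epoch below the broker-side member epoch, or 0 if none. */
    static int memberLevel(Map<String, Integer> partitionRevocationEpochs, int brokerEpoch) {
        return partitionRevocationEpochs.values().stream()
                .filter(e -> e < brokerEpoch)
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    static boolean accept(int revocationEpoch, int clientEpoch, int brokerEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionRevocationEpochs = Map.of("P1", 4);
        int brokerEpoch = 5;
        int revocationEpoch = memberLevel(partitionRevocationEpochs, brokerEpoch); // 4

        // The client commits P2 with epoch 4 before learning about epoch 5.
        int p2Level = partitionRevocationEpochs.getOrDefault("P2", 0);
        System.out.println(accept(p2Level, 4, brokerEpoch));         // true: partition-level check passes
        System.out.println(accept(revocationEpoch, 4, brokerEpoch)); // false: member-level check fences it
    }
}
```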
h2. Completely avoiding commit request errors through the use of rebalance listeners

As explained above, the member-level revocation epoch check makes 
{{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors much less likely, but it 
can still happen that a partition is revoked from the member and, while the 
{{Broker-Side Member Epoch}} and {{RevocationEpoch}} are bumped on the broker, 
the member concurrently attempts to commit offsets.

However, unlike a regular {{Broker-Side Member Epoch}} bump, a bump of the 
{{RevocationEpoch}} always involves the client, which needs to revoke the 
partitions before the epoch bump can happen. This means that the client is 
“in the loop”, and we can prevent failing commits by never committing offsets 
between revoking partitions on the client and observing the corresponding 
broker-side member epoch bump.

A power client like Kafka Streams will therefore be able to completely avoid 
{{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors in the normal case (that 
is, as long as commit requests are not actually delayed to the point of 
becoming zombie requests) by using the rebalance listener. Both transactional 
and non-transactional offset commits can be protected as follows:
 # In {{onPartitionsRevoked}}/{{onTasksRevoked}}, commit the current offsets / 
transaction and store the current {{ConsumerGroupMetadata.generationId}} to 
remember that it is a revocation epoch.
 # Before calling {{Consumer.commitSync}}/{{Consumer.commitAsync}} or 
{{Producer.sendOffsetsToTransaction}}, compare the current value of 
{{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we 
are still on the revocation epoch, skip the commit.
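A minimal sketch of the two steps, assuming a hypothetical {{RevocationCommitGuard}} helper used from the single thread that runs the poll loop and the rebalance listener. In a real application the generation IDs would come from {{ConsumerGroupMetadata.generationId()}}; here they are plain {{int}} parameters so that the sketch compiles without the Kafka client library, and the listener and commit calls are only indicated in comments.

```java
import java.util.OptionalInt;

final class RevocationCommitGuard {
    // Generation ID observed when partitions/tasks were last revoked, if any. Not thread-safe.
    private OptionalInt revocationEpoch = OptionalInt.empty();

    /** Step 1: call from onPartitionsRevoked/onTasksRevoked after committing there. */
    void onRevoked(int generationIdAtRevocation) {
        revocationEpoch = OptionalInt.of(generationIdAtRevocation);
    }

    /** Step 2: call before commitSync/commitAsync or sendOffsetsToTransaction. */
    boolean mayCommit(int currentGenerationId) {
        // Skip the commit while we are still on the revocation epoch.
        return !revocationEpoch.isPresent() || revocationEpoch.getAsInt() != currentGenerationId;
    }

    public static void main(String[] args) {
        RevocationCommitGuard guard = new RevocationCommitGuard();
        System.out.println(guard.mayCommit(4)); // true: nothing revoked yet
        guard.onRevoked(4);
        System.out.println(guard.mayCommit(4)); // false: still on the revocation epoch
        System.out.println(guard.mayCommit(5)); // true: new epoch received from the broker
    }
}
```

Skipping (rather than blocking) a commit matches the second step above: the offsets are simply committed on a later iteration once the generation ID has moved on.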

We know that any partition revocation requires 
{{onPartitionsRevoked}}/{{onTasksRevoked}} to be invoked on the client side, and 
that {{ConsumerGroupMetadata.generationId}} will be updated after the 
revocations have been completed and the new broker-side member epoch has been 
communicated to the client.

This mechanism can be used by any user of the Java clients, not just Kafka 
Streams. The mechanism could also be implemented within the Java consumer and 
producer. The consumer would have to keep the last revocation epoch internally, 
expose it to the producer via {{ConsumerGroupMetadata}}, and delay committing 
offsets of non-revoked partitions until the member epoch is bumped beyond the 
revocation epoch. Alternatively, any revocation could block the application 
thread inside {{Consumer.poll}} until the new epoch has been received from the 
broker. However, we don’t include these changes in this design document, and it 
is up for discussion whether we’d need to implement them, since most users won’t 
need to completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors.
h1. Proposed Changes
h3. Introducing Per-Member Revocation Epoch

We extend the model of a consumer group / streams group member with a single 
integer, called the revocation epoch, for each member of a group. The revocation 
epoch is initially set to 0, and we have the invariant 
{{RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.

As before, whenever a member completes revocation of all partitions that are 
pending revocation, its {{MemberEpoch}} is bumped to the 
{{TargetAssignmentEpoch}}. At the same time, the {{RevocationEpoch}} is 
bumped to the previous value of the {{MemberEpoch}}. This maintains the 
invariant above. If the {{MemberEpoch}} is bumped to the 
{{TargetAssignmentEpoch}} without requiring any revocations, the 
{{RevocationEpoch}} is not changed.
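The state transition described above can be sketched as follows. {{MemberEpochs}} is a hypothetical holder for the two broker-side values, the target assignment epoch is passed in by the caller, and the commit check applies the member-level rule {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}.

```java
final class MemberEpochs {
    int revocationEpoch = 0; // initially 0
    int memberEpoch;

    MemberEpochs(int memberEpoch) {
        this.memberEpoch = memberEpoch;
    }

    /**
     * Bumps MemberEpoch to the target assignment epoch. If the bump completes a
     * revocation, RevocationEpoch takes the previous MemberEpoch; otherwise it is unchanged.
     */
    void bumpTo(int targetAssignmentEpoch, boolean completedRevocation) {
        if (completedRevocation) {
            revocationEpoch = memberEpoch;
        }
        memberEpoch = targetAssignmentEpoch;
    }

    /** RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch. */
    boolean acceptCommit(int clientEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= memberEpoch;
    }

    public static void main(String[] args) {
        MemberEpochs member = new MemberEpochs(3);
        member.bumpTo(5, false);                    // no revocation: RevocationEpoch stays 0
        System.out.println(member.acceptCommit(3)); // true: stale epoch still accepted
        member.bumpTo(7, true);                     // revocation completed: RevocationEpoch = 5
        System.out.println(member.acceptCommit(5)); // false
        System.out.println(member.acceptCommit(7)); // true
    }
}
```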

The {{RevocationEpoch}} is added as a field to the 
{{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored and 
replayed from the committed offsets topic.

For streams groups, we will use the same logic but add the field 
{{ActiveTaskRevocationEpoch}} to 
{{StreamsGroupCurrentMemberAssignmentValue}}, since only active tasks 
commit offsets in Kafka Streams.
h3. Relaxing the offset commit validation

We replace the current offset commit validation check

{{Client-Side Member Epoch == Broker-Side Member Epoch}}

by

{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
h3. Disabling streams commits between revocation and member epoch bumps

In Kafka Streams, we keep track of whether we are currently on a revocation 
epoch. In {{onTasksRevoked}}, we store {{ConsumerGroupMetadata.generationId}} as 
the revocation epoch and disable committing (via the consumer or the producer) 
as long as we are still on that epoch (by comparing against the current value 
of {{ConsumerGroupMetadata.generationId}}). This only affects offset and 
transaction commits performed outside of {{onTasksRevoked}}. We can still 
safely commit inside {{onTasksRevoked}}, since we know that any broker-side 
member epoch bumps are blocked as long as the member is revoking tasks.


> Relax offset commit validation to allow member epochs since the last 
> revocation
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19779
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Lucas Brutschy
>            Assignee: Lucas Brutschy
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
