[https://issues.apache.org/jira/browse/KAFKA-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Lucas Brutschy updated KAFKA-19779:
-----------------------------------
Description:
h2. Fencing offset commits
In the Kafka protocol, when a consumer commits offsets or a producer tries to
add offsets to a transaction, it includes its epoch/generation in the consumer
group. The point of this is for the group coordinator to fence against zombie
commit requests, that is, commit requests that include an offset for a
partition that has since been reassigned to a different member. If such a guard
were not in place, a zombie offset commit could overwrite the offsets of the new
owner, or its offsets could be committed to the consumer offsets topic but not
be included in the result of the new owner's offset fetch request.
In KIP-848, when receiving an offset commit request that includes the
client-side member epoch and a member ID, the group coordinator performs the
check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular
offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset
commits.
If the member epoch sent in the request is the current broker-side member
epoch, KIP-848 guarantees that the partition cannot also be owned by a
different member at the same or a larger epoch. Therefore, this check is
sufficient for fencing zombie commits. Note that we assume zombie commits always
contain offsets for partitions that were owned by the member at the member
epoch sent in the request. Commit requests for partitions that are _not_ owned
by the member in that epoch are not possible in a correct client-side
implementation of the protocol.
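As an illustration only, the current check can be modeled as a pure function of the two epochs. The sketch below is a hypothetical Java model, not the group coordinator's actual code: {{StrictCommitValidator}} and {{CommitError}} are invented names, and only the error names are taken from the text above.

```java
// Hypothetical model of the current KIP-848 fencing check; not the coordinator's code.
enum CommitError { NONE, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }

final class StrictCommitValidator {
    private StrictCommitValidator() {}

    // Accepts a commit only if the client-side epoch equals the broker-side member epoch.
    static CommitError validate(int clientSideMemberEpoch,
                                int brokerSideMemberEpoch,
                                boolean transactional) {
        if (clientSideMemberEpoch == brokerSideMemberEpoch) {
            return CommitError.NONE;
        }
        // Regular commits fail with STALE_MEMBER_EPOCH, transactional ones with ILLEGAL_GENERATION.
        return transactional ? CommitError.ILLEGAL_GENERATION : CommitError.STALE_MEMBER_EPOCH;
    }
}
```

For example, {{validate(1, 2, true)}} yields {{ILLEGAL_GENERATION}}: any mismatch is rejected, regardless of whether the partition was actually reassigned.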
Note that the broker-side member epoch is neither the group epoch nor the
target assignment epoch. For details, see KIP-848. Note also that commits can be
fenced because a member falls out of the group (e.g. because it does not revoke
partitions within the rebalance timeout). In that case, its commits are fenced
solely based on the member ID, which is no longer part of the group. We
therefore ignore this case in this document and only consider zombie commits
from members that are still part of the group.
h2. Downsides of the current approach
This fencing is, however, unnecessarily strict. Assume, for example, that a
member owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the
member still has P1 assigned at epoch 2. The member may not learn about the new
broker-side member epoch in time and may submit an offset commit for P1 with
epoch 1. This is not a zombie commit request as defined above (because P1 was
not reassigned to a different member), but it will still be rejected by a
KIP-848 group coordinator.
The trouble with this fencing mechanism is that it is very difficult to avoid
the broker-side member epoch being bumped concurrently with an offset commit.
Seen from the client side, the broker-side member epoch may be bumped at any
time while a heartbeat to the group coordinator is in flight. Ensuring that the
member epoch sent in an offset commit request is up to date would require
ensuring that no consumer group or streams group heartbeat request is in flight
at the same time.
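The following hypothetical sketch replays the race described above in a single thread: the coordinator bumps the member epoch while a heartbeat is in flight, and a commit for P1 sent before the heartbeat response arrives still carries the old epoch. {{HeartbeatRace}} and its methods are illustrative names, and the strict equality check stands in for the KIP-848 validation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded replay of the heartbeat race; not Kafka client code.
final class HeartbeatRace {
    int brokerSideMemberEpoch = 1;  // epoch stored by the group coordinator
    int clientSideMemberEpoch = 1;  // epoch last seen by the client
    final List<String> log = new ArrayList<>();

    // The coordinator bumps the member epoch while handling a heartbeat.
    // P1 stays assigned to the member, so no revocation is involved.
    void brokerBumpsEpochDuringHeartbeat() {
        brokerSideMemberEpoch++;
    }

    // The client only learns the new epoch when the heartbeat response arrives.
    void heartbeatResponseArrives() {
        clientSideMemberEpoch = brokerSideMemberEpoch;
    }

    // Sends a commit for P1 and applies the strict KIP-848 equality check.
    boolean commitP1() {
        boolean accepted = clientSideMemberEpoch == brokerSideMemberEpoch;
        log.add("commit P1 @" + clientSideMemberEpoch + " -> "
                + (accepted ? "accepted" : "STALE_MEMBER_EPOCH"));
        return accepted;
    }
}
```

Calling {{brokerBumpsEpochDuringHeartbeat()}} followed by {{commitP1()}} yields a rejection even though P1 was never reassigned; after {{heartbeatResponseArrives()}}, the same commit is accepted.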
h2. Why a broker-side fix is warranted
This problem is particularly challenging to solve on the client side for
transactional offset commits, since they are performed by the producer, not the
consumer, and the producer has no way of knowing when a consumer group
heartbeat or streams group heartbeat is in flight. The member epoch is passed
from the Java consumer to the Java producer using the {{ConsumerGroupMetadata}}
object, which is passed into {{sendOffsetsToTransaction}}. By the time the
transactional offset commit is sent, the member epoch may be stale, in which
case the broker returns an {{ILLEGAL_GENERATION}} error. This forces the Java
producer into an abortable error state and surfaces the error to the user as a
{{CommitFailedException}}; the user's only way to recover is to abort the
transaction.
This may hurt any Kafka client application, since aborting a transaction
means throwing away work and restarting from an earlier point. But it is a
particularly big problem in Kafka Streams with exactly-once semantics (EOS),
where aborting a transaction usually means wiping and restoring the state
store, so each aborted transaction causes downtime for applications with state
stores of non-negligible size. Furthermore, since Kafka Streams commits every
100 ms by default under EOS, these errors are likely to occur fairly often.
h1. Conceptual Design
h2. Identifying zombies using the largest epoch at which a partition was revoked
To derive a more relaxed check, we need to identify an epoch that separates
zombie commits from commits of the current owner. As mentioned above, zombie
commit requests are commit requests that include a partition, member ID, and
member epoch such that the member owned the partition at that epoch, but the
partition has since been reassigned to a different member.
At the finest granularity, that of a single partition, a relaxed offset commit
check can be defined using a *revocation epoch* for each partition and each
member, which is the largest epoch at which the partition was revoked from that
member. To fence zombie commit requests, we can reject all offset commit
requests from a member at any member epoch that is smaller than or equal to the
revocation epoch for that member and that partition. A commit is thus accepted
only if
{{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
The correctness of this check follows directly. All commits of the current
partition owner are accepted, since the {{Client-Side Member Epoch}} of the
current owner is always larger than the revocation epoch (a partition that is
revoked in one epoch is never reassigned in the same epoch). The check also
correctly rejects all zombie commits from that member: KIP-848 guarantees that
if a partition was owned by the member at {{Client-Side Member Epoch}} (which we
assume for zombie commits) but has since been reassigned, then the partition
must have been revoked from the member at an epoch greater than or equal to
{{Client-Side Member Epoch}}. Hence,
{{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} holds for
zombie commits.
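A minimal sketch of the partition-level check, assuming a hypothetical {{PartitionLevelFencing}} class that records, for a single member, the member epoch at which each partition was last revoked. It only illustrates the argument above; the proposal does not track this per-partition state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition fencing for one member; illustration only.
final class PartitionLevelFencing {
    // Largest epoch at which each partition was revoked from this member (absent = 0).
    private final Map<String, Integer> revocationEpochByPartition = new HashMap<>();
    private int brokerSideMemberEpoch;

    PartitionLevelFencing(int initialMemberEpoch) {
        this.brokerSideMemberEpoch = initialMemberEpoch;
    }

    // Epoch bump that keeps the current assignment.
    void bumpWithoutRevocation(int newMemberEpoch) {
        brokerSideMemberEpoch = newMemberEpoch;
    }

    // The partition is revoked in the current epoch, then the member epoch advances.
    void revokeAndBump(String partition, int newMemberEpoch) {
        revocationEpochByPartition.merge(partition, brokerSideMemberEpoch, Integer::max);
        brokerSideMemberEpoch = newMemberEpoch;
    }

    // Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch
    boolean accepts(String partition, int clientSideMemberEpoch) {
        int revocationEpoch = revocationEpochByPartition.getOrDefault(partition, 0);
        return revocationEpoch < clientSideMemberEpoch
                && clientSideMemberEpoch <= brokerSideMemberEpoch;
    }
}
```

If a member owns P1 and P2 at epoch 1, is bumped to epoch 2 without revocation, and then loses P1 (bumping it to epoch 3), a zombie commit for P1 at epoch 2 is fenced, while commits for P2 at epochs 1 to 3 are accepted.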
h2. From partition-and-member-level to member-level
However, tracking such a revocation epoch for every partition and every member
would be tedious. Instead, we can use, for every member, an upper bound on its
partition-level revocation epochs: the largest epoch at which _any_ partition
was revoked from the member and which is smaller than the current
{{Broker-Side Member Epoch}}. This epoch is larger than or equal to all
partition-level revocation epochs of the member. We call it the
*member-level revocation epoch*, or {{RevocationEpoch}}.
The offset commit request validation can then be defined as
{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
Since {{RevocationEpoch}} is an upper bound on every
{{Partition-Level RevocationEpoch}} of the member, this check correctly fences
all zombie commits by an extension of the argument above. However, this
formulation is less precise than the partition-level check, and there may still
be cases where commits of the current owner are fenced as well. In particular,
this can happen when a partition P1 is revoked from the client, causing a
broker-side bump of {{RevocationEpoch}} and {{Broker-Side Member Epoch}}, but
the client attempts to commit offsets for a different partition P2 before
learning about the updated {{Broker-Side Member Epoch}}.
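The member-level variant can be sketched the same way. In this hypothetical {{MemberLevelFencing}} class (an invented name), a single {{revocationEpoch}} replaces the per-partition map, which is exactly why a commit for P2 can be rejected after P1 was revoked, as described above.

```java
// Hypothetical member-level fencing; one revocation epoch per member.
final class MemberLevelFencing {
    private int revocationEpoch = 0;       // initially 0
    private int brokerSideMemberEpoch;

    MemberLevelFencing(int initialMemberEpoch) {
        this.brokerSideMemberEpoch = initialMemberEpoch;
    }

    // Epoch bump that keeps the current assignment: RevocationEpoch is unchanged.
    void bumpWithoutRevocation(int newMemberEpoch) {
        brokerSideMemberEpoch = newMemberEpoch;
    }

    // Some partition is revoked in the current epoch, then the member epoch advances.
    void revokeAndBump(int newMemberEpoch) {
        revocationEpoch = brokerSideMemberEpoch;
        brokerSideMemberEpoch = newMemberEpoch;
    }

    // RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch.
    // The partition does not matter: only member-level epochs are compared.
    boolean accepts(int clientSideMemberEpoch) {
        return revocationEpoch < clientSideMemberEpoch
                && clientSideMemberEpoch <= brokerSideMemberEpoch;
    }
}
```

Starting at epoch 1 with P1 and P2, a bump to epoch 2 without revocation still accepts commits at epoch 1. After P1 is revoked and the epoch moves to 3, a commit for P2 at epoch 2 is rejected ({{RevocationEpoch}} is now 2), although the partition-level check would have accepted it.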
h2. Completely avoiding commit request errors through the use of rebalance listeners
As explained above, the member-level revocation epoch check makes
{{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} much less likely, but a partition
can still be revoked from the member and, while {{Broker-Side Member Epoch}}
and {{RevocationEpoch}} are bumped on the broker, the member may concurrently
attempt to commit offsets.
However, unlike a regular {{Broker-Side Member Epoch}} bump, a bump of
{{RevocationEpoch}} always involves the client, which needs to revoke the
partitions before the epoch bump can happen. This means that the client is
“in the loop”, and we can prevent failing commits by never committing offsets
between revoking partitions on the client and the corresponding broker-side
member epoch bump.
By using the rebalance listener, a power client like Kafka Streams can
therefore completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors
in the normal case (that is, unless commit requests are actually delayed long
enough to become zombie requests). Failing offset commits, both transactional
and non-transactional, can be avoided as follows:
# In {{onPartitionsRevoked}}/{{onTasksRevoked}}, commit the current offsets or
transaction and store the current {{ConsumerGroupMetadata.generationId}} to
remember that it is a revocation epoch.
# Before calling {{Consumer.commit}} or
{{Producer.sendOffsetsToTransaction}}, compare the current value of
{{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we
are still on the revocation epoch, skip the commit.
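The two steps above can be sketched as a small helper. {{RevocationAwareCommitter}} is a hypothetical name, not part of the Kafka client API; in a real application {{currentGenerationId}} would be read from {{ConsumerGroupMetadata.generationId()}}, and the {{Runnable}} would wrap the consumer commit or {{sendOffsetsToTransaction}}.

```java
// Hypothetical client-side guard; not part of the Kafka client API.
final class RevocationAwareCommitter {
    // Sentinel meaning "no revocation seen yet" (illustrative choice).
    private static final int NO_REVOCATION = Integer.MIN_VALUE;
    private int revocationEpoch = NO_REVOCATION;

    // Step 1: called from onPartitionsRevoked/onTasksRevoked. Committing here is not
    // gated, since the broker-side epoch bump waits for the revocation to complete.
    void onRevoked(int currentGenerationId, Runnable commitRevoked) {
        commitRevoked.run();
        revocationEpoch = currentGenerationId;
    }

    // Step 2: called before every commit outside the callback.
    // Returns true if the commit ran, false if it was skipped.
    boolean maybeCommit(int currentGenerationId, Runnable commit) {
        if (currentGenerationId == revocationEpoch) {
            // Still on the revocation epoch: the broker-side bump is pending, so a
            // commit now could fail with STALE_MEMBER_EPOCH/ILLEGAL_GENERATION.
            return false;
        }
        commit.run();
        return true;
    }
}
```

In this sketch, a skipped commit is meant to be retried on the next commit attempt, once {{generationId}} has moved past the stored revocation epoch.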
This works because any partition revocation requires {{onPartitionsRevoked}}/
{{onTasksRevoked}} to be invoked on the client side, and
{{ConsumerGroupMetadata.generationId}} is updated once the revocation has
completed and the new broker-side member epoch has been communicated to the
client.
This mechanism can be used by any user of the Java clients, not just Kafka
Streams. It could also be implemented within the Java consumer and producer:
the consumer would have to keep track of the last revocation epoch internally,
expose it to the producer via {{ConsumerGroupMetadata}}, and delay committing
offsets of non-revoked partitions until the member epoch has been bumped beyond
the revocation epoch. Alternatively, any revocation could block the application
thread inside {{Consumer.poll}} until the new epoch has been received from the
broker. However, these changes are not part of this design document, and it is
open for discussion whether they are needed, since most users will not need to
completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}}.
h1. Proposed Changes
h3. Introducing Per-Member Revocation Epoch
We extend the model of a consumer group or streams group member with a single
integer, the revocation epoch. The revocation epoch is initially set to 0, and
we maintain the invariant
{{RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
As before, whenever a member completes revocation of all partitions that are
pending revocation, its {{MemberEpoch}} is bumped to the
{{TargetAssignmentEpoch}}. At the same time, the {{RevocationEpoch}} is
bumped to the previous value of the {{MemberEpoch}}. This maintains the
invariant above. If the {{MemberEpoch}} is bumped to the
{{TargetAssignmentEpoch}} without requiring any revocations, the
{{RevocationEpoch}} is not changed.
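A hypothetical model of the bookkeeping described above, assuming a simplified member that moves directly to the target assignment once it has finished revoking, and a target assignment that is always computed at the latest group epoch. {{MemberEpochState}} is an invented name; it throws if the invariant is ever violated, which makes the update rules easy to check.

```java
// Hypothetical model of the per-member epochs; the real coordinator has more states.
final class MemberEpochState {
    int revocationEpoch = 0;
    int memberEpoch;
    int targetAssignmentEpoch;
    int groupEpoch;

    MemberEpochState(int initialEpoch) {
        memberEpoch = initialEpoch;
        targetAssignmentEpoch = initialEpoch;
        groupEpoch = initialEpoch;
        checkInvariant();
    }

    // The group epoch is bumped and a new target assignment is computed for it.
    void newTargetAssignment(int epoch) {
        if (epoch <= groupEpoch) {
            throw new IllegalArgumentException("group epochs must increase");
        }
        groupEpoch = epoch;
        targetAssignmentEpoch = epoch;
        checkInvariant();
    }

    // The member reaches the target assignment. If it first had to revoke
    // partitions, RevocationEpoch becomes the previous MemberEpoch.
    void reconcile(boolean revokedPartitions) {
        if (memberEpoch == targetAssignmentEpoch) {
            return; // nothing to reconcile
        }
        if (revokedPartitions) {
            revocationEpoch = memberEpoch;
        }
        memberEpoch = targetAssignmentEpoch;
        checkInvariant();
    }

    // RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch
    private void checkInvariant() {
        if (!(revocationEpoch < memberEpoch
                && memberEpoch <= targetAssignmentEpoch
                && targetAssignmentEpoch <= groupEpoch)) {
            throw new IllegalStateException("invariant violated: revocation=" + revocationEpoch
                    + ", member=" + memberEpoch + ", target=" + targetAssignmentEpoch
                    + ", group=" + groupEpoch);
        }
    }
}
```

The constructor expects an initial epoch of at least 1, since {{RevocationEpoch}} starts at 0 and must stay strictly below {{MemberEpoch}}.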
The {{RevocationEpoch}} is added as a field to the
{{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored in and
replayed from the consumer offsets topic.
For streams groups, we will use the same logic but add the field
{{ActiveTaskRevocationEpoch}} to
{{StreamsGroupCurrentMemberAssignmentValue}}, since only active tasks
commit offsets in Kafka Streams.
h3. Relaxing the offset commit validation
We replace the current offset commit validation check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
by
{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
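To see what the replacement changes, the hypothetical helpers below evaluate both checks on the example from "Downsides of the current approach" (P1 owned at epoch 1, broker-side member epoch bumped to 2 without revocation) and on a zombie commit. {{CommitChecks}}, {{strict}}, and {{relaxed}} are illustrative names only.

```java
// Hypothetical side-by-side comparison of the current and the relaxed validation.
final class CommitChecks {
    private CommitChecks() {}

    // Current check: Client-Side Member Epoch == Broker-Side Member Epoch
    static boolean strict(int clientEpoch, int brokerEpoch) {
        return clientEpoch == brokerEpoch;
    }

    // Relaxed check: RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch
    static boolean relaxed(int revocationEpoch, int clientEpoch, int brokerEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }
}
```

With {{RevocationEpoch}} 0, a commit at epoch 1 against broker-side epoch 2 fails {{strict(1, 2)}} but passes {{relaxed(0, 1, 2)}}, while a zombie commit at epoch 2 after a revocation at epoch 2 (broker-side epoch 3) is still rejected by {{relaxed(2, 2, 3)}}.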
h3. Disabling streams commits between revocation and member epoch bumps
In Kafka Streams, we keep track of whether we are currently on a revocation
epoch. In {{onTasksRevoked}}, we store {{ConsumerGroupMetadata.generationId}}
as the revocation epoch and disable committing (via the consumer or the
producer) as long as we are still on that epoch (by comparing against the
current value of {{ConsumerGroupMetadata.generationId}}). This only affects
offset and transaction commits performed outside of {{onTasksRevoked}}. We can
still safely commit inside {{onTasksRevoked}}, since we know that any
broker-side member epoch bumps are blocked as long as the member is revoking
tasks.
> Relax offset commit validation to allow member epochs since the last revocation
> -------------------------------------------------------------------------------
>
> Key: KAFKA-19779
> URL: https://issues.apache.org/jira/browse/KAFKA-19779
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Major
>
> h2. Fencing offset commits
> In the Kafka protocol, when a consumer commits offsets or a producer tries to
> add offsets to a transaction, it includes its epoch/generation of the
> consumer group. The point of this is for the group coordinator to fence
> against zombie commit requests, that is, commit requests that include an
> offset for a partition that was since reassigned to a different member. If
> such a guard was not in place, a zombie offset commit may overwrite offsets
> of the new owner, or its offsets may be committed to the consumer offset
> topic but not be included in the result of the new owners offset fetch
> request.
> In KIP-848, when receiving an offset commit request that includes the
> client-side member epoch and a member ID, the group coordinator performs the
> check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular
> offset commits and a {{ILLEGAL_GENERATION}} for transactional offset commits.
> If the member epoch sent in the request is the current broker-side member
> epoch, KIP-848 guarantees that the partition cannot also be owned by a
> different member at the same or a larger epoch. Therefore, this is sufficient
> for fencing zombie commits. Note that we assume zombie commits will always
> contain offsets for partitions that were owned by the member at the member
> epoch sent in the request. Commit requests that commit offsets for partitions
> that are _not_ owned by the member in that epoch, are not possible in a
> correct client-side implementation of the protocol.
> Note that the broker-side member epoch is not the group epoch or the target
> assignment epoch. For details, see KIP-848. Note also that commits can also
> be fenced because a member falls out of the group (e.g. because it does not
> revoke partitions within the rebalance timeout). At this point, its commits
> will be fenced solely based on the member ID (which is not part of the group
> anymore). We therefore ignore this case in this document, and only consider
> zombie commits from members that are still part of the group.
> h2. Downsides of the current approach
> This fencing is, however, unnecessarily strict. Assume, for example, a member
> owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the
> member still has P1 assigned at epoch 2. The member may not learn about the
> new broker-side member epoch in time, and submit an offset commit commit for
> P1 with epoch 1. This is not a zombie commit request as define above (because
> P1 was not reassigned to a different member), but it will still be rejected
> by a KIP-848 group coordinator.
> The trouble with this fencing mechanism is that it is very difficult to avoid
> the broker-side member epoch being bumped concurrently with an offset commit.
> Seen from the client-side, the broker-side member epoch may be bumped at any
> time while a heartbeat to the group coordinator is in-flight. To make sure
> the member epoch sent in an offset commit request is up-to-date would require
> making sure that no consumer group or streams group heartbeat request is
> in-flight at the same time.
> h2. Why a broker-side fix is warranted
> This problem is particularly challenging to solve on the client side for
> transactional offset commits, since they are performed by the producer, not
> the consumer, and the producer has no way of knowing when a consumer group
> heartbeat or streams group heartbeat is in-flight. The member epoch is passed
> from the Java consumer to the Java producer using the
> {{ConsumerGroupMetadata}} object, which is passed into
> {{{}sendOffsetsToTransaction{}}}. By the time the transactional offset commit
> is sent, the member epoch may be stale, the broker will return an
> {{ILLEGAL_GENERATION}} exception. This will force the Java producer into an
> abortable error state, surfacing the error as a {{CommitFailedException}} to
> the user, the user has no other way to recover from this to abort the
> transaction.
> This may hurt in any Kafka client application, since aborting transactions
> means throwing away work and restarting from an earlier point. But it is a
> particularly big problem in Kafka Streams with exactly-once semantics, where
> aborting a transaction usually means wiping and restoring the state store, so
> each aborted transaction means some downtime for apps using state stores of
> non-negligible size. Furthermore, since Kafka Streams commits every 100ms by
> default in EOS, this is likely to happen fairly often.
> h1. Conceptual Design
> h2. Identifying zombies using the largest epoch a partition was revoked
> To derive a more relaxed check, we need to identify an epoch which separates
> zombie commits from commits of the current owner. As mentioned above, zombie
> commit requests are commit requests that include a partition, member ID and
> member epoch combination, so that the member owned the partition at that
> epoch. However, the partition has since been reassigned to a different member.
> Most precisely, on the level of a single partition, a relaxed offset commit
> check can be defined using a *revocation epoch* for each partition and each
> member, which is the largest epoch at which the partition was revoked from
> that member. To fence from zombie commit requests, we can reject all offset
> commit requests from a member at any member epoch that is smaller or equal
> than the revocation epoch for that member and that partition.
> {{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side
> Member Epoch}}
> The correctness of this is obvious: all commits of the current partition
> owner will be accepted, since the {{Client-Side Member Epoch}} of the current
> owner must always have an epoch that is larger than the revocation epoch (a
> partition that is revoked in one epoch is never reassigned in the same
> epoch). It will also correctly reject any zombie commits from that member,
> because KIP-848 guarantees that if a partition was owned by the member at
> {{Client-Side Member Epoch}} (which we assume for zombie commits), but it was
> reassigned since, then this partition must have been revoked from the member
> in an epoch greater or equal to {{{}Client-Side Member Epoch{}}}, so we must
> have {{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} for
> zombie commits.
> h2. From partition-and-member-level to member-level
> However, tracking such a revocation epoch for every partition and every
> member would be tedious. Instead, we can use a per-member upper bound on the
> partition-level revocation epochs: there must be an epoch that is larger than
> or equal to all partition-level revocation epochs of the member. It is the
> largest epoch at which _any_ partition was revoked from the member and which
> is smaller than the current {{Broker-Side Member Epoch}}. We call it the
> *member-level revocation epoch*, or {{RevocationEpoch}}.
> The offset commit request validation can then be defined as
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
> Since {{RevocationEpoch}} is an upper bound on
> {{Partition-Level RevocationEpoch}}, this check correctly fences all zombie
> commits by an extension of the argument above. However, it is less precise
> than the partition-level check, and commits of the current owner may still be
> fenced in some cases. In particular, this can happen when a partition P1 is
> revoked from the member, causing a broker-side bump of {{RevocationEpoch}}
> and {{Broker-Side Member Epoch}}, but the client attempts to commit offsets
> for a different partition P2 before learning about the updated
> {{Broker-Side Member Epoch}}.
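The P1/P2 case can be traced with a small, self-contained model. It is a sketch under simplifying assumptions: epochs are `int`s, the class and method names are hypothetical, and, purely for illustration, the member-level `RevocationEpoch` is derived from per-partition revocation epochs as defined above (the proposed design stores only the single per-member integer).

```java
import java.util.Map;

// Hypothetical illustration of the member-level check (not Kafka code).
class MemberLevelFencing {
    // Member-level RevocationEpoch: the largest partition-level revocation epoch
    // that is smaller than the broker-side member epoch (0 if there is none).
    static int memberRevocationEpoch(Map<String, Integer> partitionRevocationEpochs, int brokerEpoch) {
        int result = 0;
        for (int epoch : partitionRevocationEpochs.values()) {
            if (epoch < brokerEpoch && epoch > result) {
                result = epoch;
            }
        }
        return result;
    }

    // RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch
    static boolean accept(int revocationEpoch, int clientEpoch, int brokerEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }
}
```

Suppose the member owns P1 and P2 at epoch 3, P1 is revoked, and the broker bumps the member to epoch 5, so `RevocationEpoch` becomes 3. A commit for P2 that the client still sends with epoch 3 would pass the partition-level check, since P2 was never revoked, but it fails the member-level check because 3 is not greater than 3.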
> h2. Completely avoiding commit request errors through the use of rebalance listeners
> As explained above, the member-level revocation epoch check makes
> {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} much less likely, but it can
> still happen that a partition is revoked from the member and, while the
> {{Broker-Side Member Epoch}} and {{RevocationEpoch}} are bumped on the
> broker, the member concurrently attempts to commit offsets.
> However, unlike a regular {{Broker-Side Member Epoch}} bump, a bump of the
> {{RevocationEpoch}} always involves the client, which needs to revoke the
> partitions before the epoch bump can happen. This means that the client is
> “in-the-loop”, and we can prevent failing commits by never committing offsets
> between revoking partitions on the client and the corresponding broker-side
> member epoch bump.
> Using the rebalance listener, a client like Kafka Streams can therefore
> completely avoid failing offset commits (both transactional and
> non-transactional) with {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} in the
> normal case, that is, unless commit requests are actually delayed long enough
> to become zombie requests:
> # In {{onPartitionsRevoked}}/{{onTasksRevoked}}, commit the current offsets
> or transaction and store the current {{ConsumerGroupMetadata.generationId}}
> to remember that it is a revocation epoch.
> # Before calling {{Consumer.commitSync}}/{{Consumer.commitAsync}} or
> {{Producer.sendOffsetsToTransaction}}, compare the current value of
> {{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we
> are still on the revocation epoch, skip the commit.
> This works because any partition revocation requires
> {{onPartitionsRevoked}}/{{onTasksRevoked}} to be invoked on the client side,
> and {{ConsumerGroupMetadata.generationId}} is updated once the revocation has
> completed and the new broker-side member epoch has been communicated to the
> client.
> This mechanism can be used by any user of the Java clients, not just Kafka
> Streams. It could also be implemented within the Java consumer and producer:
> the consumer would have to keep the last revocation epoch internally, expose
> it to the producer via {{ConsumerGroupMetadata}}, and delay committing
> offsets of non-revoked partitions until the member epoch is bumped beyond the
> revocation epoch. Alternatively, any revocation could block the application
> thread inside {{Consumer.poll}} until the new epoch has been received from the
> broker. However, we don’t include these changes in this design document, and
> it is open for discussion whether they are needed, since most users won’t
> need to completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}}.
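The two listener-based steps above can be sketched as a small helper that a client consults before committing. This is a minimal sketch, not Kafka API: the class `RevocationCommitGate` and its methods are hypothetical, and the generation ID is passed in as an `int` so the example runs without the Kafka client library. In a real client, the value would presumably be read via `consumer.groupMetadata().generationId()`, once at the end of the revocation callback and again before each later commit.

```java
import java.util.OptionalInt;

// Hypothetical helper implementing the two steps of the rebalance-listener approach.
class RevocationCommitGate {
    // Generation ID observed when partitions were last revoked; empty if none yet.
    private OptionalInt revocationGenerationId = OptionalInt.empty();

    // Step 1: call at the end of onPartitionsRevoked / onTasksRevoked, after the
    // offsets or transaction for the revoked partitions have been committed.
    void onRevoked(int currentGenerationId) {
        revocationGenerationId = OptionalInt.of(currentGenerationId);
    }

    // Step 2: call before committing outside the callback. Returns false while the
    // client is still on the revocation epoch (the broker-side bump has not been
    // observed yet), in which case the commit should be skipped.
    boolean mayCommit(int currentGenerationId) {
        return !revocationGenerationId.isPresent()
            || revocationGenerationId.getAsInt() != currentGenerationId;
    }
}
```

The sketch only checks whether the generation ID has changed since the revocation, which is all the steps above require.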
> h1. Proposed Changes
> h3. Introducing Per-Member Revocation Epoch
> We extend the model of a consumer group / streams group member with a single
> integer, the revocation epoch. The revocation epoch is initially set to 0,
> and we maintain the invariant
> {{RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
> As before, whenever a member completes revocation of all partitions that are
> pending revocation, its {{MemberEpoch}} is bumped to the
> {{TargetAssignmentEpoch}}. At the same time, the {{RevocationEpoch}} is set
> to the previous value of the {{MemberEpoch}}, which preserves the invariant.
> If the {{MemberEpoch}} is bumped to the {{TargetAssignmentEpoch}} without
> requiring any revocations, the {{RevocationEpoch}} is not changed.
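The epoch bookkeeping described above can be sketched as follows. The sketch assumes a simplified member model with only two `int` fields, and it assumes that a member starts with a positive member epoch and that every bump moves to a strictly larger target assignment epoch. The class and method names are hypothetical and do not correspond to the group coordinator's actual classes.

```java
// Hypothetical, simplified model of the broker-side member epochs (illustration only).
class MemberEpochs {
    private int revocationEpoch = 0; // initially 0
    private int memberEpoch;

    // Assumption for this sketch: members start with a positive member epoch,
    // so RevocationEpoch < MemberEpoch holds from the beginning.
    MemberEpochs(int initialMemberEpoch) {
        if (initialMemberEpoch <= 0) {
            throw new IllegalArgumentException("member epoch must be > 0");
        }
        this.memberEpoch = initialMemberEpoch;
    }

    int revocationEpoch() { return revocationEpoch; }
    int memberEpoch() { return memberEpoch; }

    // The member has revoked all partitions pending revocation: RevocationEpoch
    // becomes the previous MemberEpoch, and MemberEpoch moves to the target epoch.
    void completeRevocation(int targetAssignmentEpoch) {
        requireNewer(targetAssignmentEpoch);
        revocationEpoch = memberEpoch;
        memberEpoch = targetAssignmentEpoch;
    }

    // No revocation was required: only MemberEpoch moves; RevocationEpoch is unchanged.
    void advanceWithoutRevocation(int targetAssignmentEpoch) {
        requireNewer(targetAssignmentEpoch);
        memberEpoch = targetAssignmentEpoch;
    }

    // Assumption for this sketch: a bump always moves to a strictly larger epoch.
    private void requireNewer(int targetAssignmentEpoch) {
        if (targetAssignmentEpoch <= memberEpoch) {
            throw new IllegalArgumentException("target assignment epoch must exceed member epoch");
        }
    }
}
```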
> The {{RevocationEpoch}} is added as a field to the
> {{ConsumerGroupCurrentMemberAssignmentValue}} record, so that it can be
> stored in and replayed from the consumer offsets topic.
> For streams groups, we will use the same logic but add the field
> {{ActiveTaskRevocationEpoch}} to
> {{StreamsGroupCurrentMemberAssignmentValue}}, since only active tasks
> commit offsets in Kafka Streams.
> h3. Relaxing the offset commit validation
> We replace the current offset commit validation check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> by
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
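The relaxed validation, together with the error codes the original check already returns, can be sketched as below. The enum and method are hypothetical. Only the error names `STALE_MEMBER_EPOCH` and `ILLEGAL_GENERATION` come from the protocol, and, as an assumption of this sketch, every failure is mapped to the error described earlier for regular and transactional offset commits, respectively.

```java
// Hypothetical result type for this sketch; NONE means the commit is accepted.
enum CommitError { NONE, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }

// Hypothetical validator illustrating the relaxed offset commit check.
class OffsetCommitValidator {
    // Before: clientEpoch == brokerEpoch.
    // After:  revocationEpoch < clientEpoch <= brokerEpoch.
    static CommitError validate(int revocationEpoch, int clientEpoch, int brokerEpoch,
                                boolean transactional) {
        boolean valid = revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
        if (valid) {
            return CommitError.NONE;
        }
        // Assumption: failures keep the error codes of the original strict check.
        return transactional ? CommitError.ILLEGAL_GENERATION : CommitError.STALE_MEMBER_EPOCH;
    }
}
```

With `RevocationEpoch = 3` and a broker-side member epoch of 5, a commit sent with epoch 4 is now accepted, whereas the previous equality check would have rejected it.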
> h3. Disabling Streams commits between revocation and the member epoch bump
> In Kafka Streams, we keep track of whether we are currently on a revocation
> epoch. In {{onTasksRevoked}}, we store {{ConsumerGroupMetadata.generationId}}
> as the revocation epoch, and disable committing (via the consumer or the
> producer) as long as we are still on that epoch (by comparing against the
> current value of {{ConsumerGroupMetadata.generationId}}). This only affects
> offset and transaction commits performed outside of {{onTasksRevoked}}.
> We can still safely commit inside {{onTasksRevoked}}, since any broker-side
> member epoch bump is blocked as long as the member is revoking tasks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)