[https://issues.apache.org/jira/browse/KAFKA-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Lucas Brutschy updated KAFKA-19779:
-----------------------------------
Description:
h2. Fencing offset commits
In the Kafka protocol, when a consumer commits offsets or a producer tries to
add offsets to a transaction, it includes its epoch/generation of the consumer
group. The point of this is for the group coordinator to fence against zombie
commit requests, that is, commit requests that include an offset for a
partition that has since been reassigned to a different member. If such a
guard were not in place, a zombie offset commit could overwrite offsets of the
new owner, or its offsets could be committed to the consumer offsets topic but
not be included in the result of the new owner's offset fetch request.
In KIP-848, when receiving an offset commit request that includes the
client-side member epoch and a member ID, the group coordinator performs the
check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular
offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset
commits.
If the member epoch sent in the request is the current broker-side member
epoch, KIP-848 guarantees that the partition cannot also be owned by a
different member at the same or a larger epoch. Therefore, this is sufficient
for fencing zombie commits. Note that we assume zombie commits will always
contain offsets for partitions that were owned by the member at the member
epoch sent in the request. Commit requests that commit offsets for partitions
that are _not_ owned by the member in that epoch are not possible in a correct
client-side implementation of the protocol.
Note that the broker-side member epoch is not the group epoch or the target
assignment epoch. For details, see KIP-848. Commits can also be fenced because
a member falls out of the group (e.g. because it does not revoke partitions
within the rebalance timeout). In that case, its commits are fenced solely
based on the member ID, which is no longer part of the group.
We therefore ignore this case in this document, and only consider zombie
commits from members that are still part of the group.
h2. Downsides of the current approach
This fencing is, however, unnecessarily strict. Assume, for example, that a
member owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the
member still has P1 assigned at epoch 2. The member may not learn about the new
broker-side member epoch in time and submit an offset commit for P1 with
epoch 1. This is not a zombie commit request as defined above (because P1 was
not reassigned to a different member), but it will still be rejected by a
KIP-848 group coordinator.
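To make the example concrete, here is a minimal Java sketch of the strict check. {{StrictCommitCheck}} and {{validate}} are hypothetical names. The sketch models only the epoch comparison and the error returned for each commit type. It is not the group coordinator's actual code.

```java
/** Hypothetical model of the strict KIP-848 offset commit check (not the coordinator's real code). */
final class StrictCommitCheck {

    /** Returns the error name the coordinator would return, or "NONE" if the commit is accepted. */
    static String validate(int clientSideMemberEpoch, int brokerSideMemberEpoch, boolean transactional) {
        if (clientSideMemberEpoch == brokerSideMemberEpoch) {
            return "NONE";
        }
        // Regular commits fail with STALE_MEMBER_EPOCH, transactional ones with ILLEGAL_GENERATION.
        return transactional ? "ILLEGAL_GENERATION" : "STALE_MEMBER_EPOCH";
    }

    public static void main(String[] args) {
        // The member owns P1 at epoch 1; the broker bumps its member epoch to 2 while P1 stays assigned.
        int brokerSideMemberEpoch = 2;
        // The member has not yet seen the bump and commits P1 with epoch 1: not a zombie, but rejected.
        System.out.println(validate(1, brokerSideMemberEpoch, false)); // STALE_MEMBER_EPOCH
        System.out.println(validate(1, brokerSideMemberEpoch, true));  // ILLEGAL_GENERATION
        System.out.println(validate(2, brokerSideMemberEpoch, false)); // NONE
    }
}
```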
The trouble with this fencing mechanism is that it is very difficult to avoid
the broker-side member epoch being bumped concurrently with an offset commit.
Seen from the client side, the broker-side member epoch may be bumped at any
time while a heartbeat to the group coordinator is in flight. Guaranteeing that
the member epoch sent in an offset commit request is up to date would require
ensuring that no consumer group or streams group heartbeat request is in flight
at the same time.
h2. Why a broker-side fix is warranted
This problem is particularly challenging to solve on the client side for
transactional offset commits, since they are performed by the producer, not the
consumer, and the producer has no way of knowing when a consumer group
heartbeat or streams group heartbeat is in flight. The member epoch is passed
from the Java consumer to the Java producer using the {{ConsumerGroupMetadata}}
object, which is passed into {{sendOffsetsToTransaction}}. By the time the
transactional offset commit is sent, the member epoch may be stale, in which
case the broker returns an {{ILLEGAL_GENERATION}} error. This forces the Java
producer into an abortable error state and surfaces the error to the user as a
{{CommitFailedException}}. The user has no way to recover from this other than
aborting the transaction.
This may hurt in any Kafka client application, since aborting transactions
means throwing away work and restarting from an earlier point. But it is a
particularly big problem in Kafka Streams with exactly-once semantics, where
aborting a transaction usually means wiping and restoring the state store, so
each aborted transaction means some downtime for apps using state stores of
non-negligible size. Furthermore, since Kafka Streams commits every 100ms by
default in EOS, this is likely to happen fairly often.
h1. Conceptual Design
In this design document, we therefore propose to relax the condition for offset
commit fencing.
h2. Identifying zombies using the last epoch a partition was assigned to the member
To derive a more relaxed check, we need to identify an epoch which separates
zombie commits from commits of the current owner. As mentioned above, zombie
commit requests are commit requests that include a partition, member ID and
member epoch combination such that the member owned the partition at that
epoch, but the partition has since been reassigned to a different member.
More precisely, on the level of a single partition, a relaxed offset commit
check can be defined using an *assignment epoch* for each assigned partition
and each member, which is the epoch at which the partition was assigned to that
member. To fence zombie commit requests, we reject all offset commit requests
from a member that either does not have the partition assigned, or that include
a member epoch that is strictly smaller than the assignment epoch for that
member and that partition (or larger than the broker-side member epoch). In
other words, a commit is accepted only if
{{Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch}}
The correctness of this is straightforward: all commits of the current
partition owner will be accepted, since the {{Client-Side Member Epoch}} of the
current owner is always greater than or equal to the assignment epoch (a
partition that is revoked in one epoch is never reassigned in the same epoch).
The check also correctly rejects any zombie commits from that member. Suppose a
partition was owned by member A at {{Client-Side Member Epoch}} (which we
assume for zombie commits) but has since been reassigned to member B. There are
two possible cases:
# Member A currently does not have the partition assigned, so the commit is
rejected.
# Member A does currently have the partition assigned, but then it must have
been reassigned to member A after being assigned to member B. By KIP-848, this
cannot all happen in the same epoch, so we must have
{{Assignment Epoch > Client-Side Member Epoch}}.
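The two cases can be illustrated with a small Java sketch under a deliberately simplified, hypothetical model. The model has a single partition P and at most one owner per epoch, and all members advance epochs together. The assignment epoch is derived from the history as the first epoch of the member's current uninterrupted ownership. {{OwnershipHistory}}, {{assignmentEpoch}} and {{accepts}} are illustrative names, not coordinator code.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model: owners.get(e) is the member that owns one partition P at epoch e
 * (null = unassigned). All members are assumed to advance epochs together.
 */
final class OwnershipHistory {
    private final List<String> owners;

    OwnershipHistory(List<String> owners) {
        this.owners = owners;
    }

    /** The latest epoch in the history, used as the Broker-Side Member Epoch. */
    int currentEpoch() {
        return owners.size() - 1;
    }

    /** First epoch of the member's current uninterrupted ownership of P, or Integer.MAX_VALUE if it does not own P. */
    int assignmentEpoch(String member) {
        int epoch = currentEpoch();
        if (!member.equals(owners.get(epoch))) {
            return Integer.MAX_VALUE;
        }
        while (epoch > 0 && member.equals(owners.get(epoch - 1))) {
            epoch--;
        }
        return epoch;
    }

    /** Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch */
    boolean accepts(String member, int clientSideMemberEpoch) {
        return assignmentEpoch(member) <= clientSideMemberEpoch
                && clientSideMemberEpoch <= currentEpoch();
    }

    public static void main(String[] args) {
        // A owns P at epochs 0-1, B at epochs 2-3, and A again from epoch 4.
        OwnershipHistory p = new OwnershipHistory(Arrays.asList("A", "A", "B", "B", "A", "A"));
        System.out.println(p.assignmentEpoch("A")); // 4
        System.out.println(p.accepts("A", 1));      // false: zombie commit from A's first ownership (case 2)
        System.out.println(p.accepts("A", 4));      // true: current owner that has not yet seen epoch 5
        System.out.println(p.accepts("B", 3));      // false: B no longer owns P (case 1)
    }
}
```

The real coordinator stores the assignment epoch instead of deriving it from a history, and member epochs differ per member. The sketch only mirrors the ordering argument.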
h3. Differences to the RevocationEpoch design
This design does not need to disable commits on the client side. The need to
disable commits came from the fact that the RevocationEpoch design tracks epochs
“imprecisely”, on the member level rather than on the partition level. In that
design, when we have just revoked P2 on the client, we may attempt to commit
offsets for a partition P1. This triggers the race condition, because the broker
can concurrently bump the revocation epoch for that member due to the revocation
of P2. We prevent this by disabling commits while a partition is being revoked,
that is, during the time between executing the revocation on the client and
observing the following epoch bump on the client. In the partition-level
AssignmentEpoch design, if we are committing P1, we are still convinced that we
own P1, so we must also still own it on the broker. At the same time, the broker
may remove the assignment epoch for P2, but this does not affect whether we can
commit P1, and we are not going to try to commit P2 after having revoked it on
the client side.
h1. Proposed Changes
h3. Introducing Per-Member and Per-Partition Assignment Epoch
We extend the model of a consumer group / streams group member with one integer
per assigned partition for each member of a group. This includes both
partitions directly assigned to the member, and partitions pending revocation.
The assignment epoch is set to the epoch in which the partition was assigned to
the member, and we have the invariant
{{Assignment Epoch <= MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
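A minimal sketch of the per-partition bookkeeping follows. It assumes that partitions are identified by plain strings instead of {{TopicPartition}} and that the member state is updated in one step per epoch. {{MemberAssignmentState}} and {{advance}} are hypothetical names. Retained partitions keep their original assignment epoch, while newly owned partitions get the new member epoch. The sketch checks only the {{Assignment Epoch <= MemberEpoch}} part of the invariant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical broker-side member state with one assignment epoch per owned partition. */
final class MemberAssignmentState {
    private int memberEpoch;
    // Covers partitions directly assigned to the member and partitions pending revocation.
    private final Map<String, Integer> assignmentEpochs = new HashMap<>();

    MemberAssignmentState(int memberEpoch) {
        this.memberEpoch = memberEpoch;
    }

    /** Moves the member to newEpoch, after which it owns exactly ownedPartitions. */
    void advance(int newEpoch, Set<String> ownedPartitions) {
        if (newEpoch < memberEpoch) {
            throw new IllegalArgumentException("member epoch must not decrease");
        }
        // Forget partitions that are no longer owned (their revocation has completed).
        assignmentEpochs.keySet().retainAll(ownedPartitions);
        // Newly owned partitions get the new epoch; retained ones keep their original assignment epoch.
        for (String partition : ownedPartitions) {
            assignmentEpochs.putIfAbsent(partition, newEpoch);
        }
        memberEpoch = newEpoch;
    }

    /** Returns the assignment epoch of the partition, or null if the member does not own it. */
    Integer assignmentEpoch(String partition) {
        return assignmentEpochs.get(partition);
    }

    /** Checks the Assignment Epoch <= MemberEpoch part of the invariant. */
    boolean invariantHolds() {
        return assignmentEpochs.values().stream().allMatch(epoch -> epoch <= memberEpoch);
    }

    public static void main(String[] args) {
        MemberAssignmentState member = new MemberAssignmentState(0);
        member.advance(1, Set.of("P1", "P2"));       // P1 and P2 assigned at epoch 1
        member.advance(2, Set.of("P1", "P2", "P3")); // P3 added at epoch 2
        member.advance(3, Set.of("P1", "P3"));       // P2 revoked
        System.out.println(member.assignmentEpoch("P1")); // 1
        System.out.println(member.assignmentEpoch("P3")); // 2
        System.out.println(member.assignmentEpoch("P2")); // null
        System.out.println(member.invariantHolds());      // true
    }
}
```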
The {{AssignmentEpoch}} is added as a field to {{TopicPartitions}} in
{{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored in and
replayed from the committed offsets topic.
For streams groups, we will use the same logic but add assignment epochs only
for active tasks in {{StreamsGroupCurrentMemberAssignmentValue}}, since only
active tasks commit offsets in Kafka Streams.
h3. Relaxing the offset commit validation
We replace the current offset commit validation check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
by
{{Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch}}
where, for simplicity, we can assume the assignment epoch of a partition that
is not assigned to that member to be {{Integer.MAX_VALUE}}.
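Below is a hypothetical request-level version of the relaxed check. It uses {{Map.getOrDefault}} to treat unassigned partitions as having assignment epoch {{Integer.MAX_VALUE}}. The sketch makes two assumptions that this design does not state: a failing partition rejects the whole request, and the errors remain {{STALE_MEMBER_EPOCH}} for regular commits and {{ILLEGAL_GENERATION}} for transactional ones. {{RelaxedOffsetCommitValidator}} is an illustrative name.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical request-level version of the relaxed offset commit validation. */
final class RelaxedOffsetCommitValidator {

    /**
     * Returns "NONE" if every partition passes
     * Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch, otherwise an error name.
     * Rejecting the whole request and reusing the existing error names are assumptions.
     */
    static String validate(Map<String, Integer> assignmentEpochs, List<String> partitions,
                           int clientSideMemberEpoch, int brokerSideMemberEpoch, boolean transactional) {
        for (String partition : partitions) {
            // Unassigned partitions are treated as having assignment epoch Integer.MAX_VALUE, so they always fail.
            int assignmentEpoch = assignmentEpochs.getOrDefault(partition, Integer.MAX_VALUE);
            boolean valid = assignmentEpoch <= clientSideMemberEpoch
                    && clientSideMemberEpoch <= brokerSideMemberEpoch;
            if (!valid) {
                return transactional ? "ILLEGAL_GENERATION" : "STALE_MEMBER_EPOCH";
            }
        }
        return "NONE";
    }

    public static void main(String[] args) {
        Map<String, Integer> assignmentEpochs = Map.of("P1", 1, "P3", 2);
        System.out.println(validate(assignmentEpochs, List.of("P1"), 1, 3, false));      // NONE
        System.out.println(validate(assignmentEpochs, List.of("P1", "P3"), 1, 3, true)); // ILLEGAL_GENERATION
        System.out.println(validate(assignmentEpochs, List.of("P2"), 3, 3, false));      // STALE_MEMBER_EPOCH
    }
}
```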
was:
h2. Fencing offset commits
In the Kafka protocol, when a consumer commits offsets or a producer tries to
add offsets to a transaction, it includes its epoch/generation of the consumer
group. The point of this is for the group coordinator to fence against zombie
commit requests, that is, commit requests that include an offset for a
partition that has since been reassigned to a different member. If such a
guard were not in place, a zombie offset commit could overwrite offsets of the
new owner, or its offsets could be committed to the consumer offsets topic but
not be included in the result of the new owner's offset fetch request.
In KIP-848, when receiving an offset commit request that includes the
client-side member epoch and a member ID, the group coordinator performs the
check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular
offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset
commits.
If the member epoch sent in the request is the current broker-side member
epoch, KIP-848 guarantees that the partition cannot also be owned by a
different member at the same or a larger epoch. Therefore, this is sufficient
for fencing zombie commits. Note that we assume zombie commits will always
contain offsets for partitions that were owned by the member at the member
epoch sent in the request. Commit requests that commit offsets for partitions
that are _not_ owned by the member in that epoch are not possible in a correct
client-side implementation of the protocol.
Note that the broker-side member epoch is not the group epoch or the target
assignment epoch. For details, see KIP-848. Commits can also be fenced because
a member falls out of the group (e.g. because it does not revoke partitions
within the rebalance timeout). In that case, its commits are fenced solely
based on the member ID, which is no longer part of the group.
We therefore ignore this case in this document, and only consider zombie
commits from members that are still part of the group.
h2. Downsides of the current approach
This fencing is, however, unnecessarily strict. Assume, for example, that a
member owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the
member still has P1 assigned at epoch 2. The member may not learn about the new
broker-side member epoch in time and submit an offset commit for P1 with
epoch 1. This is not a zombie commit request as defined above (because P1 was
not reassigned to a different member), but it will still be rejected by a
KIP-848 group coordinator.
The trouble with this fencing mechanism is that it is very difficult to avoid
the broker-side member epoch being bumped concurrently with an offset commit.
Seen from the client side, the broker-side member epoch may be bumped at any
time while a heartbeat to the group coordinator is in flight. Guaranteeing that
the member epoch sent in an offset commit request is up to date would require
ensuring that no consumer group or streams group heartbeat request is in flight
at the same time.
h2. Why a broker-side fix is warranted
This problem is particularly challenging to solve on the client side for
transactional offset commits, since they are performed by the producer, not the
consumer, and the producer has no way of knowing when a consumer group
heartbeat or streams group heartbeat is in flight. The member epoch is passed
from the Java consumer to the Java producer using the {{ConsumerGroupMetadata}}
object, which is passed into {{sendOffsetsToTransaction}}. By the time the
transactional offset commit is sent, the member epoch may be stale, in which
case the broker returns an {{ILLEGAL_GENERATION}} error. This forces the Java
producer into an abortable error state and surfaces the error to the user as a
{{CommitFailedException}}. The user has no way to recover from this other than
aborting the transaction.
This may hurt in any Kafka client application, since aborting transactions
means throwing away work and restarting from an earlier point. But it is a
particularly big problem in Kafka Streams with exactly-once semantics, where
aborting a transaction usually means wiping and restoring the state store, so
each aborted transaction means some downtime for apps using state stores of
non-negligible size. Furthermore, since Kafka Streams commits every 100ms by
default in EOS, this is likely to happen fairly often.
h1. Conceptual Design
h2. Identifying zombies using the largest epoch at which a partition was revoked
To derive a more relaxed check, we need to identify an epoch which separates
zombie commits from commits of the current owner. As mentioned above, zombie
commit requests are commit requests that include a partition, member ID and
member epoch combination such that the member owned the partition at that
epoch, but the partition has since been reassigned to a different member.
More precisely, on the level of a single partition, a relaxed offset commit
check can be defined using a *revocation epoch* for each partition and each
member, which is the largest epoch at which the partition was revoked from that
member. To fence zombie commit requests, we can reject all offset commit
requests from a member at any member epoch that is smaller than or equal to the
revocation epoch for that member and that partition:
{{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
The correctness of this is straightforward: all commits of the current
partition owner will be accepted, since the {{Client-Side Member Epoch}} of the
current owner must always be larger than the revocation epoch (a partition that
is revoked in one epoch is never reassigned in the same epoch). The check also
correctly rejects any zombie commits from that member. KIP-848 guarantees that
if a partition was owned by the member at {{Client-Side Member Epoch}} (which
we assume for zombie commits) but has since been reassigned, then the partition
must have been revoked from the member in an epoch greater than or equal to
{{Client-Side Member Epoch}}. So we must have
{{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} for zombie
commits.
h2. From partition-and-member-level to member-level
However, tracking such a revocation epoch for every partition and every member
would be tedious. Instead, we can use an upper bound for partition-level
revocation epochs for every member: there must be an epoch that is greater than
or equal to all partition-level revocation epochs. It is the largest epoch at
which _any_ partition was revoked from the member, and which is smaller than
the current {{Broker-Side Member Epoch}}. We call it the
*member-level revocation epoch*, or {{RevocationEpoch}}.
The offset commit request validation can then be defined as
{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
Since {{RevocationEpoch}} is an upper bound on
{{Partition-Level RevocationEpoch}}, this check correctly fences all zombie
commits by an extension of the argument above. However, this formulation is
less precise than the partition-level revocation epoch, and there may still be
cases where commits of the current owner are fenced as well. In particular,
this can happen when a partition P1 is revoked from the client, causing a
broker-side bump of {{RevocationEpoch}} and {{Broker-Side Member Epoch}}, but
the client attempts to commit offsets for a different partition P2 before
learning about the updated {{Broker-Side Member Epoch}}.
h2. Completely avoiding commit request errors through the use of rebalance listeners
As explained above, the member-level revocation epoch check makes
{{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors much less likely. However,
a partition can still be revoked from the member while the member concurrently
attempts to commit offsets, as the {{Broker-Side Member Epoch}} and
{{RevocationEpoch}} are being bumped on the broker.
However, compared to any regular {{Broker-Side Member Epoch}} bump, a bump of
the {{RevocationEpoch}} always involves the client, which in fact needs to
revoke the partitions before the epoch bump can happen. This means that the
client is “in the loop”, and we can prevent failing commits by never committing
offsets between revoking partitions on the client and observing the
corresponding broker-side member epoch bump.
A power client like Kafka Streams will therefore be able to completely avoid
{{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors in the normal case (that
is, unless commit requests are actually delayed long enough to become zombie
requests) by using the rebalance listener. Failing offset commits, both
transactional and non-transactional, can be avoided as follows:
# In {{onPartitionsRevoked}}/{{onTasksRevoked}}, commit the current offsets /
transaction and store the current {{ConsumerGroupMetadata.generationId}} to
remember that it is a revocation epoch.
# Before calling {{Consumer.commitSync}}/{{Consumer.commitAsync}} or
{{Producer.sendOffsetsToTransaction}}, compare the current value of
{{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we
are still on the revocation epoch, skip the commit.
We know that any partition revocation requires {{onPartitionsRevoked}}/
{{onTasksRevoked}} to be invoked on the client side. We also know that
{{ConsumerGroupMetadata.generationId}} will be updated once revocations have
been completed and the broker-side member epoch has been communicated to the
client.
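The two steps above could be packaged in a small helper. The sketch below is hypothetical and deliberately independent of the Kafka clients. The {{int}} arguments stand in for {{ConsumerGroupMetadata.generationId()}}. Wiring the helper into a {{ConsumerRebalanceListener}} or into Kafka Streams' {{onTasksRevoked}} is not shown. {{RevocationCommitGuard}}, {{recordRevocation}} and {{mayCommit}} are illustrative names.

```java
import java.util.OptionalInt;

/**
 * Hypothetical client-side guard (not part of the Kafka clients). The int arguments stand in for
 * ConsumerGroupMetadata.generationId(); no Kafka classes are used so the sketch runs on its own.
 */
final class RevocationCommitGuard {
    private OptionalInt revocationEpoch = OptionalInt.empty();

    /** Call from onPartitionsRevoked / onTasksRevoked, after committing inside the callback. */
    void recordRevocation(int generationIdAtRevocation) {
        revocationEpoch = OptionalInt.of(generationIdAtRevocation);
    }

    /** Call before committing outside the callback; false means the commit should be skipped. */
    boolean mayCommit(int currentGenerationId) {
        return !revocationEpoch.isPresent() || revocationEpoch.getAsInt() != currentGenerationId;
    }

    public static void main(String[] args) {
        RevocationCommitGuard guard = new RevocationCommitGuard();
        System.out.println(guard.mayCommit(5)); // true: nothing revoked yet
        guard.recordRevocation(5);              // partitions revoked while on generation 5
        System.out.println(guard.mayCommit(5)); // false: the epoch bump has not been observed yet
        System.out.println(guard.mayCommit(6)); // true: the new member epoch has arrived
    }
}
```

The guard compares for equality rather than ordering, matching the rule "if we are still on the revocation epoch, skip the commit".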
This mechanism can be used by any user of the Java clients, not just Kafka
Streams. The mechanism could also be implemented within the Java consumer and
producer. The consumer would have to keep the last revocation epoch internally,
expose it to the producer via {{ConsumerGroupMetadata}}, and delay committing
offsets of non-revoked partitions until the member epoch is bumped beyond the
revocation epoch. Alternatively, any revocation could block the application
thread inside {{Consumer.poll}} until the new epoch has been received from the
broker. However, we don’t include these changes in this design document. It is
up for discussion whether we’d need to implement them, since most users won’t
need to completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors.
h1. Proposed Changes
h3. Introducing Per-Member Revocation Epoch
We extend the model of a consumer group / streams group member with a single
integer called the revocation epoch for each member of a group. The revocation
epoch is initially set to 0, and we have the invariant
{{RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
As before, whenever a member completes revocation of all partitions that are
pending revocation, its {{MemberEpoch}} is bumped to the
{{TargetAssignmentEpoch}}. At the same time, the {{RevocationEpoch}} is bumped
to the previous value of the {{MemberEpoch}}. This maintains the invariant
above. If the {{MemberEpoch}} is bumped to the {{TargetAssignmentEpoch}}
without requiring any revocations, the {{RevocationEpoch}} is not changed.
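Below is a minimal sketch of this bump rule together with the member-level check. It assumes that epochs are plain {{int}}s and that a bump always moves to a strictly larger target assignment epoch. {{RevocationEpochMember}} and its methods are hypothetical names, not coordinator code.

```java
/** Hypothetical broker-side member state for the member-level RevocationEpoch design. */
final class RevocationEpochMember {
    private int revocationEpoch = 0; // initially 0
    private int memberEpoch;

    RevocationEpochMember(int memberEpoch) {
        if (memberEpoch <= 0) {
            throw new IllegalArgumentException("need RevocationEpoch < MemberEpoch");
        }
        this.memberEpoch = memberEpoch;
    }

    /** Bumps MemberEpoch to the target assignment epoch, recording a revocation if one completed. */
    void bumpToTarget(int targetAssignmentEpoch, boolean revocationCompleted) {
        // Assumption: a bump always moves to a strictly larger epoch.
        if (targetAssignmentEpoch <= memberEpoch) {
            throw new IllegalArgumentException("target assignment epoch must exceed the member epoch");
        }
        if (revocationCompleted) {
            revocationEpoch = memberEpoch; // previous value of MemberEpoch
        }
        memberEpoch = targetAssignmentEpoch;
    }

    /** RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch */
    boolean accepts(int clientSideMemberEpoch) {
        return revocationEpoch < clientSideMemberEpoch && clientSideMemberEpoch <= memberEpoch;
    }

    int revocationEpoch() {
        return revocationEpoch;
    }

    public static void main(String[] args) {
        RevocationEpochMember member = new RevocationEpochMember(1);
        member.bumpToTarget(2, false);                // no revocation: RevocationEpoch stays 0
        System.out.println(member.accepts(1));        // true: a commit from epoch 1 is still accepted
        member.bumpToTarget(4, true);                 // revocation completed: RevocationEpoch becomes 2
        System.out.println(member.revocationEpoch()); // 2
        System.out.println(member.accepts(2));        // false
        System.out.println(member.accepts(4));        // true
    }
}
```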
The {{RevocationEpoch}} is added as a field to
{{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored in and
replayed from the committed offsets topic.
For streams groups, we will use the same logic but add the field
{{ActiveTaskRevocationEpoch}} to
{{StreamsGroupCurrentMemberAssignmentValue}}, since only active tasks commit
offsets in Kafka Streams.
h3. Relaxing the offset commit validation
We replace the current offset commit validation check
{{Client-Side Member Epoch == Broker-Side Member Epoch}}
by
{{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
h3. Disabling streams commits between revocation and member epoch bumps
In Kafka Streams, we keep track of whether we are currently on a revocation
epoch. In {{onTasksRevoked}}, we store {{ConsumerGroupMetadata.generationId}}
as the revocation epoch, and disable committing (via the consumer or the
producer) as long as we are still on that epoch (by comparing against the
current value of {{ConsumerGroupMetadata.generationId}}). This disablement
only affects offset and transaction commits performed outside of
{{onTasksRevoked}}. We can still safely commit inside {{onTasksRevoked}},
since we know that any broker-side member epoch bumps are blocked as long as
the member is revoking tasks.
> Relax offset commit validation to allow member epochs since the last
> revocation
> -------------------------------------------------------------------------------
>
> Key: KAFKA-19779
> URL: https://issues.apache.org/jira/browse/KAFKA-19779
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)