[
https://issues.apache.org/jira/browse/KAFKA-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-19779:
------------------------------------
Component/s: streams
> Relax offset commit validation to allow member epochs since the last
> revocation
> -------------------------------------------------------------------------------
>
> Key: KAFKA-19779
> URL: https://issues.apache.org/jira/browse/KAFKA-19779
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Major
>
> h2. Fencing offset commits
> In the Kafka protocol, when a consumer commits offsets or a producer tries to
> add offsets to a transaction, it includes its epoch/generation of the
> consumer group. The point of this is for the group coordinator to fence
> against zombie commit requests, that is, commit requests that include an
> offset for a partition that has since been reassigned to a different member.
> Without such a guard, a zombie offset commit could overwrite offsets of the
> new owner, or its offsets could be committed to the consumer offsets topic
> but not be included in the result of the new owner's offset fetch request.
> In KIP-848, when receiving an offset commit request that includes the
> client-side member epoch and a member ID, the group coordinator performs the
> check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular
> offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset
> commits.
> If the member epoch sent in the request is the current broker-side member
> epoch, KIP-848 guarantees that the partition cannot also be owned by a
> different member at the same or a larger epoch. Therefore, this is sufficient
> for fencing zombie commits. Note that we assume zombie commits will always
> contain offsets for partitions that were owned by the member at the member
> epoch sent in the request. Commit requests for partitions that are _not_
> owned by the member in that epoch are not possible in a correct client-side
> implementation of the protocol.
> Note that the broker-side member epoch is not the group epoch or the target
> assignment epoch. For details, see KIP-848. Commits can also be fenced
> because a member falls out of the group (e.g. because it does not revoke
> partitions within the rebalance timeout). At this point, its commits will be
> fenced solely based on the member ID (which is no longer part of the group).
> We therefore ignore this case in this document, and only consider zombie
> commits from members that are still part of the group.
> h2. Downsides of the current approach
> This fencing is, however, unnecessarily strict. Assume, for example, a member
> owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the
> member still has P1 assigned at epoch 2. The member may not learn about the
> new broker-side member epoch in time, and submit an offset commit for P1
> with epoch 1. This is not a zombie commit request as defined above (because
> P1 was not reassigned to a different member), but it will still be rejected
> by a KIP-848 group coordinator.
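> To make this false rejection concrete, here is a minimal Java sketch of the
> strict check. The class {{StrictCommitCheck}} and its {{Result}} enum are
> hypothetical names chosen for illustration; this is not the group
> coordinator's actual code.

```java
/** Hypothetical sketch of the strict KIP-848 commit check; not Kafka's actual classes. */
final class StrictCommitCheck {
    enum Result { OK, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }

    /**
     * @param clientEpoch   member epoch carried in the commit request
     * @param brokerEpoch   member epoch currently recorded by the group coordinator
     * @param transactional true if the offsets are added to a transaction by a producer
     */
    static Result validate(int clientEpoch, int brokerEpoch, boolean transactional) {
        // Strict check: only the exact current broker-side member epoch is accepted.
        if (clientEpoch == brokerEpoch) {
            return Result.OK;
        }
        // The error code depends only on the kind of commit, as described above.
        return transactional ? Result.ILLEGAL_GENERATION : Result.STALE_MEMBER_EPOCH;
    }
}
```

> Under these assumptions, {{validate(1, 2, false)}} is rejected with
> {{STALE_MEMBER_EPOCH}} even though P1 never changed owner.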
> The trouble with this fencing mechanism is that it is very difficult to avoid
> the broker-side member epoch being bumped concurrently with an offset commit.
> Seen from the client-side, the broker-side member epoch may be bumped at any
> time while a heartbeat to the group coordinator is in-flight. Ensuring that
> the member epoch sent in an offset commit request is up-to-date would require
> that no consumer group or streams group heartbeat request is in-flight at
> the same time.
> h2. Why a broker-side fix is warranted
> This problem is particularly challenging to solve on the client side for
> transactional offset commits, since they are performed by the producer, not
> the consumer, and the producer has no way of knowing when a consumer group
> heartbeat or streams group heartbeat is in-flight. The member epoch is passed
> from the Java consumer to the Java producer using the
> {{ConsumerGroupMetadata}} object, which is passed into
> {{sendOffsetsToTransaction}}. By the time the transactional offset commit is
> sent, the member epoch may be stale, in which case the broker returns an
> {{ILLEGAL_GENERATION}} error. This forces the Java producer into an
> abortable error state and surfaces the error as a {{CommitFailedException}}
> to the user, who has no way to recover other than aborting the transaction.
> This may hurt in any Kafka client application, since aborting transactions
> means throwing away work and restarting from an earlier point. But it is a
> particularly big problem in Kafka Streams with exactly-once semantics, where
> aborting a transaction usually means wiping and restoring the state store, so
> each aborted transaction means some downtime for apps using state stores of
> non-negligible size. Furthermore, since Kafka Streams commits every 100ms by
> default in EOS, this is likely to happen fairly often.
> h1. Conceptual Design
> h2. Identifying zombies using the largest epoch at which a partition was revoked
> To derive a more relaxed check, we need to identify an epoch which separates
> zombie commits from commits of the current owner. As mentioned above, zombie
> commit requests are commit requests that include a partition, member ID and
> member epoch combination such that the member owned the partition at that
> epoch, but the partition has since been reassigned to a different member.
> Most precisely, on the level of a single partition, a relaxed offset commit
> check can be defined using a *revocation epoch* for each partition and each
> member, which is the largest epoch at which the partition was revoked from
> that member. To fence zombie commit requests, we reject all offset commit
> requests from a member at any member epoch that is smaller than or equal to
> the revocation epoch for that member and that partition. That is, a commit
> is accepted only if
> {{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side
> Member Epoch}}
> The correctness argument is short: all commits of the current partition
> owner will be accepted, since the {{Client-Side Member Epoch}} of the current
> owner must always be larger than the revocation epoch (a partition that is
> revoked in one epoch is never reassigned in the same epoch). The check also
> correctly rejects any zombie commits from that member, because KIP-848
> guarantees that if a partition was owned by the member at
> {{Client-Side Member Epoch}} (which we assume for zombie commits), but was
> reassigned since, then this partition must have been revoked from the member
> in an epoch greater than or equal to {{Client-Side Member Epoch}}, so we must
> have {{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} for
> zombie commits.
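> The partition-level check can be sketched in Java as follows. The class
> {{PartitionLevelFencing}}, its methods, and the use of 0 as the revocation
> epoch of a never-revoked partition are assumptions made for this
> illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-member bookkeeping for the partition-level check. */
final class PartitionLevelFencing {
    // partition -> largest epoch at which it was revoked from this member
    private final Map<String, Integer> revocationEpochs = new HashMap<>();
    private int brokerEpoch;

    PartitionLevelFencing(int brokerEpoch) {
        this.brokerEpoch = brokerEpoch;
    }

    /** Records that the partition was revoked from the member at the given epoch. */
    void revoke(String partition, int epoch) {
        revocationEpochs.merge(partition, epoch, Math::max);
    }

    /** Advances the broker-side member epoch. */
    void bumpEpoch(int newEpoch) {
        brokerEpoch = newEpoch;
    }

    /** Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch */
    boolean accepts(String partition, int clientEpoch) {
        // Assumption: a partition that was never revoked has revocation epoch 0.
        int revocationEpoch = revocationEpochs.getOrDefault(partition, 0);
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }
}
```

> For example, if P1 is revoked at epoch 3 and the member epoch is then bumped
> to 4, a delayed commit for P1 at epoch 3 is rejected, while a commit for a
> still-owned partition P2 at epoch 3 is accepted.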
> h2. From partition-and-member-level to member-level
> However, tracking such a revocation epoch for every partition and every
> member would be tedious. Instead, we can use an upper bound for
> partition-level revocation epochs for every member: We know that there must
> be an epoch that is larger than or equal to all partition-level revocation
> epochs. It is the largest epoch at which _any_ partition was revoked from the
> member, and which is smaller than the current {{Broker-Side Member Epoch}}.
> We call it the *member-level revocation epoch*, or {{RevocationEpoch}}.
> The offset commit request validation can then be defined as
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
> Since {{RevocationEpoch}} is an upper bound on
> {{Partition-Level RevocationEpoch}}, this check correctly fences all zombie
> commits by an extension of the argument above. However, this formulation is
> less precise than the partition-level revocation epoch, and there may still
> be cases where commits of the current owner are fenced as well. This can, in
> particular, happen when a partition P1 is revoked from the client, causing a
> broker-side bump of {{RevocationEpoch}} and {{Broker-Side Member Epoch}}, but
> the client attempts to commit offsets for a different partition P2 before
> learning about the updated {{Broker-Side Member Epoch}}.
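> A minimal Java sketch of the member-level check, with the hypothetical name
> {{MemberLevelFencing}}, shows both the relaxation and the remaining false
> rejection. The epoch values used below are made up for illustration.

```java
/** Hypothetical sketch of the member-level check; the class name is illustrative only. */
final class MemberLevelFencing {
    /** RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch */
    static boolean accepts(int revocationEpoch, int clientEpoch, int brokerEpoch) {
        return revocationEpoch < clientEpoch && clientEpoch <= brokerEpoch;
    }
}
```

> Suppose the member epoch is bumped from 3 to 4. If no partition was revoked
> and {{RevocationEpoch}} is still 1, a commit at epoch 3 passes:
> {{accepts(1, 3, 4)}} is true. If P1 was revoked as part of the bump,
> {{RevocationEpoch}} becomes 3 and a commit for P2 at epoch 3 is fenced:
> {{accepts(3, 3, 4)}} is false, although P2 never changed owner.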
> h2. Completely avoiding commit request errors through the use of rebalance
> listeners
> As explained above, the member-level revocation epoch check makes
> {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} much less likely, but
> it can still happen that a partition is revoked from the member, and while
> the {{Broker-Side Member Epoch}} and {{RevocationEpoch}} are bumped on the
> broker, the member concurrently attempts to commit offsets.
> However, compared to any regular {{Broker-Side Member Epoch}} bump, a bump of
> the {{RevocationEpoch}} always involves the client, which in fact needs to
> revoke the partitions before the epoch bump can happen. This means that the
> client is “in-the-loop”, and we can prevent failing commits by never
> committing offsets between revoking partitions on the client, and the
> corresponding broker-side member epoch bump.
> A power client like Kafka Streams can therefore use the rebalance listener
> to completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}} errors for
> both transactional and non-transactional offset commits in the normal case
> (that is, unless a commit request is actually delayed long enough to become
> a zombie request):
> # In {{onPartitionsRevoked}}/{{onTasksRevoked}}, commit current
> offsets / transaction and store the current
> {{ConsumerGroupMetadata.generationId}} to remember that it is a revocation
> epoch.
> # Before calling {{Consumer.commitSync}}/{{Consumer.commitAsync}} or
> {{Producer.sendOffsetsToTransaction}}, we compare the current value of
> {{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we
> are still on the revocation epoch, we skip the commit.
> We know that any partition revocation will require
> {{onPartitionsRevoked}}/{{onTasksRevoked}} to be invoked on the
> client-side, and after revocations have been completed and the broker-side
> member epoch has been communicated to the client,
> {{ConsumerGroupMetadata.generationId}} will be updated.
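> The two steps above can be sketched as a small client-side guard. The class
> {{RevocationEpochGate}} and its methods are hypothetical, and the sketch
> assumes all calls happen on the thread that polls the consumer. It takes the
> generation ID as a plain {{int}}; in an application this value would be read
> from {{ConsumerGroupMetadata.generationId}}.

```java
import java.util.OptionalInt;

/** Hypothetical client-side guard; not part of the Kafka clients API. */
final class RevocationEpochGate {
    // Generation ID observed during the most recent revocation callback, if any.
    private OptionalInt revocationEpoch = OptionalInt.empty();

    /** Step 1: call from the revocation callback, after committing there. */
    void onRevoked(int currentGenerationId) {
        revocationEpoch = OptionalInt.of(currentGenerationId);
    }

    /** Step 2: call before every commit made outside the revocation callback. */
    boolean mayCommit(int currentGenerationId) {
        // Skip the commit while the client is still on the revocation epoch.
        return !(revocationEpoch.isPresent()
                && revocationEpoch.getAsInt() == currentGenerationId);
    }
}
```

> Once the client has learned the bumped member epoch, the generation ID
> differs from the stored revocation epoch and {{mayCommit}} returns true
> again.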
> This mechanism can be used by any user of the Java clients, not just Kafka
> Streams. The mechanism can also be implemented within the Java consumer and
> producer. The consumer would have to keep the last revocation epoch
> internally and expose it to the producer via {{ConsumerGroupMetadata}},
> and delay committing of any offsets of non-revoked partitions until the
> member epoch is bumped beyond the revocation epoch. Alternatively, any
> revocation could block the application thread inside {{Consumer.poll}}
> until the new epoch has been received from the broker. However, we don’t
> include these changes in this design document, and it is up for discussion
> whether we’d need to implement them, since most users won’t need to
> completely avoid {{STALE_MEMBER_EPOCH}}/{{ILLEGAL_GENERATION}}.
> h1. Proposed Changes
> h3. Introducing Per-Member Revocation Epoch
> We extend the model of a consumer group / streams group member with a single
> integer, the revocation epoch. The revocation epoch is initially set to 0,
> and we have the invariant
> {{RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch}}.
> As before, whenever a member completes revocation of all partitions that are
> pending revocation, its {{MemberEpoch}} is bumped to the
> {{TargetAssignmentEpoch}}. At the same time, the {{RevocationEpoch}} is
> bumped to the previous value of the {{MemberEpoch}}. This maintains the
> invariant above. If the {{MemberEpoch}} is bumped to the
> {{TargetAssignmentEpoch}} without requiring any revocations, the
> {{RevocationEpoch}} is not changed.
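> The update rule can be sketched as follows. {{MemberEpochState}} and
> {{advanceTo}} are hypothetical names; the sketch models only the two
> integers discussed here and assumes member epochs of group members are
> positive.

```java
/** Hypothetical model of the per-member epochs; not the coordinator's actual member class. */
final class MemberEpochState {
    private int memberEpoch;
    private int revocationEpoch = 0; // initial value as proposed above

    MemberEpochState(int initialMemberEpoch) {
        if (initialMemberEpoch <= 0) {
            throw new IllegalArgumentException("member epoch must be positive");
        }
        this.memberEpoch = initialMemberEpoch;
    }

    /**
     * Bumps the member epoch to the target assignment epoch.
     *
     * @param revokedPartitions true if the member had to complete revocations first
     */
    void advanceTo(int targetAssignmentEpoch, boolean revokedPartitions) {
        if (targetAssignmentEpoch <= memberEpoch) {
            throw new IllegalArgumentException("target epoch must exceed member epoch");
        }
        if (revokedPartitions) {
            // The previous member epoch becomes the revocation epoch.
            revocationEpoch = memberEpoch;
        }
        memberEpoch = targetAssignmentEpoch;
        // RevocationEpoch < MemberEpoch still holds, since the old epoch is below the new one.
    }

    int memberEpoch() { return memberEpoch; }

    int revocationEpoch() { return revocationEpoch; }
}
```

> Starting from member epoch 1, a bump to 2 without revocations leaves the
> revocation epoch at 0, and a later bump to 4 that required revocations sets
> it to 2.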
> The {{RevocationEpoch}} is added as a field to the
> {{ConsumerGroupCurrentMemberAssignmentValue}}, so that it can be stored
> in and replayed from the consumer offsets topic.
> For streams groups, we will use the same logic but add the field
> {{ActiveTaskRevocationEpoch}} to
> {{StreamsGroupCurrentMemberAssignmentValue}}, since only active tasks
> commit offsets in Kafka Streams.
> h3. Relaxing the offset commit validation
> We replace the current offset commit validation check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> by
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
> h3. Disabling streams commits between revocation and member epoch bumps
> In Kafka Streams, we keep track of whether we are currently on a revocation
> epoch. In {{onTasksRevoked}}, we store {{ConsumerGroupMetadata.generationId}}
> as the revocation epoch, and disable committing (via the consumer or the
> producer) as long as we are still on that epoch (by comparing against the
> current value of {{ConsumerGroupMetadata.generationId}}). This disablement
> only affects offset and transaction commits performed outside of
> {{onTasksRevoked}}. We can still safely commit inside {{onTasksRevoked}},
> since we know that any broker-side member epoch bumps are blocked as long as
> the member is revoking tasks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)