[ 
https://issues.apache.org/jira/browse/KAFKA-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19779:
------------------------------------
    Component/s: streams

> Relax offset commit validation to allow member epochs since the last 
> revocation
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19779
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Lucas Brutschy
>            Assignee: Lucas Brutschy
>            Priority: Major
>
> h2. Fencing offset commits
> In the Kafka protocol, when a consumer commits offsets or a producer tries to 
> add offsets to a transaction, it includes its epoch/generation of the 
> consumer group. The point of this is for the group coordinator to fence 
> against zombie commit requests, that is, commit requests that include an 
> offset for a partition that was since reassigned to a different member. If 
> such a guard was not in place, a zombie offset commit may overwrite offsets 
> of the new owner, or its offsets may be committed to the consumer offset 
> topic but not be included in the result of the new owner's offset fetch 
> request.
> In KIP-848, when receiving an offset commit request that includes the 
> client-side member epoch and a member ID, the group coordinator performs the 
> check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> and, if the check fails, returns a {{STALE_MEMBER_EPOCH}} error for regular 
> offset commits and an {{ILLEGAL_GENERATION}} error for transactional offset commits. 
> If the member epoch sent in the request is the current broker-side member 
> epoch, KIP-848 guarantees that the partition cannot also be owned by a 
> different member at the same or a larger epoch. Therefore, this is sufficient 
> for fencing zombie commits. Note that we assume zombie commits will always 
> contain offsets for partitions that were owned by the member at the member 
> epoch sent in the request. Commit requests that commit offsets for partitions 
> that are _not_ owned by the member in that epoch are not possible in a 
> correct client-side implementation of the protocol.
> Note that the broker-side member epoch is not the group epoch or the target 
> assignment epoch. For details, see KIP-848. Note also that commits can 
> be fenced because a member falls out of the group (e.g. because it does not 
> revoke partitions within the rebalance timeout). At this point, its commits 
> will be fenced solely based on the member ID (which is not part of the group 
> anymore). We therefore ignore this case in this document, and only consider 
> zombie commits from members that are still part of the group.
> h2. Downsides of the current approach
> This fencing is, however, unnecessarily strict. Assume, for example, a member 
> owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the 
> member still has P1 assigned at epoch 2. The member may not learn about the 
> new broker-side member epoch in time, and submit an offset commit for 
> P1 with epoch 1. This is not a zombie commit request as defined above (because 
> P1 was not reassigned to a different member), but it will still be rejected 
> by a KIP-848 group coordinator.
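> As a minimal sketch of the strict check described above (the class and 
> method names are hypothetical and not taken from the group coordinator 
> code), the P1 example reduces to a plain equality test on two epochs:

```java
// Hypothetical illustration of the strict check; not the coordinator's actual code.
final class StrictCommitCheck {
    private StrictCommitCheck() { }

    /** Accepts a commit only if the client's epoch equals the broker-side member epoch. */
    static boolean accepts(int clientMemberEpoch, int brokerMemberEpoch) {
        return clientMemberEpoch == brokerMemberEpoch;
    }
}
```

> With the broker-side member epoch already at 2, 
> {{StrictCommitCheck.accepts(1, 2)}} is false, so the commit for P1 is 
> rejected even though P1 never changed owner.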
> The trouble with this fencing mechanism is that it is very difficult to avoid 
> the broker-side member epoch being bumped concurrently with an offset commit. 
> Seen from the client-side, the broker-side member epoch may be bumped at any 
> time while a heartbeat to the group coordinator is in-flight. To make sure 
> the member epoch sent in an offset commit request is up-to-date would require 
> making sure that no consumer group or streams group heartbeat request is 
> in-flight at the same time.
> h2. Why a broker-side fix is warranted
> This problem is particularly challenging to solve on the client side for 
> transactional offset commits, since they are performed by the producer, not 
> the consumer, and the producer has no way of knowing when a consumer group 
> heartbeat or streams group heartbeat is in-flight. The member epoch is passed 
> from the Java consumer to the Java producer using the 
> {{ConsumerGroupMetadata}} object, which is passed into 
> {{{}sendOffsetsToTransaction{}}}. By the time the transactional offset commit 
> is sent, the member epoch may be stale, in which case the broker returns an 
> {{ILLEGAL_GENERATION}} error. This forces the Java producer into an 
> abortable error state and surfaces the error as a {{CommitFailedException}} to 
> the user, who has no way to recover from this other than to abort the 
> transaction.
> This may hurt in any Kafka client application, since aborting transactions 
> means throwing away work and restarting from an earlier point. But it is a 
> particularly big problem in Kafka Streams with exactly-once semantics, where 
> aborting a transaction usually means wiping and restoring the state store, so 
> each aborted transaction means some downtime for apps using state stores of 
> non-negligible size. Furthermore, since Kafka Streams commits every 100ms by 
> default in EOS, this is likely to happen fairly often.
> h1. Conceptual Design
> h2. Identifying zombies using the largest epoch a partition was revoked
> To derive a more relaxed check, we need to identify an epoch which separates 
> zombie commits from commits of the current owner. As mentioned above, zombie 
> commit requests are commit requests that include a partition, member ID and 
> member epoch combination, so that the member owned the partition at that 
> epoch. However, the partition has since been reassigned to a different member.
> Most precisely, on the level of a single partition, a relaxed offset commit 
> check can be defined using a *revocation epoch* for each partition and each 
> member, which is the largest epoch at which the partition was revoked from 
> that member. To fence zombie commit requests, we can reject all offset 
> commit requests from a member at any member epoch that is smaller than or 
> equal to the revocation epoch for that member and that partition. That is, 
> a commit is accepted only if
> {{Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side 
> Member Epoch}}
> The correctness of this is obvious: all commits of the current partition 
> owner will be accepted, since the {{Client-Side Member Epoch}} of the current 
> owner must always be larger than the revocation epoch (a 
> partition that is revoked in one epoch is never reassigned in the same 
> epoch). It will also correctly reject any zombie commits from that member, 
> because KIP-848 guarantees that if a partition was owned by the member at 
> {{Client-Side Member Epoch}} (which we assume for zombie commits), but it was 
> reassigned since, then this partition must have been revoked from the member 
> in an epoch greater than or equal to {{{}Client-Side Member Epoch{}}}, so we must 
> have {{Partition-Level RevocationEpoch >= Client-Side Member Epoch}} for 
> zombie commits.
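> The partition-level check can be sketched as follows. This is an 
> illustrative model only: the class, its methods, string partition names, 
> and the default of 0 for a partition that was never revoked are all 
> assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-member bookkeeping for the partition-level check.
final class PartitionRevocationCheck {
    // partition -> largest epoch at which it was revoked from this member
    private final Map<String, Integer> revocationEpochs = new HashMap<>();
    private int brokerMemberEpoch;

    PartitionRevocationCheck(int brokerMemberEpoch) {
        this.brokerMemberEpoch = brokerMemberEpoch;
    }

    /** Remembers the largest epoch at which the partition was revoked. */
    void recordRevocation(String partition, int epoch) {
        revocationEpochs.merge(partition, epoch, Integer::max);
    }

    void bumpMemberEpoch(int newEpoch) {
        brokerMemberEpoch = newEpoch;
    }

    /** Partition-Level RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch */
    boolean accepts(String partition, int clientMemberEpoch) {
        // Assumption: a partition that was never revoked has revocation epoch 0.
        int revocationEpoch = revocationEpochs.getOrDefault(partition, 0);
        return revocationEpoch < clientMemberEpoch && clientMemberEpoch <= brokerMemberEpoch;
    }
}
```

> If P1 is revoked at epoch 1 and the member epoch is then bumped to 2, a 
> late commit for P1 at epoch 1 is rejected, while a commit for a 
> still-owned P2 at epoch 1 is accepted.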
> h2. From partition-and-member-level to member-level
> However, tracking such a revocation epoch for every partition and every 
> member would be tedious. Instead, we can use an upper bound for 
> partition-level revocation epochs for every member: We know that there must 
> be an epoch that is larger than or equal to all partition-level revocation epochs. 
> It is the largest epoch at which _any_ partition was revoked from the member, 
> and which is smaller than the current {{{}Broker-Side Member Epoch{}}}. We 
> call it the {*}member-level revocation epoch{*}, or {{{}RevocationEpoch{}}}.
> The offset commit request validation can then be defined as
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
> Since {{RevocationEpoch}} is an upper bound on {{{}Partition-Level 
> RevocationEpoch{}}}, this check correctly fences all zombie commits by an 
> extension of the argument above. However, this formulation is less precise 
> than the partition-level revocation epoch, and there may still be cases where 
> commits of the current owner are fenced as well. This can, in particular, 
> happen when a partition P1 is revoked from the client, causing a broker-side 
> bump of {{RevocationEpoch}} and {{{}Broker-Side Member Epoch{}}}, but the 
> client attempts to commit offsets for a different partition P2 before 
> learning about the updated {{{}Broker-Side Member Epoch{}}}.
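> The member-level check, and the P1/P2 case in which it is stricter than 
> necessary, can be sketched as a pure function. The names below are 
> hypothetical and chosen only for this illustration.

```java
// Hypothetical illustration of the member-level check; not the coordinator's actual code.
final class MemberLevelCommitCheck {
    private MemberLevelCommitCheck() { }

    /** RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch */
    static boolean accepts(int revocationEpoch, int clientMemberEpoch, int brokerMemberEpoch) {
        return revocationEpoch < clientMemberEpoch && clientMemberEpoch <= brokerMemberEpoch;
    }
}
```

> Suppose the member holds P1 and P2 at epoch 1, and P1 is revoked so that 
> {{RevocationEpoch}} becomes 1 and the broker-side member epoch becomes 2. 
> A commit for P2 still carrying epoch 1 gives {{accepts(1, 1, 2) == false}}, 
> although P2 was never reassigned. Without the revocation 
> ({{RevocationEpoch}} still 0), the same commit passes: 
> {{accepts(0, 1, 2) == true}}.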
> h2. Completely avoiding commit request errors through the use of rebalance 
> listeners
> As explained above, the member-level revocation epoch check makes 
> {{{}STALE_MEMBER_EPOCH{}}}/{{{}ILLEGAL_GENERATION{}}} much less likely, but 
> it can still happen that a partition is revoked from the member, and while 
> the {{Broker-Side Member Epoch}} and {{RevocationEpoch}} are bumped on the 
> broker, the member concurrently attempts to commit offsets.
> However, compared to any regular {{Broker-Side Member Epoch}} bump, a bump of 
> the {{RevocationEpoch}} always involves the client, which in fact needs to 
> revoke the partitions before the epoch bump can happen. This means that the 
> client is “in-the-loop”, and we can prevent failing commits by never 
> committing offsets between revoking partitions on the client, and the 
> corresponding broker-side member epoch bump.
> A power client like Kafka Streams can therefore use the rebalance listener to 
> completely avoid {{{}STALE_MEMBER_EPOCH{}}}/{{{}ILLEGAL_GENERATION{}}} errors 
> in the normal case (that is, unless commit requests are actually delayed long 
> enough to become zombie requests). This applies to both transactional and 
> non-transactional offset commits, and works as follows:
>  # In {{{}onPartitionsRevoked{}}}/ {{{}onTasksRevoked{}}}, commit current 
> offsets / transaction and store the current 
> {{ConsumerGroupMetadata.generationId}} to remember that it is a revocation 
> epoch.
>  # Before calling {{Consumer.commit}} or 
> {{{}Producer.sendOffsetsToTransaction{}}}, we compare the current value of 
> {{ConsumerGroupMetadata.generationId}} to the stored revocation epoch. If we 
> are still on the revocation epoch, we skip the commit.
> We know that any partition revocation will require 
> {{{}onPartitionsRevoked{}}}/ {{onTasksRevoked}} to be invoked on the 
> client-side, and after revocations have been completed and the broker-side 
> member epoch has been communicated to the client, 
> {{ConsumerGroupMetadata.generationId}} will be updated.
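> The two steps above can be sketched as a small guard object. This is a 
> sketch under stated assumptions: {{RevocationCommitGuard}} and its methods 
> are hypothetical, and the integer arguments stand in for the value that the 
> application would read from {{ConsumerGroupMetadata.generationId}}.

```java
// Hypothetical client-side guard; not part of the Kafka clients.
final class RevocationCommitGuard {
    private static final int NONE = -1; // assumption: real epochs are never negative
    private int revocationEpoch = NONE;

    /** Step 1: call from the rebalance listener, after committing, with the current generation ID. */
    void onRevoked(int currentGenerationId) {
        revocationEpoch = currentGenerationId;
    }

    /** Step 2: call before committing; false means we are still on the revocation epoch. */
    boolean mayCommit(int currentGenerationId) {
        return currentGenerationId != revocationEpoch;
    }
}
```

> After {{onRevoked(3)}}, {{mayCommit(3)}} returns false, so the commit is 
> skipped; once the client observes generation 4, {{mayCommit(4)}} returns 
> true again.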
> This mechanism can be used by any user of the Java clients, not just Kafka 
> Streams. The mechanism can also be implemented within the Java consumer and 
> producer. The consumer would have to keep the last revocation epoch 
> internally and expose it to the producer via {{{}ConsumerGroupMetadata{}}}, 
> and delay committing of any offsets of non-revoked partitions until the 
> member epoch is bumped beyond the revocation epoch. Alternatively, any 
> revocation could block the application thread inside {{{}Consumer.poll{}}} 
> until the new epoch has been received from the broker. However, we don’t 
> include these changes in this design document, and it is up for discussion 
> whether we’d need to implement it, since most users won’t need to completely 
> avoid {{{}STALE_MEMBER_EPOCH{}}}/{{{}ILLEGAL_GENERATION{}}}.
> h1. Proposed Changes
> h3. Introducing Per-Member Revocation Epoch
> We extend the model of a consumer group / streams group member with a single 
> integer called the revocation epoch for each member of a group. The 
> revocation epoch is initially set to 0, and we have the invariant 
> {{{}RevocationEpoch < MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch{}}}.
> As before, whenever a member completes revocation of all partitions that are 
> pending revocation, its {{MemberEpoch}} is bumped to the 
> {{{}TargetAssignmentEpoch{}}}. At the same time, the {{RevocationEpoch}} is 
> bumped to the previous value of the {{{}MemberEpoch{}}}. This maintains the 
> invariant above. If the {{MemberEpoch}} is bumped to the 
> {{TargetAssignmentEpoch}} without requiring any revocations, the 
> {{RevocationEpoch}} is not changed.
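> The epoch bookkeeping described above can be modeled as follows. The class 
> is a hypothetical illustration, not the coordinator's member model, and it 
> assumes that a bump always moves the member to a strictly larger epoch.

```java
// Hypothetical model of the per-member epochs.
final class MemberEpochs {
    private int revocationEpoch = 0; // initially 0, as proposed above
    private int memberEpoch;

    MemberEpochs(int memberEpoch) {
        if (memberEpoch <= 0) {
            throw new IllegalArgumentException("member epoch must be positive");
        }
        this.memberEpoch = memberEpoch;
    }

    /** Bumps the member epoch to the target assignment epoch. */
    void advanceTo(int targetAssignmentEpoch, boolean completedRevocations) {
        // Assumption: the target assignment epoch is strictly larger than the member epoch.
        if (targetAssignmentEpoch <= memberEpoch) {
            throw new IllegalArgumentException("target epoch must exceed the member epoch");
        }
        if (completedRevocations) {
            // The previous member epoch becomes the revocation epoch.
            revocationEpoch = memberEpoch;
        }
        memberEpoch = targetAssignmentEpoch;
    }

    int revocationEpoch() {
        return revocationEpoch;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    /** Checks the RevocationEpoch < MemberEpoch part of the invariant. */
    boolean invariantHolds() {
        return revocationEpoch < memberEpoch;
    }
}
```

> Starting at member epoch 1, a bump to 2 without revocations leaves the 
> revocation epoch at 0; a later bump to 5 after completing revocations sets 
> the revocation epoch to 2. In both cases {{RevocationEpoch < MemberEpoch}} 
> continues to hold.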
> The {{RevocationEpoch}} is added as a field to the 
> {{{}ConsumerGroupCurrentMemberAssignmentValue{}}}, so that it can be stored 
> and replayed from the consumer offsets topic.
> For streams groups, we will use the same logic but add the field 
> {{ActiveTaskRevocationEpoch}} to 
> {{{}StreamsGroupCurrentMemberAssignmentValue{}}}, since only active tasks 
> commit offsets in Kafka Streams.
> h3. Relaxing the offset commit validation
> We replace the current offset commit validation check
> {{Client-Side Member Epoch == Broker-Side Member Epoch}}
> by
> {{RevocationEpoch < Client-Side Member Epoch <= Broker-Side Member Epoch}}
> h3. Disabling streams commits between revocation and member epoch bumps
> In Kafka Streams, we keep track of whether we are currently on a revocation 
> epoch. In {{{}onTasksRevoked{}}}, we store 
> {{ConsumerGroupMetadata.generationId}} as the revocation epoch, and disable 
> committing (via the consumer or the producer) as long as we are still on that 
> epoch (by comparing against the current value of 
> {{{}ConsumerGroupMetadata.generationId{}}}). This disablement only affects 
> offset and transaction commits performed outside of {{{}onTasksRevoked{}}}. 
> We can still safely commit inside {{{}onTasksRevoked{}}}, since we know that 
> any broker-side member epoch bumps are blocked as long as the member is 
> revoking tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
