frankvicky opened a new pull request, #22554:
URL: https://github.com/apache/kafka/pull/22554
JIRA: KAFKA-20623
This is part of KIP-1331.
This PR should not be merged before #22552.
Final of three stacked sub-tasks. The `deleteGroups` flow now calls
`plugin.deleteTopology` for every streams group with a stored topology
description before tombstoning. Plugin failure surfaces as
`GROUP_DELETION_FAILED` (downgraded to `UNKNOWN_SERVER_ERROR` for
DeleteGroups v2) and the group is held back from tombstoning; mixed batches
are honoured per-group.
### Pre-tombstone deletion hook on the manager
`TopologyDescriptionManager.deleteBeforeGroupDelete(topicPartition, groupIds)`
calls `plugin.deleteTopology` in parallel for every group id in the batch
that is a streams group with a non-default `StoredDescriptionTopologyEpoch`.
The eligibility filter is resolved through `scheduleReadOperation` so it sees
the same shard snapshot used by the subsequent tombstone write.
Returns a per-group failure map keyed by group id. Groups absent from the map
either had no plugin state to clean up or the plugin call succeeded; groups
present in the map carry an `ApiError` describing the failure. A read failure
(`NOT_COORDINATOR` etc.) on the eligibility query is treated the same as a
plugin failure: every retained group in the batch is held back from
tombstoning so the caller can retry.
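
The per-group failure map described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `TopologyPlugin`, `PreDeleteSketch`, and the use of a plain `String` in place of `ApiError` are assumptions made to keep the example self-contained, and the eligibility read is assumed to have already produced `eligibleGroupIds`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the plugin interface; the real signature may differ.
interface TopologyPlugin {
    CompletableFuture<Void> deleteTopology(String groupId);
}

final class PreDeleteSketch {
    // Calls the plugin for every eligible group in parallel and returns
    // groupId -> error message for the calls that failed. Groups absent
    // from the returned map may proceed to tombstoning.
    static CompletableFuture<Map<String, String>> deleteBeforeGroupDelete(
            TopologyPlugin plugin, List<String> eligibleGroupIds) {
        Map<String, String> failures = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        for (String groupId : eligibleGroupIds) {
            CompletableFuture<Void> call;
            try {
                call = plugin.deleteTopology(groupId);
            } catch (RuntimeException e) {
                // A synchronous throw is recorded like an asynchronous failure.
                call = CompletableFuture.failedFuture(e);
            }
            calls.add(call.<Void>handle((ignored, error) -> {
                if (error != null) {
                    failures.put(groupId, String.valueOf(error.getMessage()));
                }
                return null; // never propagate: one failure must not fail the batch
            }));
        }
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> failures);
    }
}
```

Swallowing each failure inside `handle` is what keeps the batch per-group: the combined future always completes normally, and the caller inspects the map instead of catching an exception.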
The manager regains the `LogContext` constructor parameter so it can emit a
broker-side `WARN` for each plugin failure alongside the per-group response
error.
### Service-layer chain extension
`GroupCoordinatorService.deleteGroups` now splices the deletion hook between
the share-group pre-step and the underlying `delete-groups` write:
1. `deleteShareGroups` filters share-group failures (unchanged).
2. `topologyDescriptionManager.deleteBeforeGroupDelete` runs for the
   retained ids.
3. `filterStreamsTopologyErrors` moves per-group plugin failures into the
   response collection and returns the subset that should still proceed to
   tombstoning.
4. `handleDeleteGroups` writes tombstones for the retained subset.
If every retained group fails the plugin call, the underlying `delete-groups`
write is skipped entirely. Mixed batches are honoured per-group: the
successful ids reach the tombstone write, the failed ids carry
`GROUP_DELETION_FAILED` in the response, and an idempotent retry of
`DeleteGroups` converges once the plugin recovers.
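
Step 3 of the chain amounts to a partition of the retained ids. The sketch below shows one way to express it; `FilterSketch`, the `String` error values, and the plain `Map` used as the response collection are illustrative assumptions, not the types used in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FilterSketch {
    // Moves failed groups into the response map and returns the ids that
    // should still proceed to the tombstone write, preserving input order.
    static List<String> filterStreamsTopologyErrors(
            List<String> retainedIds,
            Map<String, String> pluginFailures,
            Map<String, String> response) {
        List<String> toTombstone = new ArrayList<>();
        for (String groupId : retainedIds) {
            String error = pluginFailures.get(groupId);
            if (error == null) {
                toTombstone.add(groupId);      // plugin succeeded or had nothing to do
            } else {
                response.put(groupId, error);  // held back; reported to the client
            }
        }
        return toTombstone;
    }
}
```

An empty return value corresponds to the case where the `delete-groups` write is skipped entirely.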
### v2 downgrade
Per KIP-1331, `GROUP_DELETION_FAILED` and the per-group `ErrorMessage` field
were introduced at `DeleteGroups` v3. For older clients the broker downgrades
the error code to `UNKNOWN_SERVER_ERROR` with no message, matching the
convention used by KIP-1043 for new error codes that pre-existing request
versions cannot interpret.
Errors that predate this KIP (e.g. `NOT_COORDINATOR` surfaced from the
runtime) pass through unchanged on all versions.
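
The downgrade rule can be pictured as a small version check. This is only a sketch of the behaviour described above: `DowngradeSketch`, `GroupResult`, and the string error codes are hypothetical simplifications rather than the broker's actual response types.

```java
final class DowngradeSketch {
    // First DeleteGroups version that understands GROUP_DELETION_FAILED,
    // as stated in the description above.
    static final int FIRST_VERSION_WITH_NEW_ERROR = 3;

    // Hypothetical, simplified per-group result.
    static final class GroupResult {
        final String errorCode;
        final String errorMessage; // may be null

        GroupResult(String errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Rewrites the new error code for clients that cannot interpret it;
    // every other result is returned unchanged.
    static GroupResult forVersion(GroupResult result, int apiVersion) {
        boolean oldClient = apiVersion < FIRST_VERSION_WITH_NEW_ERROR;
        if (oldClient && "GROUP_DELETION_FAILED".equals(result.errorCode)) {
            return new GroupResult("UNKNOWN_SERVER_ERROR", null);
        }
        return result;
    }
}
```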
### Coordinator shard + GMM
`GroupMetadataManager.streamsGroupsWithStoredTopologyDescription(groupIds, committedOffset)`
returns the subset of the input that is (a) a streams group on this shard and
(b) carries a non-default `StoredDescriptionTopologyEpoch`. Non-existent
groups and non-streams groups are silently skipped because they have no
plugin state to clean up.
`GroupCoordinatorShard` exposes the method to the runtime read-operation
scheduler.
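
The eligibility filter can be illustrated with a simplified in-memory model. Everything here other than the method name is an assumption for illustration: `EligibilitySketch`, `GroupInfo`, and the use of `-1` as the default epoch are hypothetical, and the `committedOffset` snapshot handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class EligibilitySketch {
    // Assumption: -1 stands in for the "default" epoch; the real default may differ.
    static final int DEFAULT_EPOCH = -1;

    // Hypothetical, simplified view of a group stored on the shard.
    static final class GroupInfo {
        final boolean isStreams;
        final int storedDescriptionTopologyEpoch;

        GroupInfo(boolean isStreams, int storedDescriptionTopologyEpoch) {
            this.isStreams = isStreams;
            this.storedDescriptionTopologyEpoch = storedDescriptionTopologyEpoch;
        }
    }

    // Returns the requested ids that are streams groups on this shard and
    // carry a non-default stored epoch. Unknown and non-streams groups are
    // skipped without raising an error.
    static List<String> streamsGroupsWithStoredTopologyDescription(
            Map<String, GroupInfo> groupsOnShard, List<String> groupIds) {
        List<String> eligible = new ArrayList<>();
        for (String groupId : groupIds) {
            GroupInfo group = groupsOnShard.get(groupId);
            if (group == null || !group.isStreams) {
                continue; // no plugin state to clean up
            }
            if (group.storedDescriptionTopologyEpoch != DEFAULT_EPOCH) {
                eligible.add(groupId);
            }
        }
        return eligible;
    }
}
```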