frankvicky opened a new pull request, #22545:
URL: https://github.com/apache/kafka/pull/22545
JIRA: KAFKA-20623
This PR is part of KIP-1331.
### Builder plumbing
- `GroupCoordinatorService.Builder` accepts an `Optional<StreamsGroupTopologyDescriptionPlugin>` and constructs the per-group back-off state at build time, so the shard-builder supplier closure can wire a removal listener back into it.
- `BrokerServer` reads `groupCoordinatorConfig.streamsGroupTopologyDescriptionPlugin(...)` and passes the resulting `Optional` through. When the config is unset, the feature is fully disabled.
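The build-time wiring described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `DescriptionPlugin`, `PerGroupBackoff`, `ShardSketch`, `CoordinatorServiceSketch` and `withTopologyDescriptionPlugin` are hypothetical stand-ins for `StreamsGroupTopologyDescriptionPlugin`, the back-off state, the shard builder and `GroupCoordinatorService.Builder`. What it shows is the ordering: the back-off object exists before the shard supplier closure is created, so the closure can capture `backoff::clear` as its removal listener, while an empty `Optional` simply leaves the feature off.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for StreamsGroupTopologyDescriptionPlugin.
interface DescriptionPlugin {
    void push(String groupId, String description);
}

// Hypothetical per-group back-off; only arming and clearing matter here.
final class PerGroupBackoff {
    private final Set<String> armed = ConcurrentHashMap.newKeySet();

    void arm(String groupId) { armed.add(groupId); }
    void clear(String groupId) { armed.remove(groupId); }
    boolean isArmed(String groupId) { return armed.contains(groupId); }
}

// Hypothetical shard that reports removed streams groups to a listener.
final class ShardSketch {
    private final Consumer<String> streamsGroupRemovalListener;

    ShardSketch(Consumer<String> streamsGroupRemovalListener) {
        this.streamsGroupRemovalListener = streamsGroupRemovalListener;
    }

    void removeStreamsGroup(String groupId) {
        streamsGroupRemovalListener.accept(groupId);
    }
}

// Hypothetical service; the real builder takes many more inputs.
final class CoordinatorServiceSketch {
    final Optional<DescriptionPlugin> plugin;
    final PerGroupBackoff backoff;
    final Supplier<ShardSketch> shardSupplier;

    private CoordinatorServiceSketch(Optional<DescriptionPlugin> plugin,
                                     PerGroupBackoff backoff,
                                     Supplier<ShardSketch> shardSupplier) {
        this.plugin = plugin;
        this.backoff = backoff;
        this.shardSupplier = shardSupplier;
    }

    // An empty Optional (config unset) keeps the feature off.
    boolean topologyDescriptionEnabled() { return plugin.isPresent(); }

    static final class Builder {
        private Optional<DescriptionPlugin> plugin = Optional.empty();

        Builder withTopologyDescriptionPlugin(Optional<DescriptionPlugin> plugin) {
            this.plugin = plugin;
            return this;
        }

        CoordinatorServiceSketch build() {
            // Built before the supplier so the closure below can capture it.
            PerGroupBackoff backoff = new PerGroupBackoff();
            Supplier<ShardSketch> shardSupplier = () -> new ShardSketch(backoff::clear);
            return new CoordinatorServiceSketch(plugin, backoff, shardSupplier);
        }
    }
}
```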
### Heartbeat post-processing
- `StreamsGroupHeartbeatResult` now carries `storedDescriptionTopologyEpoch` and `failedDescriptionTopologyEpoch` alongside `currentTopologyEpoch`, so the service layer can decide whether to set `TopologyDescriptionRequired=true` without re-reading the group.
- `maybeSetTopologyDescriptionRequired` applies the KIP-1331 gating: plugin present, response has no error code, c[…] initial delay; a successful push or a permanent plugin failure clears the entry.
- The back-off is per broker (service-level), not per partition. To avoid leaking entries for groups that vanish, `GroupMetadataManager` now exposes a `Consumer<String>` streams-group removal listener (wired through `GroupCoordinatorShard.Builder`). The listener fires from `removeGroup` whenever the removed group is a streams group, and from the `STREAMS` branch of `onUnloaded`; the service registers it as `backoff::clear`.
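The heartbeat gating and the leak-avoidance hook can be illustrated together. The gating conditions in the bullets above are partly elided, so this sketch assumes a simple rule (solicit only while the current topology epoch is neither stored nor permanently failed) and an assumed back-off policy: the first solicitation goes out immediately, then the wait starts at an initial delay and doubles up to a cap. `TopologyDescriptionBackoff`, `HeartbeatGateSketch`, `GroupRegistrySketch` and `GroupType` are hypothetical. What it does mirror from the description is that the back-off lives at service level and is cleared through a `Consumer<String>` that fires on streams-group removal and on unload.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical service-level back-off. Assumed policy: first solicitation is
// immediate, then the wait starts at initialDelayMs and doubles up to maxDelayMs.
final class TopologyDescriptionBackoff {
    private record Entry(long nextAllowedMs, long delayMs) { }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long initialDelayMs;
    private final long maxDelayMs;

    TopologyDescriptionBackoff(long initialDelayMs, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Returns true, and pushes the next allowed time out, if a solicitation may go out now.
    boolean tryArm(String groupId, long nowMs) {
        Entry entry = entries.get(groupId);
        if (entry != null && nowMs < entry.nextAllowedMs()) {
            return false;
        }
        long delay = entry == null ? initialDelayMs : Math.min(entry.delayMs() * 2, maxDelayMs);
        entries.put(groupId, new Entry(nowMs + delay, delay));
        return true;
    }

    // Called on a successful push, a permanent failure, or group removal.
    void clear(String groupId) { entries.remove(groupId); }

    int size() { return entries.size(); }
}

final class HeartbeatGateSketch {
    // Assumed gating; the exact conditions in the PR description are partly elided.
    static boolean maybeSetTopologyDescriptionRequired(boolean pluginPresent, short errorCode,
            int currentEpoch, int storedEpoch, int failedEpoch,
            String groupId, long nowMs, TopologyDescriptionBackoff backoff) {
        if (!pluginPresent || errorCode != 0) {
            return false;
        }
        if (storedEpoch >= currentEpoch || failedEpoch >= currentEpoch) {
            return false; // already stored, or the plugin permanently rejected this epoch
        }
        return backoff.tryArm(groupId, nowMs);
    }
}

enum GroupType { CONSUMER, SHARE, STREAMS }

// Hypothetical, heavily reduced stand-in for GroupMetadataManager.
final class GroupRegistrySketch {
    private final Map<String, GroupType> groups = new HashMap<>();
    private final Consumer<String> streamsGroupRemovalListener;

    GroupRegistrySketch(Consumer<String> streamsGroupRemovalListener) {
        this.streamsGroupRemovalListener = streamsGroupRemovalListener;
    }

    void addGroup(String groupId, GroupType type) { groups.put(groupId, type); }

    void removeGroup(String groupId) {
        if (groups.remove(groupId) == GroupType.STREAMS) {
            streamsGroupRemovalListener.accept(groupId);
        }
    }

    // Partition unloaded: every streams group hosted here goes away.
    void onUnloaded() {
        groups.forEach((groupId, type) -> {
            if (type == GroupType.STREAMS) {
                streamsGroupRemovalListener.accept(groupId);
            }
        });
        groups.clear();
    }
}
```

Registering `backoff::clear` as the listener, as the PR does, keeps the back-off map bounded by the set of live streams groups without the service having to poll shards.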
### `StreamsGroupTopologyDescriptionUpdate` handler
- Synchronous pre-checks reject the request fast when the coordinator is inactive (`COORDINATOR_NOT_AVAILABLE`), no plugin is configured (`UNSUPPORTED_VERSION`), or the request is structurally malformed (`INVALID_REQUEST` for an empty `MemberId`, an empty `GroupId`, or a null `TopologyDescription`).
- The chain runs `validateStreamsGroupMember` *first* (so a fenced caller gets `UNKNOWN_MEMBER_ID` rather than an `INVALID_REQUEST` caused by a payload conversion failure), then converts the wire payload to the broker-side POJO, then calls the plugin.
- Plugin success writes a metadata record advancing `StoredDescriptionTopologyEpoch`; a `StreamsTopologyDescriptionPermanentFailureException` (or a synchronous throw) writes `FailedDescriptionTopologyEpoch`; any other exception is treated as transient and writes no record.
- The client-visible error is always `STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED` with the plugin's message; the permanent-vs-transient split is broker-internal.
- All back-off state mutations happen in a single `whenComplete`, driven by a `BackoffAction` holder populated by each terminal branch. A post-plugin write failure leaves the action at `ARM`, so the next heartbeat sees the drift and re-solicits an idempotent re-push, matching the KIP-1331 invariant.
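A condensed sketch of the handler chain, under stated assumptions: member validation, conversion and the record write are simulated (a `memberKnown` flag, a blank-payload rule, and an in-memory `writtenRecords` list), the synchronous pre-checks are left out, and `PermanentFailure`, `PushPlugin` and `TopologyUpdateHandlerSketch` are hypothetical stand-ins for `StreamsTopologyDescriptionPermanentFailureException`, the plugin and the service handler. It shows the ordering (member check before conversion), the permanent-vs-transient split that decides which record is written, the uniform client error, and a single `whenComplete` that applies the `BackoffAction` chosen by whichever branch terminated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for StreamsTopologyDescriptionPermanentFailureException.
class PermanentFailure extends RuntimeException {
    PermanentFailure(String message) { super(message); }
}

// Hypothetical plugin shape: asynchronous push of a converted description.
interface PushPlugin {
    CompletableFuture<Void> push(String groupId, String description);
}

enum BackoffAction { ARM, CLEAR }

final class TopologyUpdateHandlerSketch {
    final List<String> writtenRecords = new ArrayList<>();          // simulated record writes
    final Set<String> armedBackoff = ConcurrentHashMap.newKeySet();
    boolean failNextWrite = false;                                  // simulates a post-plugin write failure

    CompletableFuture<String> update(String groupId, boolean memberKnown, String payload,
                                     int epoch, PushPlugin plugin) {
        // 1. Member validation runs first, so a fenced caller never sees a conversion error.
        if (!memberKnown) return CompletableFuture.completedFuture("UNKNOWN_MEMBER_ID");
        // 2. Conversion (assumed rule for the sketch: a blank payload is malformed).
        if (payload.isBlank()) return CompletableFuture.completedFuture("INVALID_REQUEST");
        String description = payload.strip();
        // 3. Plugin call; a synchronous throw is handled like a permanent failure.
        CompletableFuture<Void> pushed;
        try {
            pushed = plugin.push(groupId, description);
        } catch (RuntimeException e) {
            pushed = CompletableFuture.failedFuture(new PermanentFailure(e.getMessage()));
        }
        // Starts at ARM; only a completed record write moves it to CLEAR.
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        return pushed
            .handle((ignored, error) -> error == null ? null : unwrap(error))
            .thenCompose(cause -> {
                if (cause != null && !(cause instanceof PermanentFailure)) {
                    // Transient: no record is written and the back-off stays armed.
                    return CompletableFuture.completedFuture(clientError(cause));
                }
                String record = (cause == null ? "StoredDescriptionTopologyEpoch="
                                               : "FailedDescriptionTopologyEpoch=") + epoch;
                return writeRecord(record).thenApply(written -> {
                    action.set(BackoffAction.CLEAR);
                    return cause == null ? "NONE" : clientError(cause);
                });
            })
            // The single place where back-off state is mutated.
            .whenComplete((result, error) -> {
                if (action.get() == BackoffAction.CLEAR) {
                    armedBackoff.remove(groupId);
                } else {
                    armedBackoff.add(groupId);
                }
            });
    }

    // One client-visible code either way; permanent vs transient stays internal.
    private static String clientError(Throwable cause) {
        return "STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED: " + cause.getMessage();
    }

    private static Throwable unwrap(Throwable t) {
        return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
    }

    private CompletableFuture<Void> writeRecord(String record) {
        if (failNextWrite) {
            failNextWrite = false;
            return CompletableFuture.failedFuture(new IllegalStateException("record write failed"));
        }
        writtenRecords.add(record);
        return CompletableFuture.completedFuture(null);
    }
}
```

Keeping the action in one holder and applying it in `whenComplete` means a failed write falls through with the default `ARM`, which is the re-solicit behaviour the last bullet describes.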
### `DeleteGroups` integration
- Before tombstoning, the service identifies the retained group ids that are streams groups with a non-default `StoredDescriptionTopologyEpoch` (`streamsGroupsWithStoredTopologyDescription`) and calls `plugin.deleteTopology` for each of them in parallel.
- A per-group plugin failure surfaces as `GROUP_DELETION_FAILED` with the cause string in `ErrorMessage`; the group is held back from tombstoning so the caller can retry. Mixed batches are honoured: groups that succeed proceed to tombstoning, and groups that fail return the per-group error.
- For DeleteGroups requests at version < 3 (pre-KIP-1331), the error is downgraded to `UNKNOWN_SERVER_ERROR` with no message, matching the convention used by KIP-1043 for new error codes that older request versions cannot interpret.
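The DeleteGroups flow above can be sketched as follows, assuming the caller has already computed the retained ids and the subset with a stored description. `DeleteGroupsSketch`, its `GroupResult` record (with error names as plain strings rather than Kafka's `Errors` enum) and the `deleteTopology` function are hypothetical; the per-group outcome, the mixed-batch handling and the version < 3 downgrade follow the bullets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

final class DeleteGroupsSketch {
    // errorCode == null means the group proceeds to tombstoning.
    record GroupResult(String groupId, String errorCode, String errorMessage) { }

    static CompletableFuture<List<GroupResult>> deleteTopologies(
            List<String> retainedGroupIds,
            Set<String> withStoredDescription,
            short apiVersion,
            Function<String, CompletableFuture<Void>> deleteTopology) {
        // Fire the plugin calls in parallel, only for groups with a stored description.
        Map<String, CompletableFuture<Void>> calls = new LinkedHashMap<>();
        for (String groupId : retainedGroupIds) {
            if (withStoredDescription.contains(groupId)) {
                calls.put(groupId, deleteTopology.apply(groupId));
            }
        }
        // allOf completes once every call is done; individual failures are inspected below.
        return CompletableFuture.allOf(calls.values().toArray(new CompletableFuture<?>[0]))
            .handle((ignored, anyFailure) -> {
                List<GroupResult> results = new ArrayList<>();
                for (String groupId : retainedGroupIds) {
                    CompletableFuture<Void> call = calls.get(groupId);
                    if (call == null || !call.isCompletedExceptionally()) {
                        results.add(new GroupResult(groupId, null, null));
                    } else if (apiVersion < 3) {
                        // Older versions cannot interpret the new code: downgrade, drop the message.
                        results.add(new GroupResult(groupId, "UNKNOWN_SERVER_ERROR", null));
                    } else {
                        results.add(new GroupResult(groupId, "GROUP_DELETION_FAILED", causeMessage(call)));
                    }
                }
                return results;
            });
    }

    private static String causeMessage(CompletableFuture<Void> failed) {
        try {
            failed.join();
            return null;
        } catch (RuntimeException e) {
            Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
            return cause.getMessage();
        }
    }
}
```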
### Wire ↔ POJO converter
- `StreamsGroupTopologyDescriptionConverter` translates `StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription` into the broker-side `StreamsGroupTopologyDescription` POJO. Subtopology node ordering and string-collection iteration order are preserved via `LinkedHashSet`, so a downstream pretty-printer (e.g. the future `kafka-streams-groups.sh --topology` command) can reproduce the source ordering.
- Wire-level node types (`SOURCE=1`, `PROCESSOR=2`, `SINK=3`) map to the sealed `Node` hierarchy. The `GlobalStore` shape is validated; a malformed payload throws `InvalidRequestException`.
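The converter's two guarantees, order preservation and node-type mapping, can be sketched as below. `WireNode`, `Node`, `Source`, `Processor`, `Sink` and `ConverterSketch` are hypothetical shapes, not the generated `StreamsGroupTopologyDescriptionUpdateRequestData` classes or the real POJO, and `InvalidRequestException` here is a local stand-in for Kafka's class of the same name. `GlobalStore` validation is not modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Local stand-in for org.apache.kafka.common.errors.InvalidRequestException.
class InvalidRequestException extends RuntimeException {
    InvalidRequestException(String message) { super(message); }
}

// Hypothetical wire shape: ordered lists, node type as a byte code.
record WireNode(byte type, String name, List<String> topicsOrStores, List<String> successors) { }

// Hypothetical broker-side sealed hierarchy.
sealed interface Node permits Source, Processor, Sink {
    String name();
    Set<String> successors();
}
record Source(String name, Set<String> topics, Set<String> successors) implements Node { }
record Processor(String name, Set<String> stores, Set<String> successors) implements Node { }
record Sink(String name, Set<String> topics, Set<String> successors) implements Node { }

final class ConverterSketch {
    static final byte SOURCE = 1, PROCESSOR = 2, SINK = 3;

    // Converts a subtopology's nodes, keeping wire order at every level.
    static Set<Node> convertNodes(List<WireNode> wireNodes) {
        Set<Node> nodes = new LinkedHashSet<>();
        for (WireNode w : wireNodes) {
            nodes.add(convert(w));
        }
        return Collections.unmodifiableSet(nodes);
    }

    static Node convert(WireNode w) {
        Set<String> strings = ordered(w.topicsOrStores());
        Set<String> successors = ordered(w.successors());
        return switch (w.type()) {
            case SOURCE -> new Source(w.name(), strings, successors);
            case PROCESSOR -> new Processor(w.name(), strings, successors);
            case SINK -> new Sink(w.name(), strings, successors);
            default -> throw new InvalidRequestException(
                "Unknown node type " + w.type() + " for node " + w.name());
        };
    }

    // LinkedHashSet keeps first-seen order; a HashSet would not.
    private static Set<String> ordered(List<String> values) {
        return Collections.unmodifiableSet(new LinkedHashSet<>(values));
    }
}
```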
--
This is an automated message from the Apache Git Service.