frankvicky opened a new pull request, #22545:
URL: https://github.com/apache/kafka/pull/22545

   JIRA: KAFKA-20623
   This PR is a part of KIP-1331.
   
   ### Builder plumbing
   - `GroupCoordinatorService.Builder` accepts an
     `Optional<StreamsGroupTopologyDescriptionPlugin>` and constructs the per-group back-off
     state at build time so the shard-builder supplier closure can wire a removal listener
     back into it.
   - `BrokerServer` reads
     `groupCoordinatorConfig.streamsGroupTopologyDescriptionPlugin(...)` and passes the
     resulting `Optional` through. When the config is unset, the feature is fully disabled.
   
   ### Heartbeat post-processing
   - `StreamsGroupHeartbeatResult` now carries `storedDescriptionTopologyEpoch` and
     `failedDescriptionTopologyEpoch` alongside `currentTopologyEpoch`, so the service layer
     can decide whether to set `TopologyDescriptionRequired=true` without re-reading the
     group.
   - `maybeSetTopologyDescriptionRequired` applies the KIP-1331 gating: plugin present,
     response has no error code, c
     initial delay; a successful push or a permanent plugin failure clears the entry.
   - The back-off is per broker (service-level), not per partition. To avoid leaking entries
     for groups that vanish, `GroupMetadataManager` now exposes a `Consumer<String>`
     streams-group removal listener (wired through `GroupCoordinatorShard.Builder`). The
     listener fires from `removeGroup` whenever the removed group is a streams group, and
     from the `STREAMS` branch of `onUnloaded`; the service registers it as
     `backoff::clear`.
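
A minimal sketch of how a service-level back-off map keyed by group id could be cleaned up through a removal listener, as the last bullet above describes. Everything here is illustrative: the class name, the `maySolicit`/`arm` methods, and the doubling delay policy are assumptions, not the PR's actual code; only the `clear` method reference mirrors the `backoff::clear` wiring named in the description.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-broker back-off registry keyed by group id; not the PR's actual class. */
final class TopologyDescriptionBackoffSketch {
    /** Assumed entry shape: earliest time to solicit again, plus the delay that produced it. */
    private record Entry(long nextAllowedMs, long delayMs) {}

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long initialDelayMs;
    private final long maxDelayMs;

    TopologyDescriptionBackoffSketch(long initialDelayMs, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** True when the group has no entry or its current delay has elapsed. */
    boolean maySolicit(String groupId, long nowMs) {
        Entry e = entries.get(groupId);
        return e == null || nowMs >= e.nextAllowedMs();
    }

    /** Starts or lengthens the delay (assumed policy: double up to maxDelayMs). */
    void arm(String groupId, long nowMs) {
        entries.compute(groupId, (id, prev) -> {
            long delay = prev == null
                ? initialDelayMs
                : Math.min(prev.delayMs() * 2, maxDelayMs);
            return new Entry(nowMs + delay, delay);
        });
    }

    /** Drops the entry; suitable as a Consumer<String> removal listener. */
    void clear(String groupId) {
        entries.remove(groupId);
    }

    /** Number of tracked groups, exposed so leak-freedom can be checked. */
    int size() {
        return entries.size();
    }
}
```

In this sketch the listener would be registered as `Consumer<String> listener = backoff::clear;`, so a group removed by the shard leaves no entry behind in the broker-level map.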
   
   ### `StreamsGroupTopologyDescriptionUpdate` handler
   - Synchronous pre-checks reject the request fast when the coordinator is inactive
     (`COORDINATOR_NOT_AVAILABLE`), no plugin is configured (`UNSUPPORTED_VERSION`), or
     the request is structurally malformed (`INVALID_REQUEST` for empty `MemberId`,
     empty `GroupId`, or null `TopologyDescription`).
   - The chain runs `validateStreamsGroupMember` *first* (so a fenced caller gets
     `UNKNOWN_MEMBER_ID` rather than an `INVALID_REQUEST` from a payload conversion
     failure), then converts the wire payload to the broker-side POJO, then calls the
     plugin.
   - Plugin success writes a metadata record advancing `StoredDescriptionTopologyEpoch`;
     a `StreamsTopologyDescriptionPermanentFailureException` (or a synchronous throw)
     writes `FailedDescriptionTopologyEpoch`; any other exception is treated as transient
     and writes no record.
   - The client-visible error is always `STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED` with
     the plugin's message; the permanent-vs-transient split is broker-internal.
   - All back-off state mutations happen in a single `whenComplete`, driven by a
     `BackoffAction` holder populated by each terminal branch. A post-plugin write failure
     leaves the action at `ARM`, so the next heartbeat sees the drift and re-solicits an
     idempotent re-push, matching the KIP-1331 invariant.
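
The terminal branches above can be sketched roughly as follows. This is an illustration under stated assumptions, not the handler in this PR: `PermanentFailure` stands in for `StreamsTopologyDescriptionPermanentFailureException`, the `CLEAR` constant and the `RecordWritten` enum are invented names, and the metadata write is simulated by a boolean. Only the `ARM` default and the single `whenComplete` come from the description.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class DescriptionUpdateSketch {
    enum BackoffAction { ARM, CLEAR }   // ARM is named in the PR text; CLEAR is an assumed name
    enum RecordWritten { STORED_EPOCH, FAILED_EPOCH, NONE }

    /** Stand-in for StreamsTopologyDescriptionPermanentFailureException. */
    static final class PermanentFailure extends RuntimeException {
        PermanentFailure(String message) { super(message); }
    }

    record Outcome(RecordWritten record, BackoffAction action) {}

    static CompletableFuture<Outcome> handle(CompletableFuture<Void> pluginCall,
                                             boolean writeSucceeds) {
        // Holder defaults to ARM; only a branch that fully succeeds flips it.
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        AtomicReference<RecordWritten> written = new AtomicReference<>(RecordWritten.NONE);
        AtomicReference<BackoffAction> applied = new AtomicReference<>();

        return pluginCall
            .handle((ok, err) -> {
                Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
                if (cause == null) return RecordWritten.STORED_EPOCH;
                if (cause instanceof PermanentFailure) return RecordWritten.FAILED_EPOCH;
                return RecordWritten.NONE;            // transient: no record
            })
            .thenAccept(rec -> {
                if (rec == RecordWritten.NONE) return; // action stays ARM
                if (!writeSucceeds) throw new IllegalStateException("simulated write failure");
                written.set(rec);
                action.set(BackoffAction.CLEAR);
            })
            // The one place where back-off state would be mutated.
            .whenComplete((v, t) -> applied.set(action.get()))
            .handle((v, t) -> new Outcome(written.get(), applied.get()));
    }
}
```

Because the holder starts at `ARM`, a failure in the write step needs no dedicated branch: the action is simply never flipped, which is how the sketch reproduces the "post-plugin write failure leaves the action at `ARM`" behaviour.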
   
   ### `DeleteGroups` integration
   - Before tombstoning, the service identifies retained group ids that are streams groups
     with a non-default `StoredDescriptionTopologyEpoch`
     (`streamsGroupsWithStoredTopologyDescription`) and calls `plugin.deleteTopology` for
     each in parallel.
   - Per-group plugin failure surfaces as `GROUP_DELETION_FAILED` with the cause string in
     `ErrorMessage`; the group is held back from tombstoning so the caller can retry.
     Mixed batches are honoured: groups that succeed proceed to tombstone, groups that fail
     return the per-group error.
   - For DeleteGroups requests at version < 3 (pre-KIP-1331), the error is downgraded to
     `UNKNOWN_SERVER_ERROR` with no message, matching the convention used by KIP-1043 for
     new error codes that older request versions cannot interpret.
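
As a rough illustration of the mixed-batch and version-downgrade rules above, the sketch below collects per-group plugin failures and maps them to an error by request version. The class, the `GroupError` record, and the use of plain strings for error codes are assumptions made for brevity; the blocking `join()` is only there to keep the example short and is not how the service is described as working.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class DeleteGroupsErrorSketch {
    /** Hypothetical per-group result; errorMessage is null when downgraded. */
    record GroupError(String errorCode, String errorMessage) {}

    /** Versions below 3 predate the new error code, so it is downgraded. */
    static GroupError forRequestVersion(short requestVersion, String cause) {
        if (requestVersion < 3) {
            return new GroupError("UNKNOWN_SERVER_ERROR", null);
        }
        return new GroupError("GROUP_DELETION_FAILED", cause);
    }

    /**
     * Returns only the groups whose plugin deletion failed. Groups absent from
     * the result succeeded and may proceed to tombstoning.
     */
    static Map<String, GroupError> collectFailures(
            Map<String, CompletableFuture<Void>> deletions, short requestVersion) {
        Map<String, GroupError> failed = new LinkedHashMap<>();
        deletions.forEach((groupId, future) -> {
            try {
                future.join(); // blocking only to keep the sketch short
            } catch (CompletionException e) {
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                failed.put(groupId, forRequestVersion(requestVersion, cause.getMessage()));
            }
        });
        return failed;
    }
}
```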
   
   ### Wire ↔ POJO converter
   - `StreamsGroupTopologyDescriptionConverter` translates
     `StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription` into the
     broker-side `StreamsGroupTopologyDescription` POJO. Subtopology node ordering and
     string-collection iteration order are preserved via `LinkedHashSet`, so a downstream
     pretty-printer (e.g. the future `kafka-streams-groups.sh --topology` command) can
     reproduce the source ordering.
   - Wire-level node types (`SOURCE=1`, `PROCESSOR=2`, `SINK=3`) map to the sealed `Node`
     hierarchy. `GlobalStore` shape is validated; a malformed payload throws
     `InvalidRequestException`.
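
A minimal sketch of the type mapping described above, assuming Java 17 or later for sealed interfaces and records. The `WireNode` record, the field names, and the use of `IllegalArgumentException` in place of `InvalidRequestException` are all assumptions; only the numeric type codes and the `LinkedHashSet` ordering guarantee come from the description.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class NodeConverterSketch {
    /** Hypothetical wire shape: a numeric type, a name, and successor names in wire order. */
    record WireNode(int type, String name, List<String> successors) {}

    /** Sealed hierarchy so callers can switch exhaustively over node kinds. */
    sealed interface Node permits Source, Processor, Sink {
        String name();
        Set<String> successors();
    }
    record Source(String name, Set<String> successors) implements Node {}
    record Processor(String name, Set<String> successors) implements Node {}
    record Sink(String name, Set<String> successors) implements Node {}

    static Node convert(WireNode wire) {
        // LinkedHashSet keeps the order in which the client listed the names.
        Set<String> ordered = new LinkedHashSet<>(wire.successors());
        switch (wire.type()) {
            case 1: return new Source(wire.name(), ordered);
            case 2: return new Processor(wire.name(), ordered);
            case 3: return new Sink(wire.name(), ordered);
            default:
                // Stand-in for InvalidRequestException on a malformed payload.
                throw new IllegalArgumentException("Unknown node type: " + wire.type());
        }
    }
}
```

A plain `HashSet` would also deduplicate the names but gives no iteration-order guarantee, which is why an insertion-ordered set matters for any later pretty-printing.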
   

