dajac commented on code in PR #16874:
URL: https://github.com/apache/kafka/pull/16874#discussion_r1716537013
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3984,6 +3984,8 @@ CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToClassicGroup(
Review Comment:
We have another case in this file where we delete a classic group. Take a
look at the `maybeDeleteEmptyClassicGroup` method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -621,18 +623,32 @@ public CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> deleteOffs
public CoordinatorResult<Void, CoordinatorRecord> cleanupGroupMetadata() {
long startMs = time.milliseconds();
List<CoordinatorRecord> records = new ArrayList<>();
+ AtomicInteger deletedClassicGroupCount = new AtomicInteger(0);
groupMetadataManager.groupIds().forEach(groupId -> {
boolean allOffsetsExpired = offsetMetadataManager.cleanupExpiredOffsets(groupId, records);
if (allOffsetsExpired) {
- groupMetadataManager.maybeDeleteGroup(groupId, records);
+ if (groupMetadataManager.maybeDeleteGroup(groupId, records)) {
+ if (groupMetadataManager.group(groupId).type() == Group.GroupType.CLASSIC) {
+ deletedClassicGroupCount.incrementAndGet();
Review Comment:
I wonder if we need this logic in `deleteGroups` too. What do you think?
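To make the question concrete, here is a minimal sketch of how the revert-on-failure step could be pulled into a small helper that both `cleanupGroupMetadata` and `deleteGroups` might call. `EmptyGroupGauge` and `RevertOnAppendFailure` are hypothetical stand-ins invented for this illustration, not existing coordinator classes; the real code would go through `metricsShard.onClassicGroupStateTransition` instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the metrics shard: tracks only the number of
// classic groups currently counted in the Empty state.
final class EmptyGroupGauge {
    private final AtomicInteger emptyGroups = new AtomicInteger();

    void enterEmpty() { emptyGroups.incrementAndGet(); }
    void leaveEmpty() { emptyGroups.decrementAndGet(); }
    int value() { return emptyGroups.get(); }
}

// Hypothetical helper: builds an append future that puts the deleted groups
// back into the Empty gauge if the append fails.
final class RevertOnAppendFailure {
    static CompletableFuture<Void> appendFutureFor(EmptyGroupGauge gauge, int deletedCount) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        // Runs only when the future completes exceptionally.
        appendFuture.exceptionally(t -> {
            for (int i = 0; i < deletedCount; i++) {
                gauge.enterEmpty();
            }
            return null;
        });
        return appendFuture;
    }
}
```

With something along these lines, each deletion path would only need to count the classic groups it removed and hand that count to the helper, so the gauge is restored the same way in both places.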
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -621,18 +623,32 @@ public CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> deleteOffs
public CoordinatorResult<Void, CoordinatorRecord> cleanupGroupMetadata() {
long startMs = time.milliseconds();
List<CoordinatorRecord> records = new ArrayList<>();
+ AtomicInteger deletedClassicGroupCount = new AtomicInteger(0);
groupMetadataManager.groupIds().forEach(groupId -> {
boolean allOffsetsExpired = offsetMetadataManager.cleanupExpiredOffsets(groupId, records);
if (allOffsetsExpired) {
- groupMetadataManager.maybeDeleteGroup(groupId, records);
+ if (groupMetadataManager.maybeDeleteGroup(groupId, records)) {
+ if (groupMetadataManager.group(groupId).type() == Group.GroupType.CLASSIC) {
+ deletedClassicGroupCount.incrementAndGet();
+ }
+ }
}
});
log.info("Generated {} tombstone records while cleaning up group metadata in {} milliseconds.",
records.size(), time.milliseconds() - startMs);
// Reschedule the next cycle.
scheduleGroupMetadataExpiration();
- return new CoordinatorResult<>(records, false);
+
+ // If the append operation fails, revert classic group state transitions. Groups were only deleted
+ // if they were in Empty state.
+ CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+ appendFuture.exceptionally(__ -> {
+ IntStream.range(0, deletedClassicGroupCount.get()).forEach(___ ->
+ metricsShard.onClassicGroupStateTransition(null, ClassicGroupState.EMPTY));
+ return null;
+ });
+ return new CoordinatorResult<>(records, null, appendFuture, true, false);
Review Comment:
nit: Let's add an empty line before this one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3984,6 +3984,8 @@ CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToClassicGroup(
responseFuture.complete(new JoinGroupResponseData()
.setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code()));
+
+ metrics.onClassicGroupStateTransition(EMPTY, null);
Review Comment:
Should we add a unit test to cover this one too?