dajac commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1954063627
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
+ "delete-share-groups",
+ topicPartition,
+ (coordinator, offset) ->
coordinator.sharePartitions(groupList, offset)
+ )
Review Comment:
nit: The indentation is off here. We usually use 4 spaces for the arguments.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
Review Comment:
nit: Let's put one argument per line to follow the format of the other
methods in this file.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
Review Comment:
For consistency reasons, I suggest using a write operation to ensure that
you read the latest state. Otherwise, there is a chance that a share group
has not been committed yet and you would not see it with a read.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
+ "delete-share-groups",
+ topicPartition,
+ (coordinator, offset) ->
coordinator.sharePartitions(groupList, offset)
+ )
+ .thenCompose(this::performShareGroupsDeletion)
+ .exceptionally(exception -> handleOperationException(
+ "delete-share-groups",
+ groupList,
+ exception,
+ (error, __) -> {
+ Map<String, Errors> errors = new HashMap<>();
+ groupList.forEach(group -> errors.put(group, error));
+ return errors;
+ },
+ log
+ ));
+ }
+
+ private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+ Map<String, Map<Uuid, List<Integer>>> keys
+ ) {
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+ for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry :
keys.entrySet()) {
+ List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+ for (Map.Entry<Uuid, List<Integer>> topicEntry :
groupEntry.getValue().entrySet()) {
+ topicData.add(
+ new TopicData<>(
+ topicEntry.getKey(),
+
topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+ )
+ );
+ }
Review Comment:
I wonder whether it would make sense to directly return the data structure
that the persister needs from the group metadata manager. We could avoid all
those conversions.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
Review Comment:
It would be great if you could put a comment explaining the main flow here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
+ .thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to
error {}", groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set<String> groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ errGroupIds.forEach(groupSet::remove);
+
+ // If no non-share groupIds or non-error share group ids
present
+ // return.
+ if (groupSet.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+
+ // Let us invoke the standard procedure of any non-share
+ // groups or successfully deleted share groups remaining.
+ List<String> retainedGroupIds = groupSet.stream().toList();
+ return runtime.scheduleWriteOperation(
Review Comment:
nit: It may be worth extracting this into a method to reduce the code in the
`thenCompose`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
Review Comment:
nit: future? shareFuture is not quite right because it is the result of the
entire deletion.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ )
+ );
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ ))
+ .thenReturn(CompletableFuture.completedFuture(
+ Map.of(
+ "share-group-id-1",
+ Map.of(
+ shareGroupTopicId,
+ List.of(0, 1)
+ )
+ )
+ )
+ )
Review Comment:
This looks weird. The indentation is also incorrect. There are many such
cases in this file. I'll let you go through them.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords(
group.createGroupTombstoneRecords(records);
}
+ /**
+ * Returns all share partitions keys as a map from the input list of share
groups.
+ * @param shareGroups - A list representing share groups.
+ * @return Map representing the share partition keys for all the groups in
the input.
+ */
+ public Map<String, Map<Uuid, List<Integer>>>
sharePartitionKeysMap(List<ShareGroup> shareGroups) {
+ Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>();
+ if (metadataImage == null) {
+ return Map.of();
+ }
+ TopicsImage topicsImage = metadataImage.topics();
+ for (ShareGroup shareGroup : shareGroups) {
+ String groupId = shareGroup.groupId();
+ for (String topic : shareGroup.subscribedTopicNames().keySet()) {
+ TopicImage topicImage = topicsImage.getTopic(topic);
Review Comment:
The topic may not exist any more. For my understanding, how do we handle
this case? Does the share coordinator delete offsets of deleted topics?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
return new CoordinatorResult<>(records, resultCollection);
}
+ /**
+ * Method returns all share partition keys corresponding to a list of
groupIds.
+ * The groupIds are first filtered by type to restrict the list to share
groups.
+ * @param groupIds - A list of groupIds as string
+ * @param committedOffset - The last committedOffset for the internal
topic partition
+ * @return A map representing the share partition structure.
+ */
+ public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String>
groupIds, long committedOffset) {
+ List<ShareGroup> shareGroups = new ArrayList<>();
+ for (String groupId : groupIds) {
Review Comment:
nit: With the current structure of the code, you iterate on the group ids
twice. It may be better to directly get the share partitions of the share group
when you have it in hand.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
return new CoordinatorResult<>(records, resultCollection);
}
+ /**
+ * Method returns all share partition keys corresponding to a list of
groupIds.
+ * The groupIds are first filtered by type to restrict the list to share
groups.
+ * @param groupIds - A list of groupIds as string
+ * @param committedOffset - The last committedOffset for the internal
topic partition
+ * @return A map representing the share partition structure.
+ */
+ public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String>
groupIds, long committedOffset) {
+ List<ShareGroup> shareGroups = new ArrayList<>();
+ for (String groupId : groupIds) {
+ try {
+ Group group = groupMetadataManager.group(groupId);
+ if (group instanceof ShareGroup) {
+ shareGroups.add((ShareGroup) group);
+ }
+ } catch (ApiException exception) {
+ // We needn't do anything more than logging here as
deleteGroups
+ // method is handling these cases.
+ // Even if some groups cannot be found, we
+ // must check the entire list.
+ log.error("Failed to find group {}", groupId, exception);
Review Comment:
nit: I would remove this one because it may be spammy.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
return new CoordinatorResult<>(records, resultCollection);
}
+ /**
+ * Method returns all share partition keys corresponding to a list of
groupIds.
+ * The groupIds are first filtered by type to restrict the list to share
groups.
+ * @param groupIds - A list of groupIds as string
+ * @param committedOffset - The last committedOffset for the internal
topic partition
+ * @return A map representing the share partition structure.
+ */
+ public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String>
groupIds, long committedOffset) {
+ List<ShareGroup> shareGroups = new ArrayList<>();
+ for (String groupId : groupIds) {
+ try {
+ Group group = groupMetadataManager.group(groupId);
+ if (group instanceof ShareGroup) {
+ shareGroups.add((ShareGroup) group);
+ }
+ } catch (ApiException exception) {
Review Comment:
Could we be specific about the exception that we handle here? I suppose that
we care about the group id not found.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
+ "delete-share-groups",
+ topicPartition,
+ (coordinator, offset) ->
coordinator.sharePartitions(groupList, offset)
+ )
+ .thenCompose(this::performShareGroupsDeletion)
+ .exceptionally(exception -> handleOperationException(
+ "delete-share-groups",
+ groupList,
+ exception,
+ (error, __) -> {
+ Map<String, Errors> errors = new HashMap<>();
+ groupList.forEach(group -> errors.put(group, error));
+ return errors;
+ },
+ log
+ ));
+ }
+
+ private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+ Map<String, Map<Uuid, List<Integer>>> keys
+ ) {
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+ for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry :
keys.entrySet()) {
+ List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+ for (Map.Entry<Uuid, List<Integer>> topicEntry :
groupEntry.getValue().entrySet()) {
+ topicData.add(
+ new TopicData<>(
+ topicEntry.getKey(),
+
topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+ )
+ );
+ }
+
+ futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+ }
+
+ return persisterDeleteToGroupIdErrorMap(futures);
+ }
+
+ private CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>> deleteShareGroup(
+ String groupId,
+ List<TopicData<PartitionIdData>> topicData
+ ) {
+ return persister.deleteState(
+ new DeleteShareGroupStateParameters.Builder()
Review Comment:
nit: Indentation seems to be off here too.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords(
group.createGroupTombstoneRecords(records);
}
+ /**
+ * Returns all share partitions keys as a map from the input list of share
groups.
+ * @param shareGroups - A list representing share groups.
+ * @return Map representing the share partition keys for all the groups in
the input.
+ */
+ public Map<String, Map<Uuid, List<Integer>>>
sharePartitionKeysMap(List<ShareGroup> shareGroups) {
+ Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>();
+ if (metadataImage == null) {
+ return Map.of();
+ }
Review Comment:
nit: metadata image is never null.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
+ .thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
Review Comment:
What kind of errors can we get here? Are they all expected/allowed by the
DeleteGroups API?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
+ "delete-share-groups",
+ topicPartition,
+ (coordinator, offset) ->
coordinator.sharePartitions(groupList, offset)
+ )
+ .thenCompose(this::performShareGroupsDeletion)
+ .exceptionally(exception -> handleOperationException(
+ "delete-share-groups",
+ groupList,
+ exception,
+ (error, __) -> {
+ Map<String, Errors> errors = new HashMap<>();
+ groupList.forEach(group -> errors.put(group, error));
+ return errors;
+ },
+ log
+ ));
+ }
+
+ private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+ Map<String, Map<Uuid, List<Integer>>> keys
+ ) {
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+ for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry :
keys.entrySet()) {
+ List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+ for (Map.Entry<Uuid, List<Integer>> topicEntry :
groupEntry.getValue().entrySet()) {
+ topicData.add(
+ new TopicData<>(
+ topicEntry.getKey(),
+
topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+ )
+ );
+ }
+
+ futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+ }
+
+ return persisterDeleteToGroupIdErrorMap(futures);
+ }
+
+ private CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>> deleteShareGroup(
+ String groupId,
+ List<TopicData<PartitionIdData>> topicData
+ ) {
+ return persister.deleteState(
+ new DeleteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdData>()
+ .setGroupId(groupId)
+ .setTopicsData(topicData)
+ .build()
+ )
+ .build()
+ )
+ .thenCompose(result -> CompletableFuture.completedFuture(new
AbstractMap.SimpleEntry<>(groupId, result)))
+ .exceptionally(exception -> {
+ // In case the deleteState call fails,
+ // we should construct the appropriate response here
+ // so that the subsequent callbacks don't see runtime
exceptions.
+ log.error("Unable to delete share group partition(s) - {},
{}", groupId, topicData);
+ List<TopicData<PartitionErrorData>> respTopicData =
topicData.stream()
+ .map(reqTopicData -> new TopicData<>(
+ reqTopicData.topicId(),
+ reqTopicData.partitions().stream()
+ .map(reqPartData -> {
+ Errors err =
Errors.forException(exception);
+ return
PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(),
err.message());
+ })
+ .toList()
+ )
+ )
+ .toList();
+
+ return new AbstractMap.SimpleEntry<>(groupId, new
DeleteShareGroupStateResult.Builder()
+ .setTopicsData(respTopicData)
+ .build()
+ );
+ });
+ }
+
+ private CompletableFuture<Map<String, Errors>>
persisterDeleteToGroupIdErrorMap(
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures
+ ) {
+ return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[]{}))
+ .thenCompose(v -> {
+ Map<String, Errors> groupIds = new HashMap<>();
+ for (CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>> future : futures) {
+ Map.Entry<String, DeleteShareGroupStateResult> entry =
future.getNow(null); // safe as within allOff
+ groupIds.putIfAbsent(entry.getKey(), Errors.NONE);
+ for (TopicData<PartitionErrorData> topicData :
entry.getValue().topicsData()) {
+ Optional<PartitionErrorData> errItem =
topicData.partitions().stream()
+ .filter(errData -> errData.errorCode() !=
Errors.NONE.code())
+ .findAny();
+
+ errItem.ifPresent(val -> {
+ log.error("Received error while deleting share
group {} - {}", entry.getKey(), val);
+ groupIds.put(entry.getKey(),
Errors.forCode(val.errorCode()));
+ }
+ );
Review Comment:
nit: Format is incorrect here too.
```
errItem.ifPresent(val -> {
log.error("Received error while deleting share group {} - {}",
entry.getKey(), val);
groupIds.put(entry.getKey(), Errors.forCode(val.errorCode()));
});
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
+ .thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to
error {}", groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set<String> groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ errGroupIds.forEach(groupSet::remove);
Review Comment:
nit: removeAll?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
+ .thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to
error {}", groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set<String> groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ errGroupIds.forEach(groupSet::remove);
+
+ // If no non-share groupIds or non-error share group ids
present
+ // return.
+ if (groupSet.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+
+ // Let us invoke the standard procedure of any non-share
+ // groups or successfully deleted share groups remaining.
+ List<String> retainedGroupIds = groupSet.stream().toList();
+ return runtime.scheduleWriteOperation(
+ "delete-groups",
+ topicPartition,
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
+ coordinator -> coordinator.deleteGroups(context,
retainedGroupIds)
+ ).thenApply(deletedCollection -> {
+ deletedCollection.forEach(item ->
collection.add(item.duplicate()));
+ return collection;
+ })
+ .exceptionally(exception -> handleOperationException(
+ "delete-groups",
+ groupList,
+ exception,
+ (error, __) -> {
+
DeleteGroupsRequest.getErrorResultCollection(retainedGroupIds,
error).forEach(item -> collection.add(item.duplicate()));
+ return collection;
Review Comment:
I don't like the fact that we add to collection in two places (L873 and
L881). This is error-prone. Could we handle it at the end for both?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
-
- futures.add(future);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
shareFuture = deleteShareGroups(topicPartition, groupList)
+ .thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to
error {}", groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set<String> groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ errGroupIds.forEach(groupSet::remove);
+
+ // If no non-share groupIds or non-error share group ids
present
+ // return.
+ if (groupSet.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+
+ // Let us invoke the standard procedure of any non-share
+ // groups or successfully deleted share groups remaining.
+ List<String> retainedGroupIds = groupSet.stream().toList();
+ return runtime.scheduleWriteOperation(
+ "delete-groups",
Review Comment:
nit: Indentation is incorrect here too.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3)
);
}
+ @Test
+ public void testSharePartitionKeyMap() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .build();
+
+ MetadataImage image = mock(MetadataImage.class);
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ TopicImage t1image = mock(TopicImage.class);
+ TopicImage t2image = mock(TopicImage.class);
+ when(topicsImage.getTopic(anyString()))
+ .thenReturn(t1image)
+ .thenReturn(t2image);
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ when(shareGroup.subscribedTopicNames())
+ .thenReturn(Map.of(
+ "t1", mock(SubscriptionCount.class),
+ "t2", mock(SubscriptionCount.class)
+ )
+ );
Review Comment:
nit: I would format it as follows:
```
when(shareGroup.subscribedTopicNames()).thenReturn(Map.of(
"t1", mock(SubscriptionCount.class),
"t2", mock(SubscriptionCount.class)
));
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ )
+ );
Review Comment:
nit:
```
expectedResultCollection.addAll(List.of(
result3.duplicate(),
result2.duplicate(),
result1.duplicate()
));
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3)
);
}
+ @Test
+ public void testSharePartitionKeyMap() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .build();
+
+ MetadataImage image = mock(MetadataImage.class);
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ TopicImage t1image = mock(TopicImage.class);
+ TopicImage t2image = mock(TopicImage.class);
+ when(topicsImage.getTopic(anyString()))
+ .thenReturn(t1image)
+ .thenReturn(t2image);
Review Comment:
It may be easier to just build the MetadataImage that you need rather than
mocking it. We have `MetadataImageBuilder` which is pretty handy for it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ private CompletableFuture<Map<String, Errors>>
deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+ // topicPartition refers to internal topic __consumer_offsets
+ return runtime.scheduleReadOperation(
+ "delete-share-groups",
+ topicPartition,
+ (coordinator, offset) ->
coordinator.sharePartitions(groupList, offset)
+ )
+ .thenCompose(this::performShareGroupsDeletion)
+ .exceptionally(exception -> handleOperationException(
+ "delete-share-groups",
+ groupList,
+ exception,
+ (error, __) -> {
+ Map<String, Errors> errors = new HashMap<>();
+ groupList.forEach(group -> errors.put(group, error));
+ return errors;
+ },
+ log
+ ));
+ }
+
+ private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+ Map<String, Map<Uuid, List<Integer>>> keys
+ ) {
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+ for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry :
keys.entrySet()) {
+ List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+ for (Map.Entry<Uuid, List<Integer>> topicEntry :
groupEntry.getValue().entrySet()) {
+ topicData.add(
+ new TopicData<>(
+ topicEntry.getKey(),
+
topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+ )
+ );
+ }
+
+ futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+ }
+
+ return persisterDeleteToGroupIdErrorMap(futures);
+ }
+
+ private CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>> deleteShareGroup(
+ String groupId,
+ List<TopicData<PartitionIdData>> topicData
+ ) {
+ return persister.deleteState(
+ new DeleteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdData>()
+ .setGroupId(groupId)
+ .setTopicsData(topicData)
+ .build()
+ )
+ .build()
+ )
+ .thenCompose(result -> CompletableFuture.completedFuture(new
AbstractMap.SimpleEntry<>(groupId, result)))
+ .exceptionally(exception -> {
+ // In case the deleteState call fails,
+ // we should construct the appropriate response here
+ // so that the subsequent callbacks don't see runtime
exceptions.
+ log.error("Unable to delete share group partition(s) - {},
{}", groupId, topicData);
+ List<TopicData<PartitionErrorData>> respTopicData =
topicData.stream()
+ .map(reqTopicData -> new TopicData<>(
+ reqTopicData.topicId(),
+ reqTopicData.partitions().stream()
+ .map(reqPartData -> {
+ Errors err =
Errors.forException(exception);
+ return
PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(),
err.message());
+ })
+ .toList()
+ )
+ )
+ .toList();
+
+ return new AbstractMap.SimpleEntry<>(groupId, new
DeleteShareGroupStateResult.Builder()
+ .setTopicsData(respTopicData)
+ .build()
+ );
+ });
+ }
+
+ private CompletableFuture<Map<String, Errors>>
persisterDeleteToGroupIdErrorMap(
+ List<CompletableFuture<AbstractMap.SimpleEntry<String,
DeleteShareGroupStateResult>>> futures
+ ) {
+ return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[]{}))
+ .thenCompose(v -> {
Review Comment:
nit: We would usually format it as follows:
```
return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[]{})).thenCompose(v -> {
...
});
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3)
);
}
+ @Test
+ public void testSharePartitionKeyMap() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .build();
+
+ MetadataImage image = mock(MetadataImage.class);
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ TopicImage t1image = mock(TopicImage.class);
+ TopicImage t2image = mock(TopicImage.class);
+ when(topicsImage.getTopic(anyString()))
+ .thenReturn(t1image)
+ .thenReturn(t2image);
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ when(shareGroup.subscribedTopicNames())
+ .thenReturn(Map.of(
+ "t1", mock(SubscriptionCount.class),
+ "t2", mock(SubscriptionCount.class)
+ )
+ );
+
+ when(shareGroup.groupId())
+ .thenReturn("share-group");
+ when(image.topics())
+ .thenReturn(topicsImage);
+ when(image.provenance())
+ .thenReturn(new MetadataProvenance(-1, -1, -1, true));
+
+ when(t1image.partitions())
+ .thenReturn(
+ Map.of(
+ 0, mock(PartitionRegistration.class),
+ 1, mock(PartitionRegistration.class)
+ )
+ );
+ Uuid t1Uuid = Uuid.randomUuid();
+ when(t1image.id()).thenReturn(t1Uuid);
+
+ when(t2image.partitions())
+ .thenReturn(
+ Map.of(
+ 0, mock(PartitionRegistration.class),
+ 1, mock(PartitionRegistration.class)
+ )
+ );
+ Uuid t2Uuid = Uuid.randomUuid();
+ when(t2image.id()).thenReturn(t2Uuid);
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+ Map<String, Map<Uuid, List<Integer>>> keyMap =
context.groupMetadataManager.sharePartitionKeysMap(List.of(shareGroup));
+ assertEquals(1, keyMap.size());
+ assertEquals(2, keyMap.get("share-group").size());
+ for (Uuid topic : List.of(t1Uuid, t2Uuid)) {
+ assertEquals(2, keyMap.get("share-group").get(topic).size());
+ assertTrue(keyMap.get("share-group").get(topic).contains(0));
+ assertTrue(keyMap.get("share-group").get(topic).contains(1));
+ }
Review Comment:
We usually prefer to build the expected map and to use
assertEquals(expectedMap, actualMap).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]