smjn commented on code in PR #19431:
URL: https://github.com/apache/kafka/pull/19431#discussion_r2036832379


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1248,29 +1248,118 @@ private CompletableFuture<Map<String, Errors>> 
persisterDeleteToGroupIdErrorMap(
         });
     }
 
-    private void populateDeleteShareGroupOffsetsFuture(
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
checkInitializedSharePartitionsAndProcess(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList =
+            new ArrayList<>(requestData.topics().size());
+
+        return runtime.scheduleReadOperation(
+            "share-group-initialized-partitions",
+            topicPartitionFor(groupId),
+            (coordinator, offset) -> 
coordinator.initializedShareGroupPartitions(groupId)
+        ).thenCompose(topicPartitionMap -> {
+            requestData.topics().forEach(topic -> {
+                Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+                if (topicId != null) {
+                    // A deleteState request to persister should only be sent 
with those topic partitions for which corresponding
+                    // share partitions are initialized for the group.
+                    if (topicPartitionMap.containsKey(topicId)) {
+                        requestTopicIdToNameMapping.put(topicId, 
topic.topicName());
+                        List<DeleteShareGroupStateRequestData.PartitionData> 
partitions = new ArrayList<>();
+                        topic.partitions().forEach(partition -> {
+                            if 
(topicPartitionMap.get(topicId).contains(partition)) {
+                                partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
+                            }
+                        });
+                        deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                            .setTopicId(topicId)
+                            .setPartitions(partitions));
+                    }
+                } else {
+                    deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                        .setTopicName(topic.topicName())
+                        .setPartitions(topic.partitions().stream().map(
+                            partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partition)
+                                
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                                
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                        ).toList()));
+                }
+            });
+
+            // If the request for the persister is empty, just complete the 
operation right away.
+            if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+                return CompletableFuture.completedFuture(new 
DeleteShareGroupOffsetsResponseData().setResponses(deleteShareGroupOffsetsResponseTopicList));
+            }
+
+            return sendPersisterDeleteStateRequest(
+                requestData,
+                requestTopicIdToNameMapping,
+                deleteShareGroupStateRequestTopicsData,
+                deleteShareGroupOffsetsResponseTopicList
+            );
+        }).exceptionally(throwable -> {
+            log.error("Failed to get initialized topic partitions for the 
group {}", groupId, throwable);
+            return 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
+        });
+    }
+
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
checkIfGroupIsEmptyAndProcess(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        // This is done to make sure the provided group is empty. Offsets can 
be deleted only for an empty share group.

Review Comment:
   This seems unnecessary. 
   
   There is no need to make the `describe-groups` call.
   
   You can directly call the shard method on the request and the shard can 
check its internal state to verify that the group is empty and return the 
appropriate persister request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to