AndrewJSchofield commented on code in PR #18929:
URL: https://github.com/apache/kafka/pull/18929#discussion_r2101636242


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -667,6 +670,56 @@ public void run() {
         ));
     }
 
+    // Visibility for testing
+    CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize(
+        InitializeShareGroupStateParameters request,
+        AlterShareGroupOffsetsResponseData response
+    ) {
+        return persister.initializeState(request)
+            .handle((result, exp) -> {
+                if (exp == null) {
+                    if (result.errorCounts().isEmpty()) {
+                        
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
result, new ShareGroupHeartbeatResponseData());
+                        return response;
+                    } else {
+                        //TODO build new AlterShareGroupOffsetsResponseData 
for error response

Review Comment:
   We can leave this for a following PR if you like.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3738,11 +3738,50 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val alterShareGroupOffsetsRequest = 
request.body[AlterShareGroupOffsetsRequest]
+    val groupId = alterShareGroupOffsetsRequest.data.groupId
+
     if (!isShareGroupProtocolEnabled) {
       requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
       return CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else {
+      val responseBuilder = new AlterShareGroupOffsetsResponse.Builder()
+      val authorizedTopicPartitions = new 
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection()
+
+      alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
+        val topicError = {
+          if (!authHelper.authorize(request.context, READ, TOPIC, 
topic.topicName())) {
+            Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
+          } else if (!metadataCache.contains(topic.topicName())) {
+            Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+          } else {
+            None
+          }
+        }
+        topicError match {
+          case Some(error) =>
+            topic.partitions().forEach(partition => 
responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), 
error.error))
+          case None =>
+            authorizedTopicPartitions.add(topic)
+        }
+      })
+
+      val data = new AlterShareGroupOffsetsRequestData()
+        .setGroupId(groupId)
+        .setTopics(authorizedTopicPartitions)
+      groupCoordinator.alterShareGroupOffsets(
+        request.context,
+        groupId,
+        data
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, 
responseBuilder.merge(response).build())

Review Comment:
   I think there's a problem with the response building here.
   
   For topics which have an error, such as `TOPIC_AUTHORIZATION_FAILED` or 
`UNKNOWN_TOPIC_OR_PARTITION`, not setting the topic ID is correct. However, for 
topics which work, the response should contain the topic ID. In 
`GroupMetadataManager.completeAlterShareGroupOffsets`, the topic ID is added to 
the response data. However, then the call to `responseBuilder.merge` brings 
together the successful and failed parts of the response, and it does not seem 
to copy the topic ID into the consolidated response.
   
   It looks like there could be more validation of the response contents too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -776,6 +778,38 @@ public 
CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord>
         );
     }
 
+    /**
+     * Make the following checks to make sure the 
AlterShareGroupOffsetsRequest request is valid:
+     * 1. Checks whether the provided group is empty
+     * 2. Checks the requested topics are presented in the metadataImage
+     * 3. Checks the corresponding share partitions in 
AlterShareGroupOffsetsRequest are existing
+     *
+     * @param groupId - The group ID
+     * @param alterShareGroupOffsetsRequestData - The request data for 
AlterShareGroupOffsetsRequestData
+     * @return A Result containing a pair of AlterShareGroupOffsets 
InitializeShareGroupStateParameters
+     *         and a list of records to update the state machine.
+     */
+    public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets(
+        String groupId,
+        AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+        ShareGroup group = groupMetadataManager.shareGroup(groupId);
+        group.validateOffsetsAlterable();
+
+        Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters> response = 
groupMetadataManager.completeAlterShareGroupOffsets(
+            groupId,
+            alterShareGroupOffsetsRequestData,
+            records
+        );
+        return new CoordinatorResult<>(
+            records,
+            response
+        );
+    }
+
+

Review Comment:
   nit: Excessive blank lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to