AndrewJSchofield commented on code in PR #18929: URL: https://github.com/apache/kafka/pull/18929#discussion_r2083622116
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3620,7 +3620,50 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
     val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest]
-    requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    val groupId = alterShareGroupOffsetsRequest.data.groupId
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else {
+      val responseBuilder = new AlterShareGroupOffsetsResponse.Builder()
+      val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection()
+
+      alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
+        val invalidTopicError = checkValidTopic(topic.topicName())
+        val topicError = invalidTopicError.orElse {
+          if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) {
+            Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
+          } else if (!metadataCache.contains(topic.topicName())) {
+            Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+          } else {
+            None
+          }
+        }
+        topicError match {
+          case Some(error) =>
+            topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), error.error))
+          case None =>
+            authorizedTopicPartitions.add(topic)
+        }
+      })
+
+      val data = new AlterShareGroupOffsetsRequestData()
+        .setGroupId(groupId)
+        .setTopics(authorizedTopicPartitions)
+      groupCoordinator.alterShareGroupOffsets(
+        request.context,
+        groupId,
+        data
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response).build())
+        }
+      }
+    }
     CompletableFuture.completedFuture[Unit](())

Review Comment:
   There is no good reason. In KafkaApis.scala, some handle methods return `CompletableFuture` while others just return `Unit`. It would be nice if they were all the same (and even better if it were Java, but that's for another day).

##########
clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java:
##########
@@ -65,7 +65,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                 .setPartitions(topicResult.partitions().stream()
                     .map(partitionData -> new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partitionData.partitionIndex())
-                        .setErrorCode(Errors.forException(e).code()))
+                        .setErrorCode(Errors.forException(e).code())

Review Comment:
   `Errors.forException()` scans the list of Errors, matching on exception class. I would prefer this to be done just once, for example by assigning the result to a local `Errors` variable and then using it to set the error code and message on each loop iteration.

##########
clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java:
##########
@@ -78,6 +79,25 @@ public static AlterShareGroupOffsetsRequest parse(Readable readable, short versi
         );
     }
+    public static AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic getErrorAlterShareGroup(
+        Errors error
+    ) {
+        return new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()

Review Comment:
   In `DeleteShareGroupOffsetsResponse`, there is a top-level error code and message. These are missing in `AlterShareGroupOffsetsResponse`. It would probably be best to add them, rather than setting an empty topic name and -1 partition index like this.
   wdyt?
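
To illustrate the `Errors.forException()` suggestion above, here is a minimal, self-contained sketch of resolving the error once and reusing it for every partition. The `Errors` enum, the `PartitionError` class, the `errorsFor` helper and the `lookups` counter are hypothetical stand-ins written for this example; they are not the Kafka classes, and using `e.getMessage()` for the message is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HoistedErrorLookup {

    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors; not the real enum.
    enum Errors {
        UNKNOWN_SERVER_ERROR((short) -1, RuntimeException.class),
        GROUP_AUTHORIZATION_FAILED((short) 30, SecurityException.class),
        UNSUPPORTED_VERSION((short) 35, UnsupportedOperationException.class);

        // Demo-only counter showing how many times the scan below runs.
        static int lookups = 0;

        private final short code;
        private final Class<? extends Throwable> exceptionClass;

        Errors(short code, Class<? extends Throwable> exceptionClass) {
            this.code = code;
            this.exceptionClass = exceptionClass;
        }

        short code() {
            return code;
        }

        // Linear scan matching on exception class, falling back to UNKNOWN_SERVER_ERROR.
        static Errors forException(Throwable t) {
            lookups++;
            for (Errors error : values()) {
                if (error != UNKNOWN_SERVER_ERROR && error.exceptionClass.isInstance(t)) {
                    return error;
                }
            }
            return UNKNOWN_SERVER_ERROR;
        }
    }

    // Hypothetical stand-in for a per-partition response entry.
    static final class PartitionError {
        final int partitionIndex;
        final short errorCode;
        final String errorMessage;

        PartitionError(int partitionIndex, short errorCode, String errorMessage) {
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    static List<PartitionError> errorsFor(List<Integer> partitions, Throwable e) {
        // Resolve the error once, outside the loop, instead of once per partition.
        Errors error = Errors.forException(e);
        String message = e.getMessage(); // assumption: the exception message is what gets reported

        List<PartitionError> result = new ArrayList<>();
        for (int partition : partitions) {
            result.add(new PartitionError(partition, error.code(), message));
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionError> out =
            errorsFor(Arrays.asList(0, 1, 2), new SecurityException("not authorized"));
        for (PartitionError p : out) {
            System.out.println(p.partitionIndex + " " + p.errorCode + " " + p.errorMessage);
        }
        System.out.println("forException calls: " + Errors.lookups);
    }
}
```

The same shape should carry over to the stream-based `getErrorResponse`: compute the `Errors` value before building the topic list, then reference the local inside the `map` lambda.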