AndrewJSchofield commented on code in PR #18929: URL: https://github.com/apache/kafka/pull/18929#discussion_r2101636242
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -667,6 +670,56 @@ public void run() { )); } + // Visibility for testing + CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + AlterShareGroupOffsetsResponseData response + ) { + return persister.initializeState(request) + .handle((result, exp) -> { + if (exp == null) { + if (result.errorCounts().isEmpty()) { + handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), result, new ShareGroupHeartbeatResponseData()); + return response; + } else { + //TODO build new AlterShareGroupOffsetsResponseData for error response Review Comment: We can leave this for a following PR if you like. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3738,11 +3738,50 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] + val groupId = alterShareGroupOffsetsRequest.data.groupId + if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) return CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + } else { + val responseBuilder = new AlterShareGroupOffsetsResponse.Builder() + val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection() + + alterShareGroupOffsetsRequest.data.topics.forEach(topic => { + val topicError = { + if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) { + Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED)) + } else if (!metadataCache.contains(topic.topicName())) { + Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } else { + None + } + } + topicError match { + case Some(error) => + topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), error.error)) + case None => + authorizedTopicPartitions.add(topic) + } + }) + + val data = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(authorizedTopicPartitions) + groupCoordinator.alterShareGroupOffsets( + request.context, + groupId, + data + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response).build()) Review Comment: I think there's a problem with the response building here. For topics which have an error, such as `TOPIC_AUTHORIZATION_FAILED` or `UNKNOWN_TOPIC_OR_PARTITION`, not setting the topic ID is correct. However, for topics which work, the response should contain the topic ID. In `GroupMetadataManager.completeAlterShareGroupOffsets`, the topic ID is added to the response data. However, then the call to `responseBuilder.merge` brings together the successful and failed parts of the response, and it does not seem to copy the topic ID into the consolidated response. It looks like there could be more validation of the response contents too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -776,6 +778,38 @@ public CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord> ); } + /** + * Make the following checks to make sure the AlterShareGroupOffsetsRequest request is valid: + * 1. Checks whether the provided group is empty + * 2. Checks the requested topics are presented in the metadataImage + * 3. Checks the corresponding share partitions in AlterShareGroupOffsetsRequest are existing + * + * @param groupId - The group ID + * @param alterShareGroupOffsetsRequestData - The request data for AlterShareGroupOffsetsRequestData + * @return A Result containing a pair of AlterShareGroupOffsets InitializeShareGroupStateParameters + * and a list of records to update the state machine. + */ + public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets( + String groupId, + AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData + ) { + List<CoordinatorRecord> records = new ArrayList<>(); + ShareGroup group = groupMetadataManager.shareGroup(groupId); + group.validateOffsetsAlterable(); + + Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> response = groupMetadataManager.completeAlterShareGroupOffsets( + groupId, + alterShareGroupOffsetsRequestData, + records + ); + return new CoordinatorResult<>( + records, + response + ); + } + + Review Comment: nit: Excessive blank lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org