AndrewJSchofield commented on code in PR #18929:
URL: https://github.com/apache/kafka/pull/18929#discussion_r2083622116


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3620,7 +3620,50 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): 
Unit = {
     val alterShareGroupOffsetsRequest = 
request.body[AlterShareGroupOffsetsRequest]
-    requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    val groupId = alterShareGroupOffsetsRequest.data.groupId
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else {
+      val responseBuilder = new AlterShareGroupOffsetsResponse.Builder()
+      val authorizedTopicPartitions = new 
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection()
+
+      alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
+        val invalidTopicError = checkValidTopic(topic.topicName())
+        val topicError = invalidTopicError.orElse {
+          if (!authHelper.authorize(request.context, READ, TOPIC, 
topic.topicName())) {
+            Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
+          } else if (!metadataCache.contains(topic.topicName())) {
+            Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+          } else {
+            None
+          }
+        }
+        topicError match {
+          case Some(error) =>
+            topic.partitions().forEach(partition => 
responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), 
error.error))
+          case None =>
+            authorizedTopicPartitions.add(topic)
+        }
+      })
+
+      val data = new AlterShareGroupOffsetsRequestData()
+        .setGroupId(groupId)
+        .setTopics(authorizedTopicPartitions)
+      groupCoordinator.alterShareGroupOffsets(
+        request.context,
+        groupId,
+        data
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, 
responseBuilder.merge(response).build())
+        }
+      }
+    }
     CompletableFuture.completedFuture[Unit](())

Review Comment:
   There is no good reason. In KafkaApis.scale, some handle methods return 
`CompletableFuture` while others are just `Unit`. It would be nice if they were 
all the same (and even better if it was Java, but that's for another day).



##########
clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java:
##########
@@ -65,7 +65,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
                 .setPartitions(topicResult.partitions().stream()
                     .map(partitionData -> new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partitionData.partitionIndex())
-                        .setErrorCode(Errors.forException(e).code()))
+                        .setErrorCode(Errors.forException(e).code())

Review Comment:
   `Errors.forException()` scans the list of Errors matching on exception 
class. I would prefer this to be done just once, such as assigning a local 
Errors variable and then using it to set the error code and message on each 
look iteration.



##########
clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java:
##########
@@ -78,6 +79,25 @@ public static AlterShareGroupOffsetsRequest parse(Readable 
readable, short versi
         );
     }
 
+    public static 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic 
getErrorAlterShareGroup(
+        Errors error
+    ) {
+        return new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()

Review Comment:
   In `DeleteShareGroupOffsetsResponse`, there are a top-level error code and 
message. These are missing in `AlterShareGroupOffsetResponse`. It would 
probably be best to add them, rather than setting an empty topic name and -1 
partition index like this. wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to