Copilot commented on code in PR #19978:
URL: https://github.com/apache/kafka/pull/19978#discussion_r2152001964


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2792,11 +2792,19 @@ class KafkaApis(val requestChannel: RequestChannel,
               if (responseData.status() == null) {
                 responseData.setStatus(new util.ArrayList());
               }
-              responseData.status().add(
-                new StreamsGroupHeartbeatResponseData.Status()
-                  
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
-                  .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")
-              )
+              val missingInternalTopicStatus =
+                responseData.status().stream().filter(x => x.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+              if (missingInternalTopicStatus.isPresent) {
+                missingInternalTopicStatus.get().setStatusDetail(
+                  missingInternalTopicStatus.get().statusDetail() + "; 
Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + "."
+                )
+              } else {
+                responseData.status().add(
+                  new StreamsGroupHeartbeatResponseData.Status()
+                    
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+                    .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")
+                )

Review Comment:
   The code always appends an 'Unauthorized to CREATE' detail when a 
missing-internal-topics status exists, even if `createTopicUnauthorized` is 
empty. This will leave a dangling message when all CREATE ACLs are present and 
break the corresponding test. Wrap the append logic in a check for nonEmpty 
`createTopicUnauthorized`.
   ```suggestion
                 if (createTopicUnauthorized.nonEmpty) {
                   if (missingInternalTopicStatus.isPresent) {
                     missingInternalTopicStatus.get().setStatusDetail(
                       missingInternalTopicStatus.get().statusDetail() + "; 
Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + "."
                     )
                   } else {
                     responseData.status().add(
                       new StreamsGroupHeartbeatResponseData.Status()
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
                         .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")
                     )
                   }
   ```



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2792,11 +2792,19 @@ class KafkaApis(val requestChannel: RequestChannel,
               if (responseData.status() == null) {
                 responseData.setStatus(new util.ArrayList());
               }
-              responseData.status().add(
-                new StreamsGroupHeartbeatResponseData.Status()
-                  
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
-                  .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")
-              )
+              val missingInternalTopicStatus =
+                responseData.status().stream().filter(x => x.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+              if (missingInternalTopicStatus.isPresent) {
+                missingInternalTopicStatus.get().setStatusDetail(
+                  missingInternalTopicStatus.get().statusDetail() + "; 
Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + "."
+                )
+              } else {
+                responseData.status().add(
+                  new StreamsGroupHeartbeatResponseData.Status()
+                    
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+                    .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")

Review Comment:
   [nitpick] Use `mkString(", ")` instead of `mkString(",")` to include a space 
after commas for readability in the statusDetail.
   ```suggestion
                     missingInternalTopicStatus.get().statusDetail() + "; 
Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(", ") + 
"."
                   )
                 } else {
                   responseData.status().add(
                     new StreamsGroupHeartbeatResponseData.Status()
                       
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
                       .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(", ") + ".")
   ```



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -825,6 +828,48 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     new 
AlterShareGroupOffsetsRequest.Builder(data).build(ApiKeys.ALTER_SHARE_GROUP_OFFSETS.latestVersion)
   }
 
+  private def streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(

Review Comment:
   [nitpick] There are two overloaded methods named 
`streamsGroupHeartbeatRequest` (one with parameters and one without), which can 
be confusing. Consider renaming the no-arg version to 
`defaultStreamsGroupHeartbeatRequest` for clarity.
   ```suggestion
     private def defaultStreamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to