[ 
https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577894#comment-16577894
 ] 

ASF GitHub Bot commented on KAFKA-5638:
---------------------------------------

hachikuji closed pull request #5352: KAFKA-5638: Improve the Required ACL of 
ListGroups API (KIP-231)
URL: https://github.com/apache/kafka/pull/5352
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index de9e0bf66ef..0273e3da557 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -53,6 +53,7 @@ import 
org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -63,7 +64,6 @@ import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
-import org.apache.kafka.common.resource.ResourcePattern
 
 /**
  * Logic to handle the various Kafka requests
@@ -1238,14 +1238,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+    val (error, groups) = groupCoordinator.handleListGroups()
+    if (authorize(request.session, Describe, Resource.ClusterResource))
+      // With describe cluster access all groups are returned. We keep this 
alternative for backward compatibility.
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    } else {
-      val (error, groups) = groupCoordinator.handleListGroups()
-      val allGroups = groups.map { group => new 
ListGroupsResponse.Group(group.groupId, group.protocolType) }
+        new ListGroupsResponse(requestThrottleMs, error, groups.map { group => 
new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava))
+    else {
+      val filteredGroups = groups.filter(group => authorize(request.session, 
Describe, new Resource(Group, group.groupId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava))
+        new ListGroupsResponse(requestThrottleMs, error, filteredGroups.map { 
group => new ListGroupsResponse.Group(group.groupId, group.protocolType) 
}.asJava))
     }
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index dc9ca8568ea..faad071b909 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1027,6 +1027,48 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     consumerGroupService.close()
   }
 
+  @Test
+  def testListGroupApiWithAndWithoutListGroupAcls() {
+    // write some record to the topic
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
+    sendRecords(1, tp)
+
+    // use two consumers to write to two different groups
+    val group2 = "other group"
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), Resource(Group, group2, LITERAL))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    this.consumers.head.subscribe(Collections.singleton(topic))
+    consumeRecords(this.consumers.head)
+
+    val otherConsumer = 
TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), 
groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT)
+    otherConsumer.subscribe(Collections.singleton(topic))
+    consumeRecords(otherConsumer)
+
+    val adminClient = createAdminClient()
+
+    // first use cluster describe permission
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), Resource.ClusterResource)
+    // it should list both groups (due to cluster describe permission)
+    assertEquals(Set(group, group2), 
adminClient.listConsumerGroups().all().get().asScala.map(_.groupId()).toSet)
+
+    // now replace cluster describe with group read permission
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    // it should list only one group now
+    val groupList = adminClient.listConsumerGroups().all().get().asScala.toList
+    assertEquals(1, groupList.length)
+    assertEquals(group, groupList.head.groupId)
+
+    // now remove all acls and verify describe group access is required to 
list any group
+    removeAllAcls()
+    val listGroupResult = adminClient.listConsumerGroups()
+    assertEquals(List(), listGroupResult.errors().get().asScala.toList)
+    assertEquals(List(), listGroupResult.all().get().asScala.toList)
+    otherConsumer.close()
+  }
+
   @Test
   def testDeleteGroupApiWithDeleteGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 264e26d6b02..979190db7b7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -33,12 +33,12 @@ <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading 
from 0.8.x, 0.9.x, 0.1
     <li>The default value for the producer's <code>retries</code> config was 
changed to <code>Integer.MAX_VALUE</code>, as we introduced 
<code>delivery.timeout.ms</code>
         in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
         which sets an upper bound on the total time between sending a record 
and receiving acknowledgement from the broker. By default,
-        the delivery timeout is set to 2 minutes.
-    </li>
+        the delivery timeout is set to 2 minutes.</li>
     <li>By default, MirrorMaker now overrides <code>delivery.timeout.ms</code> 
to <code>Integer.MAX_VALUE</code> when
         configuring the producer. If you have overridden the value of 
<code>retries</code> in order to fail faster,
-        you will instead need to override <code>delivery.timeout.ms</code>.
-    </li>
+        you will instead need to override 
<code>delivery.timeout.ms</code>.</li>
+    <li>The <code>ListGroup</code> API now expects, as a recommended 
alternative, <code>Describe Group</code> access to the groups a user should be 
able to list.
+        Even though the old <code>Describe Cluster</code> access is still 
supported for backward compatibility, using it for this API is not advised.</li>
 </ol>
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Inconsistency in consumer group related ACLs
> --------------------------------------------
>
>                 Key: KAFKA-5638
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5638
>             Project: Kafka
>          Issue Type: Bug
>          Components: security
>    Affects Versions: 0.11.0.0, 1.0.0
>            Reporter: Vahid Hashemian
>            Assignee: Vahid Hashemian
>            Priority: Minor
>              Labels: kip
>             Fix For: 2.1.0
>
>
> Users can see all groups in the cluster (using consumer group’s {{--list}} 
> option) provided that they have {{Describe}} access to the cluster. It would 
> make more sense to modify that experience and limit what is listed in the 
> output to only those groups they have {{Describe}} access to. The reason is, 
> almost everything else is accessible by a user only if the access is 
> specifically granted (through ACL {{--add}}); and this scenario should not be 
> an exception. The potential change would be updating the minimum required 
> permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe 
> (Group)}}.
> We can also look at this issue from a different angle: A user with {{Read}} 
> access to a group can describe the group, but the same user would not see 
> anything when listing groups (assuming there is no {{Describe}} access to the 
> cluster). It makes more sense for this user to be able to list all groups 
> s/he can already describe.
> It would be great to know if any user is relying on the existing behavior 
> (listing all consumer groups using a {{Describe (Cluster)}} ACL).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to