This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51f0f3e  MINOR: additional check to follower fetch handling  (#4433)
51f0f3e is described below

commit 51f0f3ee792cf9352ce61afeca098c765cdad664
Author: Edoardo Comar <eco...@uk.ibm.com>
AuthorDate: Thu Jan 18 19:14:49 2018 +0000

    MINOR: additional check to follower fetch handling  (#4433)
    
    Add a ClusterAction authorization check on the Cluster resource for
    follower fetch requests in KafkaApis, and add a unit test specific to
    follower fetch. Developed with @mimaison.
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram <rajinisiva...@googlemail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 26 +++++++++++++---------
 .../kafka/api/AuthorizerIntegrationTest.scala      | 24 ++++++++++++++++++++
 .../kafka/api/EndToEndAuthorizationTest.scala      |  3 ++-
 3 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b145d3c..b54a637 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -488,18 +488,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, 
FetchResponse.PartitionData)]()
     val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
 
-    for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
-      if (!authorize(request.session, Read, new Resource(Topic, 
topicPartition.topic)))
-        unauthorizedTopicResponseData += topicPartition -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+    if (fetchRequest.isFromFollower() && !authorize(request.session, 
ClusterAction, Resource.ClusterResource))    
+      for (topicPartition <- fetchRequest.fetchData.asScala.keys)
+        unauthorizedTopicResponseData += topicPartition -> new 
FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED,
           FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
           FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-      else if (!metadataCache.contains(topicPartition.topic))
-        nonExistingTopicResponseData += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-          FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-      else
-        authorizedRequestInfo += (topicPartition -> partitionData)
-    }
+    else
+      for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
+        if (!authorize(request.session, Read, new Resource(Topic, 
topicPartition.topic)))
+          unauthorizedTopicResponseData += topicPartition -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+            FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+        else if (!metadataCache.contains(topicPartition.topic))
+          nonExistingTopicResponseData += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+        else
+          authorizedRequestInfo += (topicPartition -> partitionData)
+      }
 
     def convertedPartitionData(tp: TopicPartition, data: 
FetchResponse.PartitionData) = {
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4230225..39c1ea3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -273,6 +273,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
+    requests.FetchRequest.Builder.forConsumer(100, Int.MaxValue, 
partitionMap).build()
+  }
+
+  private def createFetchFollowerRequest = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
+    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
     val version = ApiKeys.FETCH.latestVersion
     requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, 
partitionMap).build()
   }
@@ -486,6 +492,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testFetchFollowerRequest() {
+    val key = ApiKeys.FETCH
+    val request = createFetchFollowerRequest
+      
+    removeAllAcls()
+    val resources = Set(topicResource.resourceType, 
Resource.ClusterResource.resourceType)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
false)
+
+    val readAcls = topicReadAcl.get(topicResource).get
+    addAndVerifyAcls(readAcls, topicResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
false)
+ 
+    val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+    addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
true)
+  }
+
+  @Test
   def testProduceWithNoTopicAccess() {
     try {
       sendRecords(numRecords, tp)
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 1e93b37..4500d49 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -61,6 +61,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 
   override def configureSecurityBeforeServersStart() {
     AclCommand.main(clusterAclArgs)
+    AclCommand.main(topicBrokerReadAclArgs)
   }
 
   val numRecords = 1
@@ -156,8 +157,8 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   @Before
   override def setUp() {
     super.setUp()
-    AclCommand.main(topicBrokerReadAclArgs)
     servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, 
Resource.ClusterResource)
       TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, 
new Resource(Topic, "*"))
     }
     // create the test topic with all the brokers as replicas

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Reply via email to