This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository: https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 51f0f3e MINOR: additional check to follower fetch handling (#4433) 51f0f3e is described below commit 51f0f3ee792cf9352ce61afeca098c765cdad664 Author: Edoardo Comar <eco...@uk.ibm.com> AuthorDate: Thu Jan 18 19:14:49 2018 +0000 MINOR: additional check to follower fetch handling (#4433) add check to KafkaApis, add unit test specific to follower fetch developed with @mimaison Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram <rajinisiva...@googlemail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 26 +++++++++++++--------- .../kafka/api/AuthorizerIntegrationTest.scala | 24 ++++++++++++++++++++ .../kafka/api/EndToEndAuthorizationTest.scala | 3 ++- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b145d3c..b54a637 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -488,18 +488,24 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]() val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) { - if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) - unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, + if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource)) + for (topicPartition <- fetchRequest.fetchData.asScala.keys) + unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED, FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) - else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) - else - authorizedRequestInfo += (topicPartition -> partitionData) - } + else + for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) { + if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) + unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, + FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + else + authorizedRequestInfo += (topicPartition -> partitionData) + } def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4230225..39c1ea3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -273,6 +273,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createFetchRequest = { val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] 
partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100)) + requests.FetchRequest.Builder.forConsumer(100, Int.MaxValue, partitionMap).build() + } + + private def createFetchFollowerRequest = { + val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] + partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100)) val version = ApiKeys.FETCH.latestVersion requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build() } @@ -486,6 +492,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test + def testFetchFollowerRequest() { + val key = ApiKeys.FETCH + val request = createFetchFollowerRequest + + removeAllAcls() + val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) + + val readAcls = topicReadAcl.get(topicResource).get + addAndVerifyAcls(readAcls, topicResource) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) + + val clusterAcls = clusterAcl.get(Resource.ClusterResource).get + addAndVerifyAcls(clusterAcls, Resource.ClusterResource) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) + } + + @Test def testProduceWithNoTopicAccess() { try { sendRecords(numRecords, tp) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 1e93b37..4500d49 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -61,6 +61,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override def configureSecurityBeforeServersStart() { AclCommand.main(clusterAclArgs) + AclCommand.main(topicBrokerReadAclArgs) } val numRecords = 1 @@ -156,8 +157,8 @@ 
abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @Before override def setUp() { super.setUp() - AclCommand.main(topicBrokerReadAclArgs) servers.foreach { s => + TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource) TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) } // create the test topic with all the brokers as replicas -- To stop receiving notification emails like this one, please contact commits@kafka.apache.org.