[
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595681#comment-16595681
]
ASF GitHub Bot commented on KAFKA-7287:
---------------------------------------
junrao closed pull request #5503: KAFKA-7287: Set open ACL permissions for old
consumer znode path
URL: https://github.com/apache/kafka/pull/5503
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala
b/core/src/main/scala/kafka/zk/ZkData.scala
index f918b616024..760bd67299d 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -429,8 +429,13 @@ object PreferredReplicaElectionZNode {
}.map(_.toSet).getOrElse(Set.empty)
}
+//old consumer path znode
+object ConsumerPathZNode {
+ def path = "/consumers"
+}
+
object ConsumerOffset {
- def path(group: String, topic: String, partition: Integer) =
s"/consumers/${group}/offsets/${topic}/${partition}"
+ def path(group: String, topic: String, partition: Integer) =
s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new
String(_, UTF_8).toLong)
}
@@ -721,7 +726,7 @@ object ZkData {
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
- "/consumers", // old consumer path
+ ConsumerPathZNode.path, // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
@@ -743,7 +748,8 @@ object ZkData {
}
def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
- if (isSecure) {
+ //Old Consumer path is kept open as different consumers will write under
this node.
+ if (!ConsumerPathZNode.path.equals(path) && isSecure) {
val acls = new ArrayBuffer[ACL]
acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
if (!sensitivePath(path))
diff --git
a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 19fa19dafbc..1cdbe4b2a0e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,10 +19,10 @@ package kafka.security.auth
import kafka.admin.ZkSecurityMigrator
import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.ACL
+import org.apache.zookeeper.data.{ACL, Stat}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -304,4 +304,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness
with Logging {
}
}
}
+
+ @Test
+ def testConsumerOffsetPathAcls(): Unit = {
+ zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
+
+ val consumerPathAcls =
zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
+ assertTrue("old consumer znode path acls are not open",
consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Set open ACL permissions for old consumer znode path
> ----------------------------------------------------
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.1.0
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Major
>
> Old consumer znode path should have open ACL permissions in kerberized
> environment. This got missed in kafkaZkClient changes.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)