Repository: ranger Updated Branches: refs/heads/ranger-1.0 5439720fd -> 36b1286e8
RANGER-1967: Upgrade to Kafka 1.0.0 with configurable JAAS configuration name - and fix the tests with introducing checks for topic availability Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/81a6d5e4 Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/81a6d5e4 Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/81a6d5e4 Branch: refs/heads/ranger-1.0 Commit: 81a6d5e433d0f9a6080e7afb0cec541d2b0efdd2 Parents: fdce224 Author: Zsombor Gegesy <zsom...@apache.org> Authored: Wed Feb 21 18:20:18 2018 +0100 Committer: Sailaja Polavarapu <spolavar...@hortonworks.com> Committed: Mon Mar 5 16:55:12 2018 -0800 ---------------------------------------------------------------------- agents-audit/pom.xml | 2 +- plugin-kafka/pom.xml | 3 +- .../kafka/authorizer/RangerKafkaAuthorizer.java | 15 +++++- .../KafkaRangerAuthorizerGSSTest.java | 54 +++++++++++++++++--- .../KafkaRangerAuthorizerSASLSSLTest.java | 5 +- .../authorizer/KafkaRangerAuthorizerTest.java | 4 +- .../src/test/resources/log4j.properties | 28 ++++++++++ pom.xml | 2 +- ranger-kafka-plugin-shim/pom.xml | 2 +- 9 files changed, 100 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/agents-audit/pom.xml ---------------------------------------------------------------------- diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml index ad9c558..0139ddc 100644 --- a/agents-audit/pom.xml +++ b/agents-audit/pom.xml @@ -69,7 +69,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml index d851e8e..2bbd1b7 100644 --- a/plugin-kafka/pom.xml +++ b/plugin-kafka/pom.xml @@ -49,7 +49,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> @@ -97,6 +97,7 @@ <directory>src/test/resources</directory> <includes> <include>**/*.xml</include> + <include>log4j.properties</include> </includes> <filtering>true</filtering> </testResource> http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index b3d5a74..630d1af 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -24,8 +24,11 @@ import java.util.Map; import javax.security.auth.Subject; -import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.JaasContext.Type; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; import kafka.security.auth.*; import kafka.network.RequestChannel.Session; @@ -81,7 +84,15 @@ public class RangerKafkaAuthorizer implements Authorizer { me = rangerPlugin; if (me == null) { try { - LoginManager loginManager = LoginManager.acquireLoginManager(LoginType.SERVER, true, configs); + // Possible to override JAAS configuration which is used by Ranger, otherwise + // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer', + // if it's not defined, then it reverts to 'KafkaServer' configuration. + final Object jaasContext = configs.get("ranger.jaas.context"); + final String listenerName = (jaasContext instanceof String + && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext + : SecurityProtocol.SASL_PLAINTEXT.name(); + JaasContext context = JaasContext.load(Type.SERVER, new ListenerName(listenerName), configs); + LoginManager loginManager = LoginManager.acquireLoginManager(context, true, configs); Subject subject = loginManager.subject(); UserGroupInformation ugi = MiscUtil .createUGIFromSubject(subject); http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java index 4ea39ed..23b9299 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java @@ -25,6 +25,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; @@ -42,9 +43,12 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; @@ -68,6 +72,7 @@ import kafka.utils.ZkUtils; * Authentication is done via Kerberos/GSS. */ public class KafkaRangerAuthorizerGSSTest { + private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class); private static KafkaServerStartable kafkaServer; private static TestingServer zkServer; @@ -112,6 +117,9 @@ public class KafkaRangerAuthorizerGSSTest { tempDir = Files.createTempDirectory("kafka"); + LOG.info("Port is {}", port); + LOG.info("Temporary directory is at {}", tempDir); + final Properties props = new Properties(); props.put("broker.id", 1); props.put("host.name", "localhost"); @@ -126,6 +134,8 @@ public class KafkaRangerAuthorizerGSSTest { props.put("sasl.enabled.mechanisms", "GSSAPI"); props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI"); props.put("sasl.kerberos.service.name", "kafka"); + props.put("offsets.topic.replication.factor", (short) 1); + props.put("offsets.topic.num.partitions", 1); // Plug in Apache Ranger authorizer props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"); @@ -194,7 +204,7 @@ public class KafkaRangerAuthorizerGSSTest { // The "public" group can write to and read from "test" @Test - public void testAuthorizedRead() throws Exception { + public void testAuthorizedRead() { // Create the Producer Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:" + port); @@ -210,7 +220,7 @@ public class KafkaRangerAuthorizerGSSTest { // Create the Consumer Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:" + port); - consumerProps.put("group.id", "test"); + consumerProps.put("group.id", "consumerTestGroup"); consumerProps.put("enable.auto.commit", "true"); consumerProps.put("auto.offset.reset", "earliest"); consumerProps.put("auto.commit.interval.ms", "1000"); @@ -222,22 +232,23 @@ public class KafkaRangerAuthorizerGSSTest { consumerProps.put("sasl.kerberos.service.name", "kafka"); final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); + checkTopicExists(consumer); + LOG.info("Subscribing to 'test'"); consumer.subscribe(Arrays.asList("test")); - // Send a message - producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue")); - producer.flush(); + sendMessage(producer); // Poll until we consume it - ConsumerRecord<String, String> record = null; for (int i = 0; i < 1000; i++) { + LOG.info("Waiting for messages {}. try", i); ConsumerRecords<String, String> records = consumer.poll(100); if (records.count() > 0) { + LOG.info("Found {} messages", records.count()); record = records.iterator().next(); break; } - Thread.sleep(1000); + sleep(); } Assert.assertNotNull(record); @@ -247,6 +258,35 @@ public class KafkaRangerAuthorizerGSSTest { consumer.close(); } + private void checkTopicExists(final KafkaConsumer<String, String> consumer) { + Map<String, List<PartitionInfo>> topics = consumer.listTopics(); + while (!topics.containsKey("test")) { + LOG.warn("Required topic is not available, only {} present", topics.keySet()); + sleep(); + topics = consumer.listTopics(); + } + LOG.warn("Available topics: {}", topics.keySet()); + } + + private void sendMessage(final Producer<String, String> producer) { + // Send a message + try { + LOG.info("Send a message to 'test'"); + producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue")); + producer.flush(); + } catch (RuntimeException e) { + LOG.error("Unable to send message to topic 'test' ", e); + } + } + + private void sleep() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("Interrupted sleep, nothing important"); + } + } + // The "public" group can't write to "dev" @Test public void testUnauthorizedWrite() throws Exception { http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java index fb541cd..88a3e02 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java @@ -127,7 +127,10 @@ public class KafkaRangerAuthorizerSASLSSLTest { props.put("security.inter.broker.protocol", "SASL_SSL"); props.put("sasl.enabled.mechanisms", "PLAIN"); props.put("sasl.mechanism.inter.broker.protocol", "PLAIN"); - + + props.put("offsets.topic.replication.factor", (short) 1); + props.put("offsets.topic.num.partitions", 1); + props.put("ssl.keystore.location", serviceKeystorePath); props.put("ssl.keystore.password", "sspass"); props.put("ssl.key.password", "skpass"); http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java index fb0a2c0..bccdb80 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java @@ -128,7 +128,9 @@ public class KafkaRangerAuthorizerTest { props.put("ssl.truststore.password", "security"); props.put("security.inter.broker.protocol", "SSL"); props.put("ssl.client.auth", "required"); - + props.put("offsets.topic.replication.factor", (short) 1); + props.put("offsets.topic.num.partitions", 1); + // Plug in Apache Ranger authorizer props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"); http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/resources/log4j.properties b/plugin-kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..4ad14de --- /dev/null +++ b/plugin-kafka/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Root logger option +log4j.rootLogger=INFO, stdout +log4j.logger.kafka=WARN +log4j.logger.org.apache.zookeeper=WARN + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{10}:%L - %m%n http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3be50f2..38df285 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,7 @@ <json4s.version>3.2.11</json4s.version> <jsr305.version>1.3.9</jsr305.version> <junit.version>4.12</junit.version> - <kafka.version>0.10.0.0</kafka.version> + <kafka.version>1.0.0</kafka.version> <kerby.version>1.0.0</kerby.version> <knox.gateway.version>1.0.0</knox.gateway.version> <libpam4j.version>1.8</libpam4j.version> http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/ranger-kafka-plugin-shim/pom.xml ---------------------------------------------------------------------- diff --git a/ranger-kafka-plugin-shim/pom.xml b/ranger-kafka-plugin-shim/pom.xml index 9620632..24702f8 100644 --- a/ranger-kafka-plugin-shim/pom.xml +++ b/ranger-kafka-plugin-shim/pom.xml @@ -54,7 +54,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> </dependencies>