Repository: ranger
Updated Branches:
  refs/heads/ranger-1.0 5439720fd -> 36b1286e8


RANGER-1967: Upgrade to Kafka 1.0.0 with configurable JAAS configuration name - 
and fix the tests with introducing checks for topic availability


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/81a6d5e4
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/81a6d5e4
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/81a6d5e4

Branch: refs/heads/ranger-1.0
Commit: 81a6d5e433d0f9a6080e7afb0cec541d2b0efdd2
Parents: fdce224
Author: Zsombor Gegesy <zsom...@apache.org>
Authored: Wed Feb 21 18:20:18 2018 +0100
Committer: Sailaja Polavarapu <spolavar...@hortonworks.com>
Committed: Mon Mar 5 16:55:12 2018 -0800

----------------------------------------------------------------------
 agents-audit/pom.xml                            |  2 +-
 plugin-kafka/pom.xml                            |  3 +-
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 15 +++++-
 .../KafkaRangerAuthorizerGSSTest.java           | 54 +++++++++++++++++---
 .../KafkaRangerAuthorizerSASLSSLTest.java       |  5 +-
 .../authorizer/KafkaRangerAuthorizerTest.java   |  4 +-
 .../src/test/resources/log4j.properties         | 28 ++++++++++
 pom.xml                                         |  2 +-
 ranger-kafka-plugin-shim/pom.xml                |  2 +-
 9 files changed, 100 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/agents-audit/pom.xml
----------------------------------------------------------------------
diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml
index ad9c558..0139ddc 100644
--- a/agents-audit/pom.xml
+++ b/agents-audit/pom.xml
@@ -69,7 +69,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index d851e8e..2bbd1b7 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -49,7 +49,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>
         </dependency>
         <dependency>
@@ -97,6 +97,7 @@
                 <directory>src/test/resources</directory>
                 <includes>
                     <include>**/*.xml</include>
+                    <include>log4j.properties</include>
                 </includes>
                 <filtering>true</filtering>
             </testResource>

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index b3d5a74..630d1af 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -24,8 +24,11 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 
-import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasContext.Type;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import kafka.security.auth.*;
 import kafka.network.RequestChannel.Session;
@@ -81,7 +84,15 @@ public class RangerKafkaAuthorizer implements Authorizer {
                                me = rangerPlugin;
                                if (me == null) {
                                        try {
-                                               LoginManager loginManager = 
LoginManager.acquireLoginManager(LoginType.SERVER, true, configs);
+                                               // Possible to override JAAS 
configuration which is used by Ranger, otherwise
+                                               // SASL_PLAINTEXT is used, 
which force Kafka to use 'sasl_plaintext.KafkaServer',
+                                               // if it's not defined, then it 
reverts to 'KafkaServer' configuration.
+                                               final Object jaasContext = 
configs.get("ranger.jaas.context");
+                                               final String listenerName = 
(jaasContext instanceof String
+                                                               && 
StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                                                                               
: SecurityProtocol.SASL_PLAINTEXT.name();
+                                               JaasContext context = 
JaasContext.load(Type.SERVER, new ListenerName(listenerName), configs);
+                                               LoginManager loginManager = 
LoginManager.acquireLoginManager(context, true, configs);
                                                Subject subject = 
loginManager.subject();
                                                UserGroupInformation ugi = 
MiscUtil
                                                                
.createUGIFromSubject(subject);

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 4ea39ed..23b9299 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -25,6 +25,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
@@ -42,9 +43,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
@@ -68,6 +72,7 @@ import kafka.utils.ZkUtils;
  * Authentication is done via Kerberos/GSS.
  */
 public class KafkaRangerAuthorizerGSSTest {
+    private final static Logger LOG = 
LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class);
 
     private static KafkaServerStartable kafkaServer;
     private static TestingServer zkServer;
@@ -112,6 +117,9 @@ public class KafkaRangerAuthorizerGSSTest {
 
         tempDir = Files.createTempDirectory("kafka");
 
+        LOG.info("Port is {}", port);
+        LOG.info("Temporary directory is at {}", tempDir);
+
         final Properties props = new Properties();
         props.put("broker.id", 1);
         props.put("host.name", "localhost");
@@ -126,6 +134,8 @@ public class KafkaRangerAuthorizerGSSTest {
         props.put("sasl.enabled.mechanisms", "GSSAPI");
         props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI");
         props.put("sasl.kerberos.service.name", "kafka");
+        props.put("offsets.topic.replication.factor", (short) 1);
+        props.put("offsets.topic.num.partitions", 1);
 
         // Plug in Apache Ranger authorizer
         props.put("authorizer.class.name", 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
@@ -194,7 +204,7 @@ public class KafkaRangerAuthorizerGSSTest {
 
     // The "public" group can write to and read from "test"
     @Test
-    public void testAuthorizedRead() throws Exception {
+    public void testAuthorizedRead() {
         // Create the Producer
         Properties producerProps = new Properties();
         producerProps.put("bootstrap.servers", "localhost:" + port);
@@ -210,7 +220,7 @@ public class KafkaRangerAuthorizerGSSTest {
         // Create the Consumer
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:" + port);
-        consumerProps.put("group.id", "test");
+        consumerProps.put("group.id", "consumerTestGroup");
         consumerProps.put("enable.auto.commit", "true");
         consumerProps.put("auto.offset.reset", "earliest");
         consumerProps.put("auto.commit.interval.ms", "1000");
@@ -222,22 +232,23 @@ public class KafkaRangerAuthorizerGSSTest {
         consumerProps.put("sasl.kerberos.service.name", "kafka");
 
         final KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(consumerProps);
+        checkTopicExists(consumer);
+        LOG.info("Subscribing to 'test'");
         consumer.subscribe(Arrays.asList("test"));
 
-        // Send a message
-        producer.send(new ProducerRecord<String, String>("test", "somekey", 
"somevalue"));
-        producer.flush();
+        sendMessage(producer);
 
         // Poll until we consume it
-
         ConsumerRecord<String, String> record = null;
         for (int i = 0; i < 1000; i++) {
+            LOG.info("Waiting for messages {}. try", i);
             ConsumerRecords<String, String> records = consumer.poll(100);
             if (records.count() > 0) {
+                LOG.info("Found {} messages", records.count());
                 record = records.iterator().next();
                 break;
             }
-            Thread.sleep(1000);
+            sleep();
         }
 
         Assert.assertNotNull(record);
@@ -247,6 +258,35 @@ public class KafkaRangerAuthorizerGSSTest {
         consumer.close();
     }
 
+    private void checkTopicExists(final KafkaConsumer<String, String> 
consumer) {
+        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+        while (!topics.containsKey("test")) {
+            LOG.warn("Required topic is not available, only {} present", 
topics.keySet());
+            sleep();
+            topics = consumer.listTopics();
+        }
+        LOG.warn("Available topics: {}", topics.keySet());
+    }
+
+    private void sendMessage(final Producer<String, String> producer) {
+        // Send a message
+        try {
+            LOG.info("Send a message to 'test'");
+            producer.send(new ProducerRecord<String, String>("test", 
"somekey", "somevalue"));
+            producer.flush();
+        } catch (RuntimeException e) {
+            LOG.error("Unable to send message to topic 'test' ", e);
+        }
+    }
+
+    private void sleep() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted sleep, nothing important");
+        }
+    }
+
     // The "public" group can't write to "dev"
     @Test
     public void testUnauthorizedWrite() throws Exception {

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index fb541cd..88a3e02 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -127,7 +127,10 @@ public class KafkaRangerAuthorizerSASLSSLTest {
         props.put("security.inter.broker.protocol", "SASL_SSL");
         props.put("sasl.enabled.mechanisms", "PLAIN");
         props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        
+
+        props.put("offsets.topic.replication.factor", (short) 1);
+        props.put("offsets.topic.num.partitions", 1);
+
         props.put("ssl.keystore.location", serviceKeystorePath);
         props.put("ssl.keystore.password", "sspass");
         props.put("ssl.key.password", "skpass");

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index fb0a2c0..bccdb80 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -128,7 +128,9 @@ public class KafkaRangerAuthorizerTest {
         props.put("ssl.truststore.password", "security");
         props.put("security.inter.broker.protocol", "SSL");
         props.put("ssl.client.auth", "required");
-        
+        props.put("offsets.topic.replication.factor", (short) 1);
+        props.put("offsets.topic.num.partitions", 1);
+
         // Plug in Apache Ranger authorizer
         props.put("authorizer.class.name", 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
         

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/plugin-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/test/resources/log4j.properties 
b/plugin-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4ad14de
--- /dev/null
+++ b/plugin-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.zookeeper=WARN
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{10}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3be50f2..38df285 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,7 @@
         <json4s.version>3.2.11</json4s.version>
         <jsr305.version>1.3.9</jsr305.version>
         <junit.version>4.12</junit.version>
-        <kafka.version>0.10.0.0</kafka.version>
+        <kafka.version>1.0.0</kafka.version>
         <kerby.version>1.0.0</kerby.version>
         <knox.gateway.version>1.0.0</knox.gateway.version>
         <libpam4j.version>1.8</libpam4j.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/81a6d5e4/ranger-kafka-plugin-shim/pom.xml
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/pom.xml b/ranger-kafka-plugin-shim/pom.xml
index 9620632..24702f8 100644
--- a/ranger-kafka-plugin-shim/pom.xml
+++ b/ranger-kafka-plugin-shim/pom.xml
@@ -54,7 +54,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>
         </dependency>
     </dependencies>

Reply via email to