This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ab7b479  Securing passwords used for SSL connections to Kafka (#6285)
ab7b479 is described below

commit ab7b4798cc2747bb68550b67f69660965a276dc8
Author: Atul Mohan <atulmohan....@gmail.com>
AuthorDate: Thu Oct 11 12:03:01 2018 -0500

    Securing passwords used for SSL connections to Kafka (#6285)
    
    * Secure credentials in consumer properties
    
    * Merge master
    
    * Refactor property population into separate method
    
    * Fix property setter
    
    * Fix tests
---
 .../development/extensions-core/kafka-ingestion.md |  2 +-
 .../apache/druid/indexing/kafka/KafkaIOConfig.java |  6 ++--
 .../druid/indexing/kafka/KafkaIndexTask.java       | 32 ++++++++++++++++++----
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  7 +++--
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |  9 ++++--
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 12 ++++----
 .../supervisor/KafkaSupervisorIOConfigTest.java    | 28 +++++++++++++++++++
 .../kafka/supervisor/KafkaSupervisorTest.java      |  5 ++--
 .../druid/indexing/kafka/test/TestBroker.java      |  4 +--
 9 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 568fc94..12bd5f6 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -166,7 +166,7 @@ For Roaring bitmaps:
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
-|`consumerProperties`|Map<String, String>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.|yes|
+|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
 |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
 |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
 |`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
index 3c60449..6a9af7f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
@@ -39,7 +39,7 @@ public class KafkaIOConfig implements IOConfig
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
-  private final Map<String, String> consumerProperties;
+  private final Map<String, Object> consumerProperties;
   private final boolean useTransaction;
   private final Optional<DateTime> minimumMessageTime;
   private final Optional<DateTime> maximumMessageTime;
@@ -51,7 +51,7 @@ public class KafkaIOConfig implements IOConfig
       @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
-      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -114,7 +114,7 @@ public class KafkaIOConfig implements IOConfig
   }
 
   @JsonProperty
-  public Map<String, String> getConsumerProperties()
+  public Map<String, Object> getConsumerProperties()
   {
     return consumerProperties;
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 0e362e2..bb73651 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -41,10 +42,12 @@ import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,6 +95,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private final KafkaIOConfig ioConfig;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
   private final KafkaIndexTaskRunner runner;
+  private final ObjectMapper configMapper;
 
   // This value can be tuned in some tests
   private long pollRetryMs = 30000;
@@ -106,7 +110,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       @JsonProperty("context") Map<String, Object> context,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject ObjectMapper configMapper
   )
   {
     super(
@@ -122,6 +127,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
+    this.configMapper = configMapper;
     final CircularBuffer<Throwable> savedParseExceptions;
     if (tuningConfig.getMaxSavedParseExceptions() > 0) {
       savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
@@ -198,7 +204,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   }
 
 
-
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
@@ -285,9 +290,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
       final Properties props = new Properties();
 
-      for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
-        props.setProperty(entry.getKey(), entry.getValue());
-      }
+      addConsumerPropertiesFromConfig(props, configMapper, ioConfig.getConsumerProperties());
 
       props.setProperty("enable.auto.commit", "false");
       props.setProperty("auto.offset.reset", "none");
@@ -301,6 +304,25 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     }
   }
 
+  public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper configMapper, Map<String, Object> consumerProperties)
+  {
+    // Extract passwords before SSL connection to Kafka
+    for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
+      String propertyKey = entry.getKey();
+      if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
+          || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
+          || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
+        PasswordProvider configPasswordProvider = configMapper.convertValue(
+            entry.getValue(),
+            PasswordProvider.class
+        );
+        properties.setProperty(propertyKey, configPasswordProvider.getPassword());
+      } else {
+        properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
+      }
+    }
+  }
+
   static void assignPartitions(
       final KafkaConsumer consumer,
       final String topic,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 4808658..b7845ca 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1018,7 +1018,7 @@ public class KafkaSupervisor implements Supervisor
     props.setProperty("metadata.max.age.ms", "10000");
     props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RealtimeIndexTask.makeRandomId()));
 
-    props.putAll(ioConfig.getConsumerProperties());
+    KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper, ioConfig.getConsumerProperties());
 
     props.setProperty("enable.auto.commit", "false");
 
@@ -1918,7 +1918,7 @@ public class KafkaSupervisor implements Supervisor
     }
     TaskGroup group = taskGroups.get(groupId);
 
-    Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
+    Map<String, Object> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
     DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
 
@@ -1960,7 +1960,8 @@ public class KafkaSupervisor implements Supervisor
           context,
           null,
           null,
-          rowIngestionMetersFactory
+          rowIngestionMetersFactory,
+          sortingMapper
       );
 
       Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index b02458f..44c2bb2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -32,12 +32,15 @@ import java.util.Map;
 public class KafkaSupervisorIOConfig
 {
   public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+  public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
+  public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
+  public static final String KEY_PASSWORD_KEY = "ssl.key.password";
 
   private final String topic;
   private final Integer replicas;
   private final Integer taskCount;
   private final Duration taskDuration;
-  private final Map<String, String> consumerProperties;
+  private final Map<String, Object> consumerProperties;
   private final Duration startDelay;
   private final Duration period;
   private final boolean useEarliestOffset;
@@ -52,7 +55,7 @@ public class KafkaSupervisorIOConfig
       @JsonProperty("replicas") Integer replicas,
       @JsonProperty("taskCount") Integer taskCount,
       @JsonProperty("taskDuration") Period taskDuration,
-      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -110,7 +113,7 @@ public class KafkaSupervisorIOConfig
   }
 
   @JsonProperty
-  public Map<String, String> getConsumerProperties()
+  public Map<String, Object> getConsumerProperties()
   {
     return consumerProperties;
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index cd44d68..6dd210a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -486,7 +486,7 @@ public class KafkaIndexTaskTest
         kafkaProducer.send(record).get();
       }
     }
-    Map<String, String> consumerProps = kafkaServer.consumerProperties();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
     final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -581,7 +581,7 @@ public class KafkaIndexTaskTest
         kafkaProducer.send(records.get(i)).get();
       }
 
-      Map<String, String> consumerProps = kafkaServer.consumerProperties();
+      Map<String, Object> consumerProps = kafkaServer.consumerProperties();
       consumerProps.put("max.poll.records", "1");
 
       final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -698,7 +698,7 @@ public class KafkaIndexTaskTest
         kafkaProducer.send(record).get();
       }
     }
-    Map<String, String> consumerProps = kafkaServer.consumerProperties();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
     final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -2027,7 +2027,8 @@ public class KafkaIndexTaskTest
         context,
         null,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        objectMapper
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
@@ -2073,7 +2074,8 @@ public class KafkaIndexTaskTest
         context,
         null,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        objectMapper
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5a7df1c..a7dc204 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.hamcrest.CoreMatchers;
@@ -32,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Properties;
+
 public class KafkaSupervisorIOConfigTest
 {
   private final ObjectMapper mapper;
@@ -120,6 +123,31 @@ public class KafkaSupervisorIOConfigTest
   }
 
   @Test
+  public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
+  {
+    String jsonStr = "{\n"
+                     + "  \"type\": \"kafka\",\n"
+                     + "  \"topic\": \"my-topic\",\n"
+                     + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
+                     + "   \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n"
+                     + "   \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n"
+                     + "   \"ssl.key.password\":\"mykeypassword\"}\n"
+                     + "}";
+
+    KafkaSupervisorIOConfig config = mapper.readValue(
+        jsonStr, KafkaSupervisorIOConfig.class
+    );
+    Properties props = new Properties();
+    KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
+
+    Assert.assertEquals("my-topic", config.getTopic());
+    Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers"));
+    Assert.assertEquals("mytruststorepassword", props.getProperty("ssl.truststore.password"));
+    Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password"));
+    Assert.assertEquals("mykeypassword", props.getProperty("ssl.key.password"));
+  }
+
+  @Test
   public void testTopicRequired() throws Exception
   {
     String jsonStr = "{\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index d5b048a..c4e24f1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2583,7 +2583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       String kafkaHost
   )
   {
-    Map<String, String> consumerProperties = new HashMap<>();
+    Map<String, Object> consumerProperties = new HashMap<>();
     consumerProperties.put("myCustomKey", "myCustomValue");
     consumerProperties.put("bootstrap.servers", kafkaHost);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
@@ -2711,7 +2711,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         Collections.emptyMap(),
         null,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        objectMapper
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 561276e..c1a0671 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -110,9 +110,9 @@ public class TestBroker implements Closeable
     return props;
   }
 
-  public Map<String, String> consumerProperties()
+  public Map<String, Object> consumerProperties()
   {
-    final Map<String, String> props = Maps.newHashMap();
+    final Map<String, Object> props = Maps.newHashMap();
     props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
     props.put("key.deserializer", ByteArrayDeserializer.class.getName());
     props.put("value.deserializer", ByteArrayDeserializer.class.getName());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to