kafka git commit: KAFKA-2798: Use prefixed configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings.

2015-11-10 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk ae5a5d7c0 -> 403d89ede


KAFKA-2798: Use prefixed configurations for Kafka Connect producer and 
consumer settings so they do not conflict with the distributed herder's 
settings.

Author: Ewen Cheslack-Postava 

Reviewers: Gwen Shapira

Closes #486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/403d89ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/403d89ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/403d89ed

Branch: refs/heads/trunk
Commit: 403d89edeaa7808f71c0e7318411c925895210f2
Parents: ae5a5d7
Author: Ewen Cheslack-Postava 
Authored: Tue Nov 10 11:07:26 2015 -0800
Committer: Gwen Shapira 
Committed: Tue Nov 10 11:07:26 2015 -0800

--
 .../java/org/apache/kafka/common/config/AbstractConfig.java  | 8 
 .../main/java/org/apache/kafka/connect/runtime/Worker.java   | 2 +-
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 5 -
 3 files changed, 5 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 07b64c0..1029356 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -105,14 +105,6 @@ public class AbstractConfig {
 return keys;
 }
 
-public Map unusedConfigs() {
-Set unusedKeys = this.unused();
-Map unusedProps = new HashMap<>();
-for (String key : unusedKeys)
-unusedProps.put(key, this.originals.get(key));
-return unusedProps;
-}
-
 public Map originals() {
 Map copy = new RecordingMap<>();
 copy.putAll(originals);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 359a79c..f5b23ec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -101,7 +101,7 @@ public class Worker {
 producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
 
-producerProps.putAll(config.unusedConfigs());
+producerProps.putAll(config.originalsWithPrefix("producer."));
 
 producer = new KafkaProducer<>(producerProps);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 643b10e..e0a3e04 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -233,7 +233,8 @@ class WorkerSinkTask implements WorkerTask {
 private KafkaConsumer createConsumer() {
 // Include any unknown worker configs so consumer configs can be set 
globally on the worker
 // and through to the task
-Map props = workerConfig.unusedConfigs();
+Map props = new HashMap<>();
+
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
 
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
@@ -242,6 +243,8 @@ class WorkerSinkTask implements WorkerTask {
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
+props.putAll(workerConfig.originalsWithPrefix("consumer."));
+
 

kafka git commit: KAFKA-2798: Use prefixed configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings.

2015-11-10 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 69f2ad8e2 -> dd8a870e4


KAFKA-2798: Use prefixed configurations for Kafka Connect producer and 
consumer settings so they do not conflict with the distributed herder's 
settings.

Author: Ewen Cheslack-Postava 

Reviewers: Gwen Shapira

Closes #486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs

(cherry picked from commit 403d89edeaa7808f71c0e7318411c925895210f2)
Signed-off-by: Gwen Shapira 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd8a870e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd8a870e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd8a870e

Branch: refs/heads/0.9.0
Commit: dd8a870e4df11cad009498ebfb9b6eac78f1652f
Parents: 69f2ad8
Author: Ewen Cheslack-Postava 
Authored: Tue Nov 10 11:07:26 2015 -0800
Committer: Gwen Shapira 
Committed: Tue Nov 10 11:07:40 2015 -0800

--
 .../java/org/apache/kafka/common/config/AbstractConfig.java  | 8 
 .../main/java/org/apache/kafka/connect/runtime/Worker.java   | 2 +-
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 5 -
 3 files changed, 5 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 07b64c0..1029356 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -105,14 +105,6 @@ public class AbstractConfig {
 return keys;
 }
 
-public Map unusedConfigs() {
-Set unusedKeys = this.unused();
-Map unusedProps = new HashMap<>();
-for (String key : unusedKeys)
-unusedProps.put(key, this.originals.get(key));
-return unusedProps;
-}
-
 public Map originals() {
 Map copy = new RecordingMap<>();
 copy.putAll(originals);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 359a79c..f5b23ec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -101,7 +101,7 @@ public class Worker {
 producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
 
-producerProps.putAll(config.unusedConfigs());
+producerProps.putAll(config.originalsWithPrefix("producer."));
 
 producer = new KafkaProducer<>(producerProps);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 643b10e..e0a3e04 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -233,7 +233,8 @@ class WorkerSinkTask implements WorkerTask {
 private KafkaConsumer createConsumer() {
 // Include any unknown worker configs so consumer configs can be set 
globally on the worker
 // and through to the task
-Map props = workerConfig.unusedConfigs();
+Map props = new HashMap<>();
+
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
 
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
@@ -242,6 +243,8 @@ class WorkerSinkTask implements WorkerTask {
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,