kafka git commit: KAFKA-2798: Use prefixedd configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings.
Repository: kafka Updated Branches: refs/heads/trunk ae5a5d7c0 -> 403d89ede KAFKA-2798: Use prefixedd configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings. Author: Ewen Cheslack-PostavaReviewers: Gwen Shapira Closes #486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/403d89ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/403d89ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/403d89ed Branch: refs/heads/trunk Commit: 403d89edeaa7808f71c0e7318411c925895210f2 Parents: ae5a5d7 Author: Ewen Cheslack-Postava Authored: Tue Nov 10 11:07:26 2015 -0800 Committer: Gwen Shapira Committed: Tue Nov 10 11:07:26 2015 -0800 -- .../java/org/apache/kafka/common/config/AbstractConfig.java | 8 .../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 +- .../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 5 - 3 files changed, 5 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 07b64c0..1029356 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -105,14 +105,6 @@ public class AbstractConfig { return keys; } -public Map unusedConfigs() { -Set unusedKeys = this.unused(); -Map unusedProps = new HashMap<>(); -for (String key : unusedKeys) -unusedProps.put(key, this.originals.get(key)); -return unusedProps; -} - public Map originals() { Map copy = new RecordingMap<>(); copy.putAll(originals); http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 359a79c..f5b23ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -101,7 +101,7 @@ public class Worker { producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); -producerProps.putAll(config.unusedConfigs()); +producerProps.putAll(config.originalsWithPrefix("producer.")); producer = new KafkaProducer<>(producerProps); http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 643b10e..e0a3e04 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -233,7 +233,8 @@ class WorkerSinkTask implements WorkerTask { private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task -Map props = workerConfig.unusedConfigs(); +Map props = new HashMap<>(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); @@ -242,6 +243,8 @@ class WorkerSinkTask implements WorkerTask { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); +props.putAll(workerConfig.originalsWithPrefix("consumer.")); +
kafka git commit: KAFKA-2798: Use prefixedd configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings.
Repository: kafka Updated Branches: refs/heads/0.9.0 69f2ad8e2 -> dd8a870e4 KAFKA-2798: Use prefixedd configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings. Author: Ewen Cheslack-PostavaReviewers: Gwen Shapira Closes #486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs (cherry picked from commit 403d89edeaa7808f71c0e7318411c925895210f2) Signed-off-by: Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd8a870e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd8a870e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd8a870e Branch: refs/heads/0.9.0 Commit: dd8a870e4df11cad009498ebfb9b6eac78f1652f Parents: 69f2ad8 Author: Ewen Cheslack-Postava Authored: Tue Nov 10 11:07:26 2015 -0800 Committer: Gwen Shapira Committed: Tue Nov 10 11:07:40 2015 -0800 -- .../java/org/apache/kafka/common/config/AbstractConfig.java | 8 .../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 +- .../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 5 - 3 files changed, 5 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 07b64c0..1029356 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -105,14 +105,6 @@ public class AbstractConfig { return keys; } -public Map unusedConfigs() { -Set unusedKeys = this.unused(); -Map unusedProps = new HashMap<>(); -for (String key : unusedKeys) -unusedProps.put(key, this.originals.get(key)); -return unusedProps; -} - public Map originals() { Map copy = new RecordingMap<>(); copy.putAll(originals); http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 359a79c..f5b23ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -101,7 +101,7 @@ public class Worker { producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); -producerProps.putAll(config.unusedConfigs()); +producerProps.putAll(config.originalsWithPrefix("producer.")); producer = new KafkaProducer<>(producerProps); http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8a870e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 643b10e..e0a3e04 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -233,7 +233,8 @@ class WorkerSinkTask implements WorkerTask { private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task -Map props = workerConfig.unusedConfigs(); +Map props = new HashMap<>(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); @@ -242,6 +243,8 @@ class WorkerSinkTask implements WorkerTask { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,