Repository: kafka Updated Branches: refs/heads/trunk 402aa093d -> 7c988a3c8
KAFKA-5330: Use per-task converters in Connect Instead of sharing the same converter instance within the worker, use a converter per task. More details: - https://github.com/confluentinc/schema-registry/issues/514 - https://issues.apache.org/jira/browse/KAFKA-5330 Author: Thibaud Chardonnens <thibaud.chardonn...@swisscom.com> Reviewers: Ewen Cheslack-Postava <e...@confluent.io> Closes #3196 from tbcdns/KAFKA-5330 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c988a3c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c988a3c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c988a3c Branch: refs/heads/trunk Commit: 7c988a3c8bc9429c1f28f9c29dafed185cec1a67 Parents: 402aa09 Author: Thibaud Chardonnens <thibaud.chardonn...@swisscom.com> Authored: Thu Sep 21 20:12:08 2017 -0700 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Thu Sep 21 20:12:08 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/runtime/Worker.java | 31 ++++++----- .../kafka/connect/runtime/WorkerTest.java | 54 +++++++++++--------- 2 files changed, 46 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7c988a3c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 12802c1..01611ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -68,8 +68,6 @@ public class Worker { private final String workerId; private final Plugins plugins; private final WorkerConfig config; - private final Converter defaultKeyConverter; - private final Converter defaultValueConverter; private final Converter internalKeyConverter; private final Converter internalValueConverter; private final OffsetBackingStore offsetBackingStore; @@ -91,18 +89,7 @@ public class Worker { this.time = time; this.plugins = plugins; this.config = config; - // Converters are required properties, thus getClass won't return null. - this.defaultKeyConverter = plugins.newConverter( - config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(), - config - ); - this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true); - this.defaultValueConverter = plugins.newConverter( - config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(), - config - ); - this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false); - // Same, internal converters are required properties, thus getClass won't return null. + // Internal converters are required properties, thus getClass won't return null. this.internalKeyConverter = plugins.newConverter( config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(), config @@ -378,13 +365,25 @@ public class Worker { Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); if (keyConverter != null) keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true); - else + else { + Converter defaultKeyConverter = plugins.newConverter( + config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(), + config + ); + defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true); keyConverter = defaultKeyConverter; + } Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); if (valueConverter != null) valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false); - else + else { + Converter defaultValueConverter = plugins.newConverter( + config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(), + config + ); + defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false); valueConverter = defaultValueConverter; + } workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader); workerTask.initialize(taskConfig); http://git-wip-us.apache.org/repos/asf/kafka/blob/7c988a3c/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index a807a30..91b07be 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -429,7 +429,7 @@ public class WorkerTest extends ThreadedTest { @Test public void testAddRemoveTask() throws Exception { - expectConverters(); + expectConverters(true); expectStartStorage(); // Create @@ -551,7 +551,7 @@ public class WorkerTest extends ThreadedTest { @Test public void testCleanupTasksOnStop() throws Exception { - expectConverters(); + expectConverters(true); expectStartStorage(); // Create @@ -734,33 +734,41 @@ public class WorkerTest extends ThreadedTest { } private void expectConverters() { - expectConverters(JsonConverter.class); + expectConverters(JsonConverter.class, false); } - private void expectConverters(Class<? extends Converter> converterClass) { - // connector default - Converter keyConverter = PowerMock.createMock(converterClass); - Converter valueConverter = PowerMock.createMock(converterClass); + private void expectConverters(Boolean expectDefaultConverters) { + expectConverters(JsonConverter.class, expectDefaultConverters); + } + + private void expectConverters(Class<? extends Converter> converterClass, Boolean expectDefaultConverters) { + // As default converters are instantiated when a task starts, they are expected only if the `startTask` method is called + if (expectDefaultConverters) { + // connector default + Converter keyConverter = PowerMock.createMock(converterClass); + Converter valueConverter = PowerMock.createMock(converterClass); + + // Instantiate and configure default + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(keyConverter); + keyConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + EasyMock.expectLastCall(); + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(valueConverter); + valueConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + EasyMock.expectLastCall(); + } + //internal Converter internalKeyConverter = PowerMock.createMock(converterClass); Converter internalValueConverter = PowerMock.createMock(converterClass); - // Instantiate and configure default - EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) - .andReturn(keyConverter); - keyConverter.configure( - EasyMock.<Map<String, ?>>anyObject(), - EasyMock.anyBoolean() - ); - EasyMock.expectLastCall(); - EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) - .andReturn(valueConverter); - valueConverter.configure( - EasyMock.<Map<String, ?>>anyObject(), - EasyMock.anyBoolean() - ); - EasyMock.expectLastCall(); - // Instantiate and configure internal EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) .andReturn(internalKeyConverter);