Repository: kafka
Updated Branches:
  refs/heads/trunk 402aa093d -> 7c988a3c8


KAFKA-5330: Use per-task converters in Connect

Instead of sharing the same converter instance within the worker, use a 
converter per task.

More details:
- https://github.com/confluentinc/schema-registry/issues/514
- https://issues.apache.org/jira/browse/KAFKA-5330

Author: Thibaud Chardonnens <thibaud.chardonn...@swisscom.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>

Closes #3196 from tbcdns/KAFKA-5330


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c988a3c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c988a3c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c988a3c

Branch: refs/heads/trunk
Commit: 7c988a3c8bc9429c1f28f9c29dafed185cec1a67
Parents: 402aa09
Author: Thibaud Chardonnens <thibaud.chardonn...@swisscom.com>
Authored: Thu Sep 21 20:12:08 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Thu Sep 21 20:12:08 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/runtime/Worker.java    | 31 ++++++-----
 .../kafka/connect/runtime/WorkerTest.java       | 54 +++++++++++---------
 2 files changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c988a3c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 12802c1..01611ab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -68,8 +68,6 @@ public class Worker {
     private final String workerId;
     private final Plugins plugins;
     private final WorkerConfig config;
-    private final Converter defaultKeyConverter;
-    private final Converter defaultValueConverter;
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
     private final OffsetBackingStore offsetBackingStore;
@@ -91,18 +89,7 @@ public class Worker {
         this.time = time;
         this.plugins = plugins;
         this.config = config;
-        // Converters are required properties, thus getClass won't return null.
-        this.defaultKeyConverter = plugins.newConverter(
-                
config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
-                config
-        );
-        
this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."),
 true);
-        this.defaultValueConverter = plugins.newConverter(
-                
config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
-                config
-        );
-        
this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."),
 false);
-        // Same, internal converters are required properties, thus getClass 
won't return null.
+        // Internal converters are required properties, thus getClass won't 
return null.
         this.internalKeyConverter = plugins.newConverter(
                 
config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
                 config
@@ -378,13 +365,25 @@ public class Worker {
             Converter keyConverter = 
connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
Converter.class);
             if (keyConverter != null)
                 
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
-            else
+            else {
+                Converter defaultKeyConverter = plugins.newConverter(
+                        
config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
+                        config
+                );
+                
defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), 
true);
                 keyConverter = defaultKeyConverter;
+            }
             Converter valueConverter = 
connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
Converter.class);
             if (valueConverter != null)
                 
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), 
false);
-            else
+            else {
+                Converter defaultValueConverter = plugins.newConverter(
+                        
config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
+                        config
+                );
+                
defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), 
false);
                 valueConverter = defaultValueConverter;
+            }
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, 
initialState, keyConverter, valueConverter, connectorLoader);
             workerTask.initialize(taskConfig);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c988a3c/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index a807a30..91b07be 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -429,7 +429,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
-        expectConverters();
+        expectConverters(true);
         expectStartStorage();
 
         // Create
@@ -551,7 +551,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
-        expectConverters();
+        expectConverters(true);
         expectStartStorage();
 
         // Create
@@ -734,33 +734,41 @@ public class WorkerTest extends ThreadedTest {
     }
 
     private void expectConverters() {
-        expectConverters(JsonConverter.class);
+        expectConverters(JsonConverter.class, false);
     }
 
-    private void expectConverters(Class<? extends Converter> converterClass) {
-        // connector default
-        Converter keyConverter = PowerMock.createMock(converterClass);
-        Converter valueConverter = PowerMock.createMock(converterClass);
+    private void expectConverters(Boolean expectDefaultConverters) {
+        expectConverters(JsonConverter.class, expectDefaultConverters);
+    }
+
+    private void expectConverters(Class<? extends Converter> converterClass, 
Boolean expectDefaultConverters) {
+        // As default converters are instantiated when a task starts, they are 
expected only if the `startTask` method is called
+        if (expectDefaultConverters) {
+            // connector default
+            Converter keyConverter = PowerMock.createMock(converterClass);
+            Converter valueConverter = PowerMock.createMock(converterClass);
+
+            // Instantiate and configure default
+            
EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                    .andReturn(keyConverter);
+            keyConverter.configure(
+                    EasyMock.<Map<String, ?>>anyObject(),
+                    EasyMock.anyBoolean()
+            );
+            EasyMock.expectLastCall();
+            
EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                    .andReturn(valueConverter);
+            valueConverter.configure(
+                    EasyMock.<Map<String, ?>>anyObject(),
+                    EasyMock.anyBoolean()
+            );
+            EasyMock.expectLastCall();
+        }
+
         //internal
         Converter internalKeyConverter = PowerMock.createMock(converterClass);
         Converter internalValueConverter = 
PowerMock.createMock(converterClass);
 
-        // Instantiate and configure default
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), 
config))
-                .andReturn(keyConverter);
-        keyConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
-        EasyMock.expectLastCall();
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), 
config))
-                .andReturn(valueConverter);
-        valueConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
-        EasyMock.expectLastCall();
-
         // Instantiate and configure internal
         EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), 
config))
                 .andReturn(internalKeyConverter);

Reply via email to