This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new cdb3b8d  KAFKA-6513: Corrected how Converters and HeaderConverters are 
instantiated and configured
cdb3b8d is described below

commit cdb3b8d680114e45a6b361ad9f417c95c4ff4804
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Fri Feb 9 15:44:55 2018 -0800

    KAFKA-6513: Corrected how Converters and HeaderConverters are instantiated 
and configured
    
    The commits for KIP-145 (KAFKA-5142) changed how the Connect workers 
instantiate and configure the Converters, and also added the ability to do the 
same for the new HeaderConverters. However, the last few commits removed the 
default value for the `converter.type` property for Converters and 
HeaderConverters, and this broke how the internal converters were being created.
    
    This change corrects the behavior so that the `converter.type` property is 
always set by the worker (or by the Plugins class), which means the existing 
Converter implementations will not have to do this. The built-in JsonConverter, 
ByteArrayConverter, and StringConverter also implement HeaderConverter which 
implements Configurable, but the Worker and Plugins methods do not yet use the 
`Configurable.configure(Map)` method and instead still use the 
`Converter.configure(Map,boolean)`.
    
    Several tests were modified, and a new PluginsTest was added to verify the 
new behavior in Plugins for instantiating and configuring the Converter and 
HeaderConverter instances.
    
    Author: Randall Hauch <rha...@gmail.com>
    
    Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen 
Cheslack-Postava <e...@confluent.io>
    
    Closes #4512 from rhauch/kafka-6513
    
    (cherry picked from commit 976a3b0cc858d4a13cf8b34b325aefc8d706be9e)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 .../kafka/connect/storage/HeaderConverter.java     |   2 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  59 +++---
 .../runtime/isolation/DelegatingClassLoader.java   |  11 +
 .../runtime/isolation/PluginScanResult.java        |  13 +-
 .../kafka/connect/runtime/isolation/Plugins.java   | 171 ++++++++++++----
 .../apache/kafka/connect/runtime/WorkerTest.java   | 227 ++++++++++++++-------
 .../connect/runtime/isolation/PluginsTest.java     | 191 +++++++++++++++++
 7 files changed, 520 insertions(+), 154 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
index b8455e1..3f9d504 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
@@ -36,7 +36,7 @@ public interface HeaderConverter extends Configurable, 
Closeable {
     SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] 
value);
 
     /**
-     * Convert the {@link Header}'s {@link Header#valueAsBytes() value} into 
its byte array representation.
+     * Convert the {@link Header}'s {@link Header#value() value} into its byte 
array representation.
      * @param topic the name of the topic for the record containing the header
      * @param headerKey the header's key; may not be null
      * @param schema the schema for the header's value; may be null
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 834eb39..e3d9cf4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -31,13 +31,12 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.ConverterConfig;
-import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -105,19 +104,14 @@ public class Worker {
 
         // Internal converters are required properties, thus getClass won't 
return null.
         this.internalKeyConverter = plugins.newConverter(
-                
config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
-                config
+                config,
+                WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+                ClassLoaderUsage.PLUGINS
         );
-        this.internalKeyConverter.configure(
-                config.originalsWithPrefix("internal.key.converter."),
-                true);
         this.internalValueConverter = plugins.newConverter(
-                
config.getClass(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG).getName(),
-                config
-        );
-        this.internalValueConverter.configure(
-                config.originalsWithPrefix("internal.value.converter."),
-                false
+                config,
+                WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
+                ClassLoaderUsage.PLUGINS
         );
 
         this.offsetBackingStore = offsetBackingStore;
@@ -383,30 +377,33 @@ public class Worker {
             log.info("Instantiated task {} with version {} of type {}", id, 
task.version(), taskClass.getName());
 
             // By maintaining connector's specific class loader for this 
thread here, we first
-            // search for converters within the connector dependencies, and if 
not found the
-            // plugin class loader delegates loading to the delegating 
classloader.
-            Converter keyConverter = 
connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
Converter.class);
+            // search for converters within the connector dependencies.
+            // If any of these aren't found, that means the connector didn't 
configure specific converters,
+            // so we should instantiate based upon the worker configuration
+            Converter keyConverter = plugins.newConverter(
+                    connConfig,
+                    WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
+            Converter valueConverter = plugins.newConverter(
+                    connConfig,
+                    WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
+            HeaderConverter headerConverter = plugins.newHeaderConverter(
+                    connConfig,
+                    WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
             if (keyConverter == null) {
-                String className = 
config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName();
-                keyConverter = plugins.newConverter(className, config);
+                keyConverter = plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
-
-            Converter valueConverter = 
connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
Converter.class);
             if (valueConverter == null) {
-                String className = 
config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName();
-                valueConverter = plugins.newConverter(className, config);
+                valueConverter = plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), 
false);
-
-            HeaderConverter headerConverter = 
connConfig.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
HeaderConverter.class);
             if (headerConverter == null) {
-                String className = 
config.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG).getName();
-                headerConverter = plugins.newHeaderConverter(className, 
config);
+                headerConverter = plugins.newHeaderConverter(config, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            Map<String, Object> converterConfig = 
connConfig.originalsWithPrefix("header.converter.");
-            converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
-            headerConverter.configure(converterConfig);
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, 
initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
             workerTask.initialize(taskConfig);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 8a44d4d..345d7ef 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.reflections.Reflections;
 import org.reflections.util.ClasspathHelper;
@@ -58,6 +59,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final Map<String, String> aliases;
     private final SortedSet<PluginDesc<Connector>> connectors;
     private final SortedSet<PluginDesc<Converter>> converters;
+    private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
@@ -70,6 +72,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.activePaths = new HashMap<>();
         this.connectors = new TreeSet<>();
         this.converters = new TreeSet<>();
+        this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
     }
 
@@ -85,6 +88,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         return converters;
     }
 
+    public Set<PluginDesc<HeaderConverter>> headerConverters() {
+        return headerConverters;
+    }
+
     public Set<PluginDesc<Transformation>> transformations() {
         return transformations;
     }
@@ -214,6 +221,8 @@ public class DelegatingClassLoader extends URLClassLoader {
             connectors.addAll(plugins.connectors());
             addPlugins(plugins.converters(), loader);
             converters.addAll(plugins.converters());
+            addPlugins(plugins.headerConverters(), loader);
+            headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
         }
@@ -265,6 +274,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         return new PluginScanResult(
                 getPluginDesc(reflections, Connector.class, loader),
                 getPluginDesc(reflections, Converter.class, loader),
+                getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader)
         );
     }
@@ -319,6 +329,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private void addAllAliases() {
         addAliases(connectors);
         addAliases(converters);
+        addAliases(headerConverters);
         addAliases(transformations);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index f3d2f21..c680f08 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.Collection;
@@ -25,15 +26,18 @@ import java.util.Collection;
 public class PluginScanResult {
     private final Collection<PluginDesc<Connector>> connectors;
     private final Collection<PluginDesc<Converter>> converters;
+    private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
             Collection<PluginDesc<Converter>> converters,
+            Collection<PluginDesc<HeaderConverter>> headerConverters,
             Collection<PluginDesc<Transformation>> transformations
     ) {
         this.connectors = connectors;
         this.converters = converters;
+        this.headerConverters = headerConverters;
         this.transformations = transformations;
     }
 
@@ -45,11 +49,18 @@ public class PluginScanResult {
         return converters;
     }
 
+    public Collection<PluginDesc<HeaderConverter>> headerConverters() {
+        return headerConverters;
+    }
+
     public Collection<PluginDesc<Transformation>> transformations() {
         return transformations;
     }
 
     public boolean isEmpty() {
-        return connectors().isEmpty() && converters().isEmpty() && 
transformations().isEmpty();
+        return connectors().isEmpty()
+               && converters().isEmpty()
+               && headerConverters().isEmpty()
+               && transformations().isEmpty();
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index bcf2afb..94f2771 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -25,6 +25,8 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
@@ -39,6 +41,12 @@ import java.util.Map;
 import java.util.Set;
 
 public class Plugins {
+
+    public enum ClassLoaderUsage {
+        CURRENT_CLASSLOADER,
+        PLUGINS
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Plugins.class);
     private final DelegatingClassLoader delegatingLoader;
 
@@ -71,14 +79,6 @@ public class Plugins {
         }
     }
 
-    protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> 
klass) {
-        T plugin = Utils.newInstance(klass);
-        if (plugin instanceof Configurable) {
-            ((Configurable) plugin).configure(config.originals());
-        }
-        return plugin;
-    }
-
     @SuppressWarnings("unchecked")
     protected static <U> Class<? extends U> pluginClass(
             DelegatingClassLoader loader,
@@ -185,46 +185,131 @@ public class Plugins {
         return newPlugin(taskClass);
     }
 
-    public Converter newConverter(String converterClassOrAlias) {
-        return newConverter(converterClassOrAlias, null);
+    /**
+     * If the given configuration defines a {@link Converter} using the named 
configuration property, return a new configured instance.
+     *
+     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param classLoaderUsage   which classloader should be used
+     * @return the instantiated and configured {@link Converter}; null if the 
configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class 
could not be found
+     */
+    public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the converter via the 
specified property name
+            return null;
+        }
+        Converter plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and 
plugins as a fallback.
+                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
+                plugin = getInstance(config, classPropertyName, 
Converter.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
+                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
+                Class<? extends Converter> klass;
+                try {
+                    klass = pluginClass(delegatingLoader, 
converterClassOrAlias, Converter.class);
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements 
Converter and which name matches "
+                            + converterClassOrAlias + ", available converters 
are: "
+                            + pluginNames(delegatingLoader.converters())
+                    );
+                }
+                plugin = newPlugin(klass);
+                break;
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the Converter 
specified in '" + classPropertyName + "'");
+        }
+
+        // Determine whether this is a key or value converter based upon the 
supplied property name ...
+        final boolean isKeyConverter = 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName)
+                                     || 
WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
+
+        // Configure the Converter using only the old configuration mechanism 
...
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        plugin.configure(converterConfig, isKeyConverter);
+        return plugin;
     }
 
-    public Converter newConverter(String converterClassOrAlias, AbstractConfig 
config) {
-        Class<? extends Converter> klass;
-        try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    converterClassOrAlias,
-                    Converter.class
-            );
-        } catch (ClassNotFoundException e) {
-            throw new ConnectException(
-                    "Failed to find any class that implements Converter and 
which name matches "
-                            + converterClassOrAlias
-                            + ", available converters are: "
-                            + pluginNames(delegatingLoader.converters())
-            );
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
+     * instance.
+     *
+     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param classLoaderUsage   which classloader should be used
+     * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
+     * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the header converter via the 
specified property name
+            return null;
         }
-        return config != null ? newConfiguredPlugin(config, klass) : 
newPlugin(klass);
+        HeaderConverter plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and 
plugins as a fallback.
+                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
+                // before calling config(...)
+                plugin = getInstance(config, classPropertyName, 
HeaderConverter.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
+                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
+                Class<? extends HeaderConverter> klass;
+                try {
+                    klass = pluginClass(
+                            delegatingLoader,
+                            converterClassOrAlias,
+                            HeaderConverter.class
+                    );
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements 
HeaderConverter and which name matches "
+                                    + converterClassOrAlias
+                                    + ", available header converters are: "
+                                    + 
pluginNames(delegatingLoader.headerConverters())
+                    );
+                }
+                plugin = newPlugin(klass);
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the Converter 
specified in '" + classPropertyName + "'");
+        }
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        plugin.configure(converterConfig);
+        return plugin;
     }
 
-    public HeaderConverter newHeaderConverter(String converterClassOrAlias, 
AbstractConfig config) {
-        Class<? extends HeaderConverter> klass;
-        try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    converterClassOrAlias,
-                    HeaderConverter.class
-            );
-        } catch (ClassNotFoundException e) {
-            throw new ConnectException(
-                    "Failed to find any class that implements HeaderConverter 
and which name matches "
-                            + converterClassOrAlias
-                            + ", available header converters are: "
-                            + pluginNames(delegatingLoader.converters())
-            );
+    /**
+     * Get an instance of the give class specified by the given configuration 
key.
+     *
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return A instance of the class
+     */
+    private <T> T getInstance(AbstractConfig config, String key, Class<T> t) {
+        Class<?> c = config.getClass(key);
+        if (c == null) {
+            return null;
+        }
+        // Instantiate the class, but we don't know if the class extends the 
supplied type
+        Object o = Utils.newInstance(c);
+        if (!t.isInstance(o)) {
+            throw new KafkaException(c.getName() + " is not an instance of " + 
t.getName());
         }
-        return config != null ? newConfiguredPlugin(config, klass) : 
newPlugin(klass);
+        return t.cast(o);
     }
 
     public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2c04b88..f062436 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Time;
@@ -28,11 +30,13 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -64,8 +68,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -99,6 +106,9 @@ public class WorkerTest extends ThreadedTest {
     @Mock private WorkerSourceTask workerTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private Converter taskKeyConverter;
+    @Mock private Converter taskValueConverter;
+    @Mock private HeaderConverter taskHeaderConverter;
 
     @Before
     public void setup() {
@@ -140,7 +150,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -250,7 +260,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -314,7 +324,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -391,7 +401,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(3);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -461,7 +471,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
-        expectConverters(true);
+        expectConverters();
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -470,19 +480,19 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
-                EasyMock.anyObject(ConnectMetrics.class),
-                EasyMock.anyObject(ClassLoader.class),
-                EasyMock.anyObject(Time.class))
+                anyObject(ConnectMetrics.class),
+                anyObject(ClassLoader.class),
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
@@ -496,11 +506,14 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in 
is only mocked.
-        // Serializers for the Producer that the task generates. These are 
loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called 
once due to caching
-        // 
EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will find them 
using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, 
taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, 
taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, 
taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -595,7 +608,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
-        expectConverters(true);
+        expectConverters();
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -604,19 +617,19 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(ConnectMetrics.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
+                anyObject(WorkerConfig.class),
+                anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                EasyMock.anyObject(Time.class))
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
@@ -630,11 +643,17 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in 
is only mocked.
-        // Serializers for the Producer that the task generates. These are 
loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called 
once due to caching
-        // 
EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will not 
initially find them using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskValueConverters(ClassLoaderUsage.PLUGINS, 
taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, 
taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -682,26 +701,26 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
-        Capture<TestConverter> valueConverter = EasyMock.newCapture();
+        Capture<TestConfigurableConverter> valueConverter = 
EasyMock.newCapture();
         Capture<HeaderConverter> headerConverter = EasyMock.newCapture();
 
         
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
                 EasyMock.capture(headerConverter),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(ConnectMetrics.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
+                anyObject(WorkerConfig.class),
+                anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                EasyMock.anyObject(Time.class))
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
@@ -715,11 +734,17 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in 
is only mocked.
-        // Serializers for the Producer that the task generates. These are 
loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called 
once due to caching
-        // 
EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will not 
initially find them using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskValueConverters(ClassLoaderUsage.PLUGINS, 
taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, 
taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -753,7 +778,7 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> connProps = anyConnectorConfigMap();
         connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
         connProps.put("key.converter.extra.config", "foo");
-        connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
+        connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConfigurableConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
         worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, 
TargetState.STARTED);
         assertStatistics(worker, 0, 1);
@@ -765,9 +790,7 @@ public class WorkerTest extends ThreadedTest {
         worker.stop();
         assertStatistics(worker, 0, 0);
 
-        // Validate extra configs got passed through to overridden converters
-        assertEquals("foo", 
keyConverter.getValue().configs.get("extra.config"));
-        assertEquals("bar", 
valueConverter.getValue().configs.get("extra.config"));
+        // We've mocked the Plugin.newConverter method, so we don't currently 
configure the converters
 
         PowerMock.verifyAll();
     }
@@ -808,7 +831,7 @@ public class WorkerTest extends ThreadedTest {
     }
 
     private void expectStartStorage() {
-        offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
+        offsetBackingStore.configure(anyObject(WorkerConfig.class));
         EasyMock.expectLastCall();
         offsetBackingStore.start();
         EasyMock.expectLastCall();
@@ -832,19 +855,10 @@ public class WorkerTest extends ThreadedTest {
         if (expectDefaultConverters) {
 
             // Instantiate and configure default
-            
EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+            EasyMock.expect(plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS))
                     .andReturn(keyConverter);
-            keyConverter.configure(
-                    EasyMock.<Map<String, ?>>anyObject(),
-                    EasyMock.anyBoolean()
-            );
-            EasyMock.expectLastCall();
-            
EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+            EasyMock.expect(plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS))
                     .andReturn(valueConverter);
-            valueConverter.configure(
-                    EasyMock.<Map<String, ?>>anyObject(),
-                    EasyMock.anyBoolean()
-            );
             EasyMock.expectLastCall();
         }
 
@@ -853,22 +867,50 @@ public class WorkerTest extends ThreadedTest {
         Converter internalValueConverter = 
PowerMock.createMock(converterClass);
 
         // Instantiate and configure internal
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), 
config))
-                .andReturn(internalKeyConverter);
-        internalKeyConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
-        EasyMock.expectLastCall();
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), 
config))
-                .andReturn(internalValueConverter);
-        internalValueConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
+        EasyMock.expect(
+                plugins.newConverter(
+                        config,
+                        WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+                        ClassLoaderUsage.PLUGINS
+                )
+        ).andReturn(internalKeyConverter);
+        EasyMock.expect(
+                plugins.newConverter(
+                        config,
+                        WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
+                        ClassLoaderUsage.PLUGINS
+                )
+        ).andReturn(internalValueConverter);
         EasyMock.expectLastCall();
     }
 
+    private void expectTaskKeyConverters(ClassLoaderUsage classLoaderUsage, 
Converter returning) {
+        EasyMock.expect(
+                plugins.newConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
+    private void expectTaskValueConverters(ClassLoaderUsage classLoaderUsage, 
Converter returning) {
+        EasyMock.expect(
+                plugins.newConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
+    private void expectTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, 
HeaderConverter returning) {
+        EasyMock.expect(
+                plugins.newHeaderConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
     private Map<String, String> anyConnectorConfigMap() {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -955,4 +997,33 @@ public class WorkerTest extends ThreadedTest {
             return null;
         }
     }
+
+    public static class TestConfigurableConverter implements Converter, 
Configurable {
+        public Map<String, ?> configs;
+
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` 
config be set
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            this.configs = configs;
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object 
value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
new file mode 100644
index 0000000..6de92ee
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class PluginsTest {
+
+    private static Map<String, String> props;
+    private static Plugins plugins;
+    private AbstractConfig config;
+    private TestConverter converter;
+    private TestHeaderConverter headerConverter;
+
+    @BeforeClass
+    public static void beforeAll() {
+        props = new HashMap<>();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
+        props.put("key.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
+        props.put("value.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
+        props.put("key.converter.extra.config", "foo1");
+        props.put("value.converter.extra.config", "foo2");
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
+        props.put("internal.key.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        props.put("internal.value.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        props.put("internal.key.converter.extra.config", "bar1");
+        props.put("internal.value.converter.extra.config", "bar2");
+        props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestHeaderConverter.class.getName());
+        props.put("header.converter.extra.config", "baz");
+
+        // Set up the plugins to have no additional plugin directories.
+        // This won't allow us to test classpath isolation, but it will allow 
us to test some of the utility methods.
+        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
+        plugins = new Plugins(props);
+    }
+
+    @Before
+    public void setup() {
+        this.config = new TestableWorkerConfig(props);
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureConverters() {
+        
instantiateAndConfigureConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
ClassLoaderUsage.CURRENT_CLASSLOADER);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("true", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("foo1", converter.configs.get("extra.config"));
+
+        
instantiateAndConfigureConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
ClassLoaderUsage.PLUGINS);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("true", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("foo2", converter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureInternalConverters() {
+        
instantiateAndConfigureConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.CURRENT_CLASSLOADER);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("false", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("bar1", converter.configs.get("extra.config"));
+
+        
instantiateAndConfigureConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.PLUGINS);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("false", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("bar2", converter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureHeaderConverter() {
+        
instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+        // Validate extra configs got passed through to overridden converters
+        assertConverterType(ConverterType.HEADER, headerConverter.configs);
+        assertEquals("baz", headerConverter.configs.get("extra.config"));
+    }
+
+    protected void instantiateAndConfigureConverter(String configPropName, 
ClassLoaderUsage classLoaderUsage) {
+        converter = (TestConverter) plugins.newConverter(config, 
configPropName, classLoaderUsage);
+        assertNotNull(converter);
+    }
+
+    protected void instantiateAndConfigureHeaderConverter(String 
configPropName) {
+        headerConverter = (TestHeaderConverter) 
plugins.newHeaderConverter(config, configPropName, 
ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+    }
+
+    protected void assertConverterType(ConverterType type, Map<String, ?> 
props) {
+        assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
+    }
+
+    public static class TestableWorkerConfig extends WorkerConfig {
+        public TestableWorkerConfig(Map<String, String> props) {
+            super(WorkerConfig.baseConfigDef(), props);
+        }
+    }
+
+    public static class TestConverter implements Converter, Configurable {
+        public Map<String, ?> configs;
+
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` 
config be set
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            this.configs = configs;
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object 
value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
+
+    public static class TestHeaderConverter implements HeaderConverter {
+        public Map<String, ?> configs;
+
+        @Override
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` 
config be set
+        }
+
+        @Override
+        public byte[] fromConnectHeader(String topic, String headerKey, Schema 
schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
ewe...@apache.org.

Reply via email to