This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36a8fec  KAFKA-7225: Pretransform validated props
36a8fec is described below

commit 36a8fec0ab2d05a8386ecd386bbbd294c3dc9126
Author: Robert Yokota <rayok...@gmail.com>
AuthorDate: Tue Aug 7 13:18:16 2018 -0700

    KAFKA-7225: Pretransform validated props
    
    If a property requires validation, it should be pretransformed if it is a
    variable reference, in order to have a value that will properly pass the
    validation.
    
    Author: Robert Yokota <rayok...@gmail.com>
    
    Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #5445 from rayokota/KAFKA-7225-pretransform-validated-props
---
 .../apache/kafka/connect/runtime/AbstractHerder.java |  3 +++
 .../connect/runtime/WorkerConfigTransformer.java     |  8 +++++++-
 .../kafka/connect/runtime/AbstractHerderTest.java    |  4 ++++
 .../runtime/distributed/DistributedHerderTest.java   | 20 ++++++++++++++++++++
 .../runtime/standalone/StandaloneHerderTest.java     | 17 +++++++++++++++++
 tests/kafkatest/tests/connect/connect_test.py        | 11 +++++++++--
 .../templates/connect-file-external.properties       | 16 ++++++++++++++++
 .../connect/templates/connect-standalone.properties  |  3 +++
 8 files changed, 79 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index b5e0ec2..cadb4e0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -246,6 +246,9 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
 
     @Override
     public ConfigInfos validateConnectorConfig(Map<String, String> 
connectorProps) {
+        if (worker.configTransformer() != null) {
+            connectorProps = 
worker.configTransformer().transform(connectorProps);
+        }
         String connType = 
connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         if (connType == null)
             throw new BadRequestException("Connector config " + connectorProps 
+ " contains no connector type");
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 7efb481..1b715c7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -38,10 +38,16 @@ public class WorkerConfigTransformer {
         this.configTransformer = new ConfigTransformer(configProviders);
     }
 
+    public Map<String, String> transform(Map<String, String> configs) {
+        return transform(null, configs);
+    }
+
     public Map<String, String> transform(String connectorName, Map<String, 
String> configs) {
         if (configs == null) return null;
         ConfigTransformerResult result = configTransformer.transform(configs);
-        scheduleReload(connectorName, result.ttls());
+        if (connectorName != null) {
+            scheduleReload(connectorName, result.ttls());
+        }
         return result.data();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5728465..db3cf27 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -67,6 +67,7 @@ public class AbstractHerderTest {
     private final String connector = "connector";
 
     @MockStrict private Worker worker;
+    @MockStrict private WorkerConfigTransformer transformer;
     @MockStrict private Plugins plugins;
     @MockStrict private ClassLoader classLoader;
     @MockStrict private ConfigBackingStore configStore;
@@ -261,6 +262,9 @@ public class AbstractHerderTest {
         EasyMock.expect(herder.generation()).andStubReturn(generation);
 
         // Call to validateConnectorConfig
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
         final Connector connector;
         try {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 911afe7..a0de8cf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -158,6 +159,7 @@ public class DistributedHerderTest {
     private DistributedHerder herder;
     private MockConnectMetrics metrics;
     @Mock private Worker worker;
+    @Mock private WorkerConfigTransformer transformer;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
     @Mock
     private Plugins plugins;
@@ -356,6 +358,9 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -399,6 +404,9 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -444,6 +452,9 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -495,6 +506,9 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -530,6 +544,9 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnectorAlreadyExists() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
@@ -1339,6 +1356,9 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 5372a3a..b98c15e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerConnector;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
@@ -101,6 +102,7 @@ public class StandaloneHerderTest {
 
     private Connector connector;
     @Mock protected Worker worker;
+    @Mock protected WorkerConfigTransformer transformer;
     @Mock private Plugins plugins;
     @Mock
     private PluginClassLoader pluginLoader;
@@ -146,6 +148,9 @@ public class StandaloneHerderTest {
         config.remove(ConnectorConfig.NAME_CONFIG);
 
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -171,6 +176,9 @@ public class StandaloneHerderTest {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -205,6 +213,9 @@ public class StandaloneHerderTest {
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         // No new connector is created
@@ -565,6 +576,9 @@ public class StandaloneHerderTest {
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
@@ -672,6 +686,9 @@ public class StandaloneHerderTest {
             Map<String, String>... configs
     ) {
         // config validation
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         if (shouldCreateConnector) {
diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py
index 3753876..9d34c48 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -27,6 +27,7 @@ from kafkatest.services.security.security_config import 
SecurityConfig
 
 import hashlib
 import json
+import os.path
 
 
 class ConnectStandaloneFileTest(Test):
@@ -44,7 +45,8 @@ class ConnectStandaloneFileTest(Test):
 
     OFFSETS_FILE = "/mnt/connect.offsets"
 
-    TOPIC = "test"
+    TOPIC = 
"${file:/mnt/connect/connect-file-external.properties:topic.external}"
+    TOPIC_TEST = "test"
 
     FIRST_INPUT_LIST = ["foo", "bar", "baz"]
     FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
@@ -90,13 +92,18 @@ class ConnectStandaloneFileTest(Test):
 
         self.source = ConnectStandaloneService(self.test_context, self.kafka, 
[self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, 
[self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(self.test_context, 1, 
self.kafka, self.TOPIC,
+        self.consumer_validator = ConsoleConsumer(self.test_context, 1, 
self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
         self.zk.start()
         self.kafka.start()
 
+        source_external_props = os.path.join(self.source.PERSISTENT_ROOT, 
"connect-file-external.properties")
+        self.source.node.account.create_file(source_external_props, 
self.render('connect-file-external.properties'))
         self.source.set_configs(lambda node: 
self.render("connect-standalone.properties", node=node), 
[self.render("connect-file-source.properties")])
+
+        sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, 
"connect-file-external.properties")
+        self.sink.node.account.create_file(sink_external_props, 
self.render('connect-file-external.properties'))
         self.sink.set_configs(lambda node: 
self.render("connect-standalone.properties", node=node), 
[self.render("connect-file-sink.properties")])
 
         self.source.start()
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-file-external.properties 
b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
new file mode 100644
index 0000000..8dccd25
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+topic.external={{ TOPIC_TEST }}
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-standalone.properties 
b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index a8eaa44..cbfe491 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -31,3 +31,6 @@ offset.storage.file.filename={{ OFFSETS_FILE }}
 
 # Reduce the admin client request timeouts so that we don't wait the default 
120 sec before failing to connect the admin client
 request.timeout.ms=30000
+
+config.providers=file
+config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

Reply via email to