This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 36a8fec KAFKA-7225: Pretransform validated props 36a8fec is described below commit 36a8fec0ab2d05a8386ecd386bbbd294c3dc9126 Author: Robert Yokota <rayok...@gmail.com> AuthorDate: Tue Aug 7 13:18:16 2018 -0700 KAFKA-7225: Pretransform validated props If a property requires validation, it should be pretransformed if it is a variable reference, in order to have a value that will properly pass the validation. Author: Robert Yokota <rayok...@gmail.com> Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #5445 from rayokota/KAFKA-7225-pretransform-validated-props --- .../apache/kafka/connect/runtime/AbstractHerder.java | 3 +++ .../connect/runtime/WorkerConfigTransformer.java | 8 +++++++- .../kafka/connect/runtime/AbstractHerderTest.java | 4 ++++ .../runtime/distributed/DistributedHerderTest.java | 20 ++++++++++++++++++++ .../runtime/standalone/StandaloneHerderTest.java | 17 +++++++++++++++++ tests/kafkatest/tests/connect/connect_test.py | 11 +++++++++-- .../templates/connect-file-external.properties | 16 ++++++++++++++++ .../connect/templates/connect-standalone.properties | 3 +++ 8 files changed, 79 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index b5e0ec2..cadb4e0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -246,6 +246,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @Override public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) { + if (worker.configTransformer() != null) { + connectorProps = worker.configTransformer().transform(connectorProps); + } String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); if (connType == null) throw new BadRequestException("Connector config " + connectorProps + " contains no connector type"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 7efb481..1b715c7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -38,10 +38,16 @@ public class WorkerConfigTransformer { this.configTransformer = new ConfigTransformer(configProviders); } + public Map<String, String> transform(Map<String, String> configs) { + return transform(null, configs); + } + public Map<String, String> transform(String connectorName, Map<String, String> configs) { if (configs == null) return null; ConfigTransformerResult result = configTransformer.transform(configs); - scheduleReload(connectorName, result.ttls()); + if (connectorName != null) { + scheduleReload(connectorName, result.ttls()); + } return result.data(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 5728465..db3cf27 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -67,6 +67,7 @@ public class AbstractHerderTest { private final String connector = "connector"; @MockStrict private Worker worker; + @MockStrict private WorkerConfigTransformer transformer; @MockStrict private Plugins plugins; @MockStrict private ClassLoader classLoader; @MockStrict private ConfigBackingStore configStore; @@ -261,6 +262,9 @@ public class AbstractHerderTest { EasyMock.expect(herder.generation()).andStubReturn(generation); // Call to validateConnectorConfig + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); final Connector connector; try { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 911afe7..a0de8cf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; @@ -158,6 +159,7 @@ public class DistributedHerderTest { private DistributedHerder herder; private MockConnectMetrics metrics; @Mock private Worker worker; + @Mock private WorkerConfigTransformer transformer; @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; @Mock private Plugins plugins; @@ -356,6 +358,9 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -399,6 +404,9 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -444,6 +452,9 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -495,6 +506,9 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SinkConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -530,6 +544,9 @@ public class DistributedHerderTest { @Test public void testCreateConnectorAlreadyExists() throws Exception { EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null); expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); @@ -1339,6 +1356,9 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 5372a3a..b98c15e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerConnector; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; @@ -101,6 +102,7 @@ public class StandaloneHerderTest { private Connector connector; @Mock protected Worker worker; + @Mock protected WorkerConfigTransformer transformer; @Mock private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @@ -146,6 +148,9 @@ public class StandaloneHerderTest { config.remove(ConnectorConfig.NAME_CONFIG); Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -171,6 +176,9 @@ public class StandaloneHerderTest { connector = PowerMock.createMock(BogusSourceConnector.class); Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -205,6 +213,9 @@ public class StandaloneHerderTest { Connector connectorMock = PowerMock.createMock(SourceConnector.class); expectConfigValidation(connectorMock, true, config, config); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); // No new connector is created @@ -565,6 +576,9 @@ public class StandaloneHerderTest { ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); @@ -672,6 +686,9 @@ public class StandaloneHerderTest { Map<String, String>... configs ) { // config validation + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); if (shouldCreateConnector) { diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 3753876..9d34c48 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -27,6 +27,7 @@ from kafkatest.services.security.security_config import SecurityConfig import hashlib import json +import os.path class ConnectStandaloneFileTest(Test): @@ -44,7 +45,8 @@ class ConnectStandaloneFileTest(Test): OFFSETS_FILE = "/mnt/connect.offsets" - TOPIC = "test" + TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}" + TOPIC_TEST = "test" FIRST_INPUT_LIST = ["foo", "bar", "baz"] FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n" @@ -90,13 +92,18 @@ class ConnectStandaloneFileTest(Test): self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) - self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, + self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST, consumer_timeout_ms=10000) self.zk.start() self.kafka.start() + source_external_props = os.path.join(self.source.PERSISTENT_ROOT, "connect-file-external.properties") + self.source.node.account.create_file(source_external_props, self.render('connect-file-external.properties')) self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")]) + + sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, "connect-file-external.properties") + self.sink.node.account.create_file(sink_external_props, self.render('connect-file-external.properties')) self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")]) self.source.start() diff --git a/tests/kafkatest/tests/connect/templates/connect-file-external.properties b/tests/kafkatest/tests/connect/templates/connect-file-external.properties new file mode 100644 index 0000000..8dccd25 --- /dev/null +++ b/tests/kafkatest/tests/connect/templates/connect-file-external.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +topic.external={{ TOPIC_TEST }} diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties index a8eaa44..cbfe491 100644 --- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties +++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties @@ -31,3 +31,6 @@ offset.storage.file.filename={{ OFFSETS_FILE }} # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client request.timeout.ms=30000 + +config.providers=file +config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider