This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 265b58b KAFKA-5117: Stop resolving externalized configs in Connect REST API 265b58b is described below commit 265b58bd11dfbd8014ccabe320589f7163a82925 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Wed Jan 23 11:00:23 2019 -0800 KAFKA-5117: Stop resolving externalized configs in Connect REST API [KIP-297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP-297:ExternalizingSecretsforConnectConfigurations-PublicInterfaces) introduced the `ConfigProvider` mechanism, which was primarily intended for externalizing secrets provided in connector configurations. However, when querying the Connect REST API for the configuration of a connector or its tasks, those secrets are still exposed. The changes here prevent the Conne [...] Tested and verified manually. If these changes are approved, unit tests can be added to prevent a regression. Author: Chris Egerton <chr...@confluent.io> Reviewers: Robert Yokota <rayok...@gmail.com>, Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #6129 from C0urante/hide-provided-connect-configs (cherry picked from commit 743607af5aa625a19377688709870b021014dee2) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- .../runtime/distributed/DistributedHerder.java | 4 ++-- .../runtime/standalone/StandaloneHerder.java | 4 ++-- .../runtime/distributed/DistributedHerderTest.java | 11 +++++++++- .../runtime/standalone/StandaloneHerderTest.java | 24 +++++++++++++++------- tests/kafkatest/tests/connect/connect_rest_test.py | 7 +++++-- tests/kafkatest/tests/connect/connect_test.py | 5 ++--- .../templates/connect-distributed.properties | 6 ++++++ 7 files changed, 44 insertions(+), 17 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 099f084..7edc3b2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -451,7 +451,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (!configState.contains(connName)) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { - Map<String, String> config = configState.connectorConfig(connName); + Map<String, String> config = configState.rawConnectorConfig(connName); callback.onCompletion(null, new ConnectorInfo(connName, config, configState.tasks(connName), connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))); @@ -607,7 +607,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { List<TaskInfo> result = new ArrayList<>(); for (int i = 0; i < configState.taskCount(connName); i++) { ConnectorTaskId id = new ConnectorTaskId(connName, i); - result.add(new TaskInfo(id, configState.taskConfig(id))); + result.add(new TaskInfo(id, configState.rawTaskConfig(id))); } callback.onCompletion(null, result); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index fe31c28..95b53e5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -134,7 +134,7 @@ public class StandaloneHerder extends AbstractHerder { private ConnectorInfo createConnectorInfo(String connector) { if (!configState.contains(connector)) return null; - Map<String, String> config = 
configState.connectorConfig(connector); + Map<String, String> config = configState.rawConnectorConfig(connector); return new ConnectorInfo(connector, config, configState.tasks(connector), connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); } @@ -232,7 +232,7 @@ public class StandaloneHerder extends AbstractHerder { List<TaskInfo> result = new ArrayList<>(); for (ConnectorTaskId taskId : configState.tasks(connName)) - result.add(new TaskInfo(taskId, configState.taskConfig(taskId))); + result.add(new TaskInfo(taskId, configState.rawTaskConfig(taskId))); callback.onCompletion(null, result); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index a0de8cf..25c1da8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1293,7 +1293,16 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); + + WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class); + EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject())) + .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")); + EasyMock.replay(configTransformer); + ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + TASK_CONFIGS_MAP, 
Collections.<String>emptySet(), configTransformer); + + expectPostRebalanceCatchup(snapshotWithTransform); member.wakeup(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index b98c15e..a23ee10 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -115,12 +115,14 @@ public class StandaloneHerderTest { public void setup() { worker = PowerMock.createMock(Worker.class); herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"}, - worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore()); + worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer)); plugins = PowerMock.createMock(Plugins.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class); delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); PowerMock.mockStatic(Plugins.class); PowerMock.mockStatic(WorkerConnector.class); + Capture<Map<String, String>> configCapture = Capture.newInstance(); + EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); } @Test @@ -357,7 +359,8 @@ public class StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); @@ -390,7 +393,8 @@ public class 
StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(false); @@ -458,7 +462,6 @@ public class StandaloneHerderTest { // Create connector connector = PowerMock.createMock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); expectConfigValidation(connector, true, connConfig); // Validate accessors with 1 connector @@ -485,6 +488,13 @@ public class StandaloneHerderTest { herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); + + EasyMock.reset(transformer); + EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.anyObject())) + .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")) + .anyTimes(); + EasyMock.replay(transformer); + herder.connectors(listConnectorsCb); herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); @@ -604,8 +614,7 @@ public class StandaloneHerderTest { PowerMock.verifyAll(); } - private void expectAdd(SourceSink sourceSink) throws Exception { - + private void expectAdd(SourceSink sourceSink) { Map<String, String> connectorProps = connectorConfig(sourceSink); ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ? 
new SourceConnectorConfig(plugins, connectorProps) : @@ -634,7 +643,8 @@ public class StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 8b6157b..c13515b 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -14,7 +14,7 @@ # limitations under the License. from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.connect import ConnectDistributedService, ConnectRestError +from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from ducktape.cluster.remoteaccount import RemoteCommandError @@ -43,7 +43,9 @@ class ConnectRestApiTest(KafkaTest): INPUT_FILE2 = "/mnt/connect.input2" OUTPUT_FILE = "/mnt/connect.output" - TOPIC = "test" + TOPIC = "${file:%s:topic.external}" % ConnectServiceBase.EXTERNAL_CONFIGS_FILE + TOPIC_TEST = "test" + DEFAULT_BATCH_SIZE = "2000" OFFSETS_TOPIC = "connect-offsets" OFFSETS_REPLICATION_FACTOR = "1" @@ -78,6 +80,7 @@ class ConnectRestApiTest(KafkaTest): self.schemas = True self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node)) self.cc.start() diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py index f01ff0a..9a1ff1b 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -22,8 +22,7 @@ from ducktape.errors import TimeoutError from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService -from kafkatest.services.connect import ConnectStandaloneService -from kafkatest.services.connect import ErrorTolerance +from kafkatest.services.connect import ConnectServiceBase, ConnectStandaloneService, ErrorTolerance from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -47,7 +46,7 @@ class ConnectStandaloneFileTest(Test): OFFSETS_FILE = "/mnt/connect.offsets" - TOPIC = "${file:/mnt/connect/connect-external-configs.properties:topic.external}" + TOPIC = "${file:%s:topic.external}" % ConnectServiceBase.EXTERNAL_CONFIGS_FILE TOPIC_TEST = "test" FIRST_INPUT_LIST = ["foo", "bar", "baz"] diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties index 186773e..ca8c4f8 100644 --- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties +++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties @@ -50,3 +50,9 @@ consumer.session.timeout.ms=10000 # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client request.timeout.ms=30000 + +# Allow connector configs to use externalized config values of the form: +# ${file:/mnt/connect/connect-external-configs.properties:topic.external} +# +config.providers=file +config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider