This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
     new baddf8b  KAFKA-8407: Fix validation of class and list configs in connector client overrides (#6789)
baddf8b is described below

commit baddf8bbbac9eaa429676d32fa576ec3787680a7
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Thu May 23 12:21:19 2019 -0700

    KAFKA-8407: Fix validation of class and list configs in connector client overrides (#6789)

    Because of how config values are converted into strings in the `AbstractHerder.validateClientOverrides()` method after being validated by the client override policy, an exception is thrown if the value returned by the policy isn't already parsed as the type expected by the client `ConfigDef`. The fix here involves parsing client override properties before passing them to the override policy. A unit test is added to ensure that several different types of configs are validated properly by the herder.

    Author: Chris Egerton <chr...@confluent.io>

    Reviewers: Magesh Nandakumar <magesh.n.ku...@gmail.com>, Randall Hauch <rha...@gmail.com>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 11 +++-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 64 ++++++++++++++++++++--
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index e92f55e..f66029c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -403,7 +403,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         List<ConfigInfo> configInfoList = new LinkedList<>();
         Map<String, ConfigKey> configKeys = configDef.configKeys();
         Set<String> groups = new LinkedHashSet<>();
-        Map<String, Object> clientConfigs = connectorConfig.originalsWithPrefix(prefix);
+        Map<String, 
Object> clientConfigs = new HashMap<>(); + for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) { + String configName = rawClientConfig.getKey(); + Object rawConfigValue = rawClientConfig.getValue(); + ConfigKey configKey = configDef.configKeys().get(configName); + Object parsedConfigValue = configKey != null + ? ConfigDef.parseType(configName, rawConfigValue, configKey.type) + : rawConfigValue; + clientConfigs.put(configName, parsedConfigValue); + } ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest( connName, connectorType, connectorClass, clientConfigs, clientType); List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 35c0dd2..2c341bc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -16,18 +16,22 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; @@ -56,6 +60,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.powermock.api.easymock.PowerMock.verifyAll; import static org.powermock.api.easymock.PowerMock.replayAll; @@ -364,14 +369,12 @@ public class AbstractHerderTest { AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy()); replayAll(); - // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing - // class info that should generate an error. 
Map<String, String> config = new HashMap<>(); config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName()); config.put(ConnectorConfig.NAME_CONFIG, "connector-name"); config.put("required", "value"); // connector required config - String ackConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ACKS_CONFIG; - String saslConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG; + String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG); + String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG); config.put(ackConfigKey, "none"); config.put(saslConfigKey, "jaas_config"); @@ -400,6 +403,55 @@ public class AbstractHerderTest { } @Test + public void testConfigValidationAllOverride() { + AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new AllConnectorClientConfigOverridePolicy()); + replayAll(); + + Map<String, String> config = new HashMap<>(); + config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName()); + config.put(ConnectorConfig.NAME_CONFIG, "connector-name"); + config.put("required", "value"); // connector required config + // Try to test a variety of configuration types: string, int, long, boolean, list, class + String protocolConfigKey = producerOverrideKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + config.put(protocolConfigKey, "SASL_PLAINTEXT"); + String maxRequestSizeConfigKey = producerOverrideKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); + config.put(maxRequestSizeConfigKey, "420"); + String maxBlockConfigKey = producerOverrideKey(ProducerConfig.MAX_BLOCK_MS_CONFIG); + config.put(maxBlockConfigKey, "28980"); + String idempotenceConfigKey = producerOverrideKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG); + config.put(idempotenceConfigKey, "true"); + String bootstrapServersConfigKey = producerOverrideKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + 
config.put(bootstrapServersConfigKey, "SASL_PLAINTEXT://localhost:12345,SASL_PLAINTEXT://localhost:23456"); + String loginCallbackHandlerConfigKey = producerOverrideKey(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS); + config.put(loginCallbackHandlerConfigKey, OAuthBearerUnsecuredLoginCallbackHandler.class.getName()); + + final Set<String> overriddenClientConfigs = new HashSet<>(); + overriddenClientConfigs.add(protocolConfigKey); + overriddenClientConfigs.add(maxRequestSizeConfigKey); + overriddenClientConfigs.add(maxBlockConfigKey); + overriddenClientConfigs.add(idempotenceConfigKey); + overriddenClientConfigs.add(bootstrapServersConfigKey); + overriddenClientConfigs.add(loginCallbackHandlerConfigKey); + + ConfigInfos result = herder.validateConnectorConfig(config); + assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE); + + Map<String, String> validatedOverriddenClientConfigs = new HashMap<>(); + for (ConfigInfo configInfo : result.values()) { + String configName = configInfo.configKey().name(); + if (overriddenClientConfigs.contains(configName)) { + validatedOverriddenClientConfigs.put(configName, configInfo.configValue().value()); + } + } + Map<String, String> rawOverriddenClientConfigs = config.entrySet().stream() + .filter(e -> overriddenClientConfigs.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs); + verifyAll(); + } + + @Test public void testReverseTransformConfigs() { // Construct a task config with constant values for TEST_KEY and TEST_KEY2 Map<String, String> newTaskConfig = new HashMap<>(); @@ -482,4 +534,8 @@ public class AbstractHerderTest { private abstract class BogusSourceTask extends SourceTask { } + + private static String producerOverrideKey(String config) { + return ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + config; + } }