This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new 2da64826fbc Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) 2da64826fbc is described below commit 2da64826fbc39567c424a1114e5777d6b84d184f Author: Terry <bea...@hotmail.com> AuthorDate: Thu Feb 16 12:39:24 2023 -0500 Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) Reviewers: Chris Egerton <chr...@aiven.io> --- .../apache/kafka/common/config/AbstractConfig.java | 101 +++++++++++++-------- .../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++++++ .../kafka/clients/producer/KafkaProducerTest.java | 24 +++++ .../kafka/common/config/AbstractConfigTest.java | 26 +++++- .../apache/kafka/test/MockConsumerInterceptor.java | 14 +++ .../apache/kafka/test/MockProducerInterceptor.java | 14 +++ 6 files changed, 166 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e3fda4d9f54..13637163311 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -66,44 +66,43 @@ public class AbstractConfig { * Construct a configuration with a ConfigDef and the configuration properties, which can include properties * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property * values. - * + * <p> * The originals is a name-value pair configuration properties and optional config provider configs. The * value of the configuration can be a variable as defined below or the actual value. This constructor will * first instantiate the ConfigProviders using the config provider configs, then it will find all the * variables in the values of the originals configurations, attempt to resolve the variables using the named * ConfigProviders, and then parse and validate the configurations. - * + * <p> * ConfigProvider configs can be passed either as configs in the originals map or in the separate * configProviderProps map. If config providers properties are passed in the configProviderProps any config * provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the * constructor will skip the variable substitution step and will simply validate and parse the supplied * configuration. - * + * <p> * The "{@code config.providers}" configuration property and all configuration properties that begin with the * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property * specifies the names of the config providers, and properties that begin with the "{@code config.providers..}" * prefix correspond to the properties for that named provider. For example, the "{@code config.providers..class}" * property specifies the name of the {@link ConfigProvider} implementation class that should be used for * the provider. - * + * <p> * The keys for ConfigProvider configs in both originals and configProviderProps will start with the above * mentioned "{@code config.providers.}" prefix. - * + * <p> * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider, * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key" * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will * be left unchanged in the configuration. * - * - * @param definition the definition of the configurations; may not be null - * @param originals the configuration properties plus any optional config provider properties; + * @param definition the definition of the configurations; may not be null + * @param originals the configuration properties plus any optional config provider properties; * @param configProviderProps the map of properties of config providers which will be instantiated by - * the constructor to resolve any variables in {@code originals}; may be null or empty - * @param doLog whether the configurations should be logged + * the constructor to resolve any variables in {@code originals}; may be null or empty + * @param doLog whether the configurations should be logged */ @SuppressWarnings("unchecked") - public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { + public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { /* check that all the keys are really strings */ for (Map.Entry<?, ?> entry : originals.entrySet()) if (!(entry.getKey() instanceof String)) @@ -127,7 +126,7 @@ public class AbstractConfig { * that will be used to resolve variables in configuration property values. * * @param definition the definition of the configurations; may not be null - * @param originals the configuration properties plus any optional config provider properties; may not be null + * @param originals the configuration properties plus any optional config provider properties; may not be null */ public AbstractConfig(ConfigDef definition, Map<?, ?> originals) { this(definition, originals, Collections.emptyMap(), true); @@ -139,8 +138,8 @@ public class AbstractConfig { * that will be used to resolve variables in configuration property values. * * @param definition the definition of the configurations; may not be null - * @param originals the configuration properties plus any optional config provider properties; may not be null - * @param doLog whether the configurations should be logged + * @param originals the configuration properties plus any optional config provider properties; may not be null + * @param doLog whether the configurations should be logged */ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) { this(definition, originals, Collections.emptyMap(), doLog); @@ -241,6 +240,7 @@ public class AbstractConfig { /** * Get all the original settings, ensuring that all values are of type String. + * * @return the original settings * @throws ClassCastException if any of the values are not strings */ @@ -269,7 +269,7 @@ public class AbstractConfig { * Gets all original settings with the given prefix. * * @param prefix the prefix to use as a filter - * @param strip strip the prefix before adding to the output if set true + * @param strip strip the prefix before adding to the output if set true * @return a Map containing the settings with the prefix */ public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) { @@ -288,7 +288,7 @@ public class AbstractConfig { /** * Put all keys that do not start with {@code prefix} and their parsed values in the result map and then * put all the remaining keys with the prefix stripped and their parsed values in the result map. - * + * <p> * This is useful if one wants to allow prefixed configs to override default ones. * <p> * Two forms of prefixes are supported: @@ -323,7 +323,7 @@ public class AbstractConfig { /** * If at least one key with {@code prefix} exists, all prefixed values will be parsed and put into map. * If no value with {@code prefix} exists all unprefixed values will be returned. - * + * <p> * This is useful if one wants to allow prefixed configs to override default ones, but wants to use either * only prefixed configs or only regular configs, but not mix them. */ @@ -389,8 +389,8 @@ public class AbstractConfig { private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) { if (klass == null) return null; - Object o; + if (klass instanceof String) { try { o = Utils.newInstance((String) klass, t); @@ -401,11 +401,15 @@ public class AbstractConfig { o = Utils.newInstance((Class<?>) klass); } else throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class"); - if (!t.isInstance(o)) - throw new KafkaException(klass + " is not an instance of " + t.getName()); - if (o instanceof Configurable) - ((Configurable) o).configure(configPairs); - + try { + if (!t.isInstance(o)) + throw new KafkaException(klass + " is not an instance of " + t.getName()); + if (o instanceof Configurable) + ((Configurable) o).configure(configPairs); + } catch (Exception e) { + maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance"); + throw e; + } return t.cast(o); } @@ -414,7 +418,7 @@ public class AbstractConfig { * Configurable configure it using the configuration. * * @param key The configuration key for the class - * @param t The interface the class should implement + * @param t The interface the class should implement * @return A configured instance of the class */ public <T> T getConfiguredInstance(String key, Class<T> t) { @@ -425,8 +429,8 @@ public class AbstractConfig { * Get a configured instance of the give class specified by the given configuration key. If the object implements * Configurable configure it using the configuration. * - * @param key The configuration key for the class - * @param t The interface the class should implement + * @param key The configuration key for the class + * @param t The interface the class should implement * @param configOverrides override origin configs * @return A configured instance of the class */ @@ -440,8 +444,9 @@ public class AbstractConfig { * Get a list of configured instances of the given class specified by the given configuration key. The configuration * may specify either null or an empty string to indicate no configured instances. In both cases, this method * returns an empty list to indicate no configured instances. + * * @param key The configuration key for the class - * @param t The interface the class should implement + * @param t The interface the class should implement * @return The list of configured instances */ public <T> List<T> getConfiguredInstances(String key, Class<T> t) { @@ -452,8 +457,9 @@ public class AbstractConfig { * Get a list of configured instances of the given class specified by the given configuration key. The configuration * may specify either null or an empty string to indicate no configured instances. In both cases, this method * returns an empty list to indicate no configured instances. - * @param key The configuration key for the class - * @param t The interface the class should implement + * + * @param key The configuration key for the class + * @param t The interface the class should implement * @param configOverrides Configuration overrides to use. * @return The list of configured instances */ @@ -465,8 +471,9 @@ public class AbstractConfig { * Get a list of configured instances of the given class specified by the given configuration key. The configuration * may specify either null or an empty string to indicate no configured instances. In both cases, this method * returns an empty list to indicate no configured instances. - * @param classNames The list of class names of the instances to create - * @param t The interface the class should implement + * + * @param classNames The list of class names of the instances to create + * @param t The interface the class should implement * @param configOverrides Configuration overrides to use. * @return The list of configured instances */ @@ -476,14 +483,28 @@ public class AbstractConfig { return objects; Map<String, Object> configPairs = originals(); configPairs.putAll(configOverrides); - for (Object klass : classNames) { - Object o = getConfiguredInstance(klass, t, configPairs); - objects.add(t.cast(o)); + + try { + for (Object klass : classNames) { + Object o = getConfiguredInstance(klass, t, configPairs); + objects.add(t.cast(o)); + } + } catch (Exception e) { + for (Object object : objects) { + maybeClose(object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances"); + } + throw e; } return objects; } - private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) { + private static void maybeClose(Object object, String name) { + if (object instanceof AutoCloseable) { + Utils.closeQuietly((AutoCloseable) object, name); + } + } + + private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) { // Variables are tuples of the form "${providerName:[path:]key}". From the configMap we extract the subset of configs with string // values as potential variables. Map<String, String> configMapAsString = new HashMap<>(); @@ -498,12 +519,13 @@ public class AbstractConfig { /** * Instantiates given list of config providers and fetches the actual values of config variables from the config providers. * returns a map of config key and resolved values. + * * @param configProviderProps The map of config provider configs - * @param originals The map of raw configs. + * @param originals The map of raw configs. * @return map of resolved config variable. */ @SuppressWarnings("unchecked") - private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) { + private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) { Map<String, String> providerConfigString; Map<String, ?> configProperties; Map<String, Object> resolvedOriginals = new HashMap<>(); @@ -549,7 +571,8 @@ public class AbstractConfig { * config.providers.{name}.class : The Java class name for a provider. * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization. * returns a map of config provider name and its instance. - * @param indirectConfigs The map of potential variable configs + * + * @param indirectConfigs The map of potential variable configs * @param providerConfigProperties The map of config provider configs * @return map map of config provider name and its instance. */ @@ -562,7 +585,7 @@ public class AbstractConfig { Map<String, String> providerMap = new HashMap<>(); - for (String provider: configProviders.split(",")) { + for (String provider : configProviders.split(",")) { String providerClass = providerClassProperty(provider); if (indirectConfigs.containsKey(providerClass)) providerMap.put(provider, indirectConfigs.get(providerClass)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index f08ac45ddaf..7729892a64f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -151,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public class KafkaConsumerTest { + private final String topic = "test"; private final Uuid topicId = Uuid.randomUuid(); private final TopicPartition tp0 = new TopicPartition(topic, 0); @@ -503,6 +504,32 @@ public class KafkaConsumerTest { } } + @Test + public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() { + final int targetInterceptor = 3; + + try { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName() + ", " + + MockConsumerInterceptor.class.getName() + ", " + + MockConsumerInterceptor.class.getName()); + + MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor); + + assertThrows(KafkaException.class, () -> { + new KafkaConsumer<>( + props, new StringDeserializer(), new StringDeserializer()); + }); + + assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get()); + assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get()); + + } finally { + MockConsumerInterceptor.resetCounters(); + } + } + @Test public void testPause() { KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index a1a854fc671..8023c41ac36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -569,7 +569,31 @@ public class KafkaProducerTest { MockProducerInterceptor.resetCounters(); } } + @Test + public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() { + final int targetInterceptor = 3; + try { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", " + + org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", " + + org.apache.kafka.test.MockProducerInterceptor.class.getName()); + props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something"); + + MockProducerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor); + + assertThrows(KafkaException.class, () -> { + new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer()); + }); + + assertEquals(3, MockProducerInterceptor.CONFIG_COUNT.get()); + assertEquals(3, MockProducerInterceptor.CLOSE_COUNT.get()); + } finally { + MockProducerInterceptor.resetCounters(); + } + } @Test public void testPartitionerClose() { try { diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index e07cdb8de16..c0c6f8cee37 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.config.provider.MockVaultConfigProvider; import org.apache.kafka.common.config.provider.MockFileConfigProvider; +import org.apache.kafka.test.MockConsumerInterceptor; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -259,6 +260,30 @@ public class AbstractConfigTest { } } + @Test + public void testConfiguredInstancesClosedOnFailure() { + + try { + Map<String, String> props = new HashMap<>(); + String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", " + + MockConsumerInterceptor.class.getName() + ", " + + MockConsumerInterceptor.class.getName(); + props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors); + props.put("client.id", "test"); + TestConfig testConfig = new TestConfig(props); + + MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3); + assertThrows( + Exception.class, + () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class) + ); + assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get()); + assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get()); + } finally { + MockConsumerInterceptor.resetCounters(); + } + } + @Test public void testClassConfigs() { class RestrictedClassLoader extends ClassLoader { @@ -585,7 +610,6 @@ public class AbstractConfigTest { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters."; - static { CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java index b01584b62a2..ccc93c9accb 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java @@ -41,6 +41,9 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0); + public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0); + public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0); + public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0); public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>(); public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id"); public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_CONSUME = new AtomicReference<>(NO_CLUSTER_ID); @@ -55,6 +58,11 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG); if (clientIdValue == null) throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG); + + CONFIG_COUNT.incrementAndGet(); + if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) { + throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold."); + } } @Override @@ -90,10 +98,16 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume CLOSE_COUNT.incrementAndGet(); } + public static void setThrowOnConfigExceptionThreshold(int value) { + THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value); + } + public static void resetCounters() { INIT_COUNT.set(0); CLOSE_COUNT.set(0); ON_COMMIT_COUNT.set(0); + CONFIG_COUNT.set(0); + THROW_CONFIG_EXCEPTION.set(0); CLUSTER_META.set(null); CLUSTER_ID_BEFORE_ON_CONSUME.set(NO_CLUSTER_ID); } diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java index eedc3bdaecd..6ef4f50893e 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java @@ -32,6 +32,9 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0); + public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0); + public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0); + public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0); public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0); public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0); public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0); @@ -59,6 +62,11 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce Object clientIdValue = configs.get(ProducerConfig.CLIENT_ID_CONFIG); if (clientIdValue == null) throw new ConfigException("Mock producer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG); + + CONFIG_COUNT.incrementAndGet(); + if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) { + throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold."); + } } @Override @@ -89,10 +97,16 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce CLOSE_COUNT.incrementAndGet(); } + public static void setThrowOnConfigExceptionThreshold(int value) { + THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value); + } + public static void resetCounters() { INIT_COUNT.set(0); CLOSE_COUNT.set(0); ONSEND_COUNT.set(0); + CONFIG_COUNT.set(0); + THROW_CONFIG_EXCEPTION.set(0); ON_SUCCESS_COUNT.set(0); ON_ERROR_COUNT.set(0); ON_ERROR_WITH_METADATA_COUNT.set(0);