This is an automated email from the ASF dual-hosted git repository. kkarantasis pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new a342103 KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630) a342103 is described below commit a3421039e531db944b3acda12c4f091741afe06d Author: Greg Harris <gr...@confluent.io> AuthorDate: Wed Jun 10 15:04:36 2020 -0700 KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630) This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`. Reviewer: Konstantine Karantasis <konstant...@confluent.io> --- .../connect/runtime/isolation/PluginUtils.java | 2 +- .../connect/runtime/isolation/PluginUtilsTest.java | 343 +++++++++++++++------ 2 files changed, 248 insertions(+), 97 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index b9d5470..63805b1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -135,7 +135,7 @@ public class PluginUtils { + "|storage\\.StringConverter" + "|storage\\.SimpleHeaderConverter" + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension" - + "|connector\\.policy\\.(?!ConnectorClientConfigOverridePolicy$).*" + + "|connector\\.policy\\.(?!ConnectorClientConfig(?:OverridePolicy|Request(?:\\$ClientType)?)$).*" + ")" + "|common\\.config\\.provider\\.(?!ConfigProvider$).*" + ")$"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index bf441ff..d59967d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -70,7 +70,7 @@ public class PluginUtilsTest { } @Test - public void testConnectFrameworkClasses() { + public void testKafkaDependencyClasses() { assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common.")); assertFalse(PluginUtils.shouldLoadInIsolation( "org.apache.kafka.common.config.AbstractConfig") @@ -81,30 +81,6 @@ public class PluginUtilsTest { assertFalse(PluginUtils.shouldLoadInIsolation( "org.apache.kafka.common.serialization.Deserializer") ); - assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.")); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.Connector") - ); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.source.SourceConnector") - ); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.sink.SinkConnector") - ); - assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task")); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.source.SourceTask") - ); - assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask")); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.transforms.Transformation") - ); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.storage.Converter") - ); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.storage.OffsetBackingStore") - ); assertFalse(PluginUtils.shouldLoadInIsolation( "org.apache.kafka.clients.producer.ProducerConfig") ); @@ -114,62 +90,256 @@ public class PluginUtilsTest { assertFalse(PluginUtils.shouldLoadInIsolation( "org.apache.kafka.clients.admin.KafkaAdminClient") ); - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.rest.ConnectRestExtension") - ); } @Test - public void testAllowedConnectFrameworkClasses() { - assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms.")); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.transforms.ExtractField") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.transforms.ExtractField$Key") - ); - assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json.")); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.json.JsonConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.json.JsonConverter$21") - ); - assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file.")); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.file.FileStreamSourceTask") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.file.FileStreamSinkConnector") - ); - assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters.")); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.ByteArrayConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.DoubleConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.FloatConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.IntegerConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.LongConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.converters.ShortConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.storage.StringConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.storage.SimpleHeaderConverter") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( + public void testConnectApiClasses() { + List<String> apiClasses = Arrays.asList( + // Enumerate all packages and classes + "org.apache.kafka.connect.", + "org.apache.kafka.connect.components.", + "org.apache.kafka.connect.components.Versioned", + //"org.apache.kafka.connect.connector.policy.", isolated by default + "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy", + "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest", + "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest$ClientType", + "org.apache.kafka.connect.connector.", + "org.apache.kafka.connect.connector.Connector", + "org.apache.kafka.connect.connector.ConnectorContext", + "org.apache.kafka.connect.connector.ConnectRecord", + "org.apache.kafka.connect.connector.Task", + "org.apache.kafka.connect.data.", + "org.apache.kafka.connect.data.ConnectSchema", + "org.apache.kafka.connect.data.Date", + "org.apache.kafka.connect.data.Decimal", + "org.apache.kafka.connect.data.Field", + "org.apache.kafka.connect.data.Schema", + "org.apache.kafka.connect.data.SchemaAndValue", + "org.apache.kafka.connect.data.SchemaBuilder", + "org.apache.kafka.connect.data.SchemaProjector", + "org.apache.kafka.connect.data.Struct", + "org.apache.kafka.connect.data.Time", + "org.apache.kafka.connect.data.Timestamp", + "org.apache.kafka.connect.data.Values", + "org.apache.kafka.connect.errors.", + "org.apache.kafka.connect.errors.AlreadyExistsException", + "org.apache.kafka.connect.errors.ConnectException", + "org.apache.kafka.connect.errors.DataException", + "org.apache.kafka.connect.errors.IllegalWorkerStateException", + "org.apache.kafka.connect.errors.NotFoundException", + "org.apache.kafka.connect.errors.RetriableException", + "org.apache.kafka.connect.errors.SchemaBuilderException", + "org.apache.kafka.connect.errors.SchemaProjectorException", + "org.apache.kafka.connect.header.", + "org.apache.kafka.connect.header.ConnectHeader", + "org.apache.kafka.connect.header.ConnectHeaders", + "org.apache.kafka.connect.header.Header", + "org.apache.kafka.connect.header.Headers", + "org.apache.kafka.connect.health.", + "org.apache.kafka.connect.health.AbstractState", + "org.apache.kafka.connect.health.ConnectClusterDetails", + "org.apache.kafka.connect.health.ConnectClusterState", + "org.apache.kafka.connect.health.ConnectorHealth", + "org.apache.kafka.connect.health.ConnectorState", + "org.apache.kafka.connect.health.ConnectorType", + "org.apache.kafka.connect.health.TaskState", + "org.apache.kafka.connect.rest.", + "org.apache.kafka.connect.rest.ConnectRestExtension", + "org.apache.kafka.connect.rest.ConnectRestExtensionContext", + "org.apache.kafka.connect.sink.", + "org.apache.kafka.connect.sink.SinkConnector", + "org.apache.kafka.connect.sink.SinkRecord", + "org.apache.kafka.connect.sink.SinkTask", + "org.apache.kafka.connect.sink.SinkTaskContext", + "org.apache.kafka.connect.sink.ErrantRecordReporter", + "org.apache.kafka.connect.source.", + "org.apache.kafka.connect.source.SourceConnector", + "org.apache.kafka.connect.source.SourceRecord", + "org.apache.kafka.connect.source.SourceTask", + "org.apache.kafka.connect.source.SourceTaskContext", + "org.apache.kafka.connect.storage.", + "org.apache.kafka.connect.storage.Converter", + "org.apache.kafka.connect.storage.ConverterConfig", + "org.apache.kafka.connect.storage.ConverterType", + "org.apache.kafka.connect.storage.HeaderConverter", + "org.apache.kafka.connect.storage.OffsetStorageReader", + //"org.apache.kafka.connect.storage.SimpleHeaderConverter", explicitly isolated + //"org.apache.kafka.connect.storage.StringConverter", explicitly isolated + "org.apache.kafka.connect.storage.StringConverterConfig", + //"org.apache.kafka.connect.transforms.", isolated by default + "org.apache.kafka.connect.transforms.Transformation", + "org.apache.kafka.connect.util.", + "org.apache.kafka.connect.util.ConnectorUtils" + ); + // Classes in the API should never be loaded in isolation. + for (String clazz : apiClasses) { + assertFalse( + clazz + " from 'api' is loaded in isolation but should not be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testConnectRuntimeClasses() { + // Only list packages, because there are too many classes. + List<String> runtimeClasses = Arrays.asList( + "org.apache.kafka.connect.cli.", + //"org.apache.kafka.connect.connector.policy.", isolated by default + //"org.apache.kafka.connect.converters.", isolated by default + "org.apache.kafka.connect.runtime.", + "org.apache.kafka.connect.runtime.distributed", + "org.apache.kafka.connect.runtime.errors", + "org.apache.kafka.connect.runtime.health", + "org.apache.kafka.connect.runtime.isolation", + "org.apache.kafka.connect.runtime.rest.", + "org.apache.kafka.connect.runtime.rest.entities.", + "org.apache.kafka.connect.runtime.rest.errors.", + "org.apache.kafka.connect.runtime.rest.resources.", + "org.apache.kafka.connect.runtime.rest.util.", + "org.apache.kafka.connect.runtime.standalone.", + "org.apache.kafka.connect.runtime.rest.", + "org.apache.kafka.connect.storage.", + "org.apache.kafka.connect.tools.", + "org.apache.kafka.connect.util." + ); + for (String clazz : runtimeClasses) { + assertFalse( + clazz + " from 'runtime' is loaded in isolation but should not be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testAllowedRuntimeClasses() { + List<String> jsonConverterClasses = Arrays.asList( + "org.apache.kafka.connect.connector.policy.", + "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy", + "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy", + "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy", + "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy", + "org.apache.kafka.connect.converters.", + "org.apache.kafka.connect.converters.ByteArrayConverter", + "org.apache.kafka.connect.converters.DoubleConverter", + "org.apache.kafka.connect.converters.FloatConverter", + "org.apache.kafka.connect.converters.IntegerConverter", + "org.apache.kafka.connect.converters.LongConverter", + "org.apache.kafka.connect.converters.NumberConverter", + "org.apache.kafka.connect.converters.NumberConverterConfig", + "org.apache.kafka.connect.converters.ShortConverter", + //"org.apache.kafka.connect.storage.", not isolated by default + "org.apache.kafka.connect.storage.StringConverter", + "org.apache.kafka.connect.storage.SimpleHeaderConverter" + ); + for (String clazz : jsonConverterClasses) { + assertTrue( + clazz + " from 'runtime' is not loaded in isolation but should be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testTransformsClasses() { + List<String> transformsClasses = Arrays.asList( + "org.apache.kafka.connect.transforms.", + "org.apache.kafka.connect.transforms.util.", + "org.apache.kafka.connect.transforms.util.NonEmptyListValidator", + "org.apache.kafka.connect.transforms.util.RegexValidator", + "org.apache.kafka.connect.transforms.util.Requirements", + "org.apache.kafka.connect.transforms.util.SchemaUtil", + "org.apache.kafka.connect.transforms.util.SimpleConfig", + "org.apache.kafka.connect.transforms.Cast", + "org.apache.kafka.connect.transforms.Cast$Key", + "org.apache.kafka.connect.transforms.Cast$Value", + "org.apache.kafka.connect.transforms.ExtractField", + "org.apache.kafka.connect.transforms.ExtractField$Key", + "org.apache.kafka.connect.transforms.ExtractField$Value", + "org.apache.kafka.connect.transforms.Flatten", + "org.apache.kafka.connect.transforms.Flatten$Key", + "org.apache.kafka.connect.transforms.Flatten$Value", + "org.apache.kafka.connect.transforms.HoistField", + "org.apache.kafka.connect.transforms.HoistField$Key", + "org.apache.kafka.connect.transforms.HoistField$Key", + "org.apache.kafka.connect.transforms.InsertField", + "org.apache.kafka.connect.transforms.InsertField$Key", + "org.apache.kafka.connect.transforms.InsertField$Value", + "org.apache.kafka.connect.transforms.MaskField", + "org.apache.kafka.connect.transforms.MaskField$Key", + "org.apache.kafka.connect.transforms.MaskField$Value", + "org.apache.kafka.connect.transforms.RegexRouter", + "org.apache.kafka.connect.transforms.ReplaceField", + "org.apache.kafka.connect.transforms.ReplaceField$Key", + "org.apache.kafka.connect.transforms.ReplaceField$Value", + "org.apache.kafka.connect.transforms.SetSchemaMetadata", + "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key", + "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", + "org.apache.kafka.connect.transforms.TimestampConverter", + "org.apache.kafka.connect.transforms.TimestampConverter$Key", + "org.apache.kafka.connect.transforms.TimestampConverter$Value", + "org.apache.kafka.connect.transforms.TimestampRouter", + "org.apache.kafka.connect.transforms.TimestampRouter$Key", + "org.apache.kafka.connect.transforms.TimestampRouter$Value", + "org.apache.kafka.connect.transforms.ValueToKey" + ); + for (String clazz : transformsClasses) { + assertTrue( + clazz + " from 'transforms' is not loaded in isolation but should be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testAllowedJsonConverterClasses() { + List<String> jsonConverterClasses = Arrays.asList( + "org.apache.kafka.connect.json.", + "org.apache.kafka.connect.json.DecimalFormat", + "org.apache.kafka.connect.json.JsonConverter", + "org.apache.kafka.connect.json.JsonConverterConfig", + "org.apache.kafka.connect.json.JsonDeserializer", + "org.apache.kafka.connect.json.JsonSchema", + "org.apache.kafka.connect.json.JsonSerializer" + ); + for (String clazz : jsonConverterClasses) { + assertTrue( + clazz + " from 'json' is not loaded in isolation but should be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testAllowedFileConnectors() { + List<String> jsonConverterClasses = Arrays.asList( + "org.apache.kafka.connect.file.", + "org.apache.kafka.connect.file.FileStreamSinkConnector", + "org.apache.kafka.connect.file.FileStreamSinkTask", + "org.apache.kafka.connect.file.FileStreamSourceConnector", + "org.apache.kafka.connect.file.FileStreamSourceTask" + ); + for (String clazz : jsonConverterClasses) { + assertTrue( + clazz + " from 'file' is not loaded in isolation but should be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } + } + + @Test + public void testAllowedBasicAuthExtensionClasses() { + List<String> basicAuthExtensionClasses = Arrays.asList( "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension" - )); + //"org.apache.kafka.connect.rest.basic.auth.extension.JaasBasicAuthFilter", TODO fix? + //"org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule" TODO fix? + ); + for (String clazz : basicAuthExtensionClasses) { + assertTrue( + clazz + " from 'basic-auth-extension' is not loaded in isolation but should be", + PluginUtils.shouldLoadInIsolation(clazz) + ); + } } @Test @@ -186,25 +356,6 @@ public class PluginUtilsTest { } @Test - public void testConnectorClientConfigOverridePolicy() { - assertFalse(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy") - ); - assertTrue(PluginUtils.shouldLoadInIsolation( - "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy") - ); - } - - @Test public void testEmptyPluginUrls() throws Exception { assertEquals(Collections.<Path>emptyList(), PluginUtils.pluginUrls(pluginPath)); }