This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f7c0b3c49c9 [improve][fn] Allow unknown fields in connectors config (#20116) f7c0b3c49c9 is described below commit f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04 Author: Nicolò Boschi <boschi1...@gmail.com> AuthorDate: Wed Apr 26 17:36:56 2023 +0200 [improve][fn] Allow unknown fields in connectors config (#20116) --- conf/functions_worker.yml | 6 ++ .../pulsar/functions/instance/InstanceConfig.java | 1 + .../functions/instance/JavaInstanceRunnable.java | 97 +++++++++++++++++-- .../instance/JavaInstanceRunnableTest.java | 104 ++++++++++++++++++--- .../functions/runtime/JavaInstanceStarter.java | 7 ++ .../pulsar/functions/runtime/RuntimeUtils.java | 6 +- .../pulsar/functions/worker/WorkerConfig.java | 11 +++ .../pulsar/functions/worker/FunctionActioner.java | 1 + .../resources/META-INF/services/pulsar-io.yaml | 3 +- .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + 14 files changed, 217 insertions(+), 24 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index bb15e0ca416..4c5b6aab1b7 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -407,6 +407,12 @@ validateConnectorConfig: false # If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command. initializedDlogMetadata: false +# Whether to ignore unknown properties when deserializing the connector configuration. +# After upgrading a connector to a new version with a new configuration, the new configuration may not be compatible with the old connector. +# In case of rollback, it's required to also rollback the connector configuration. +# Ignoring unknown fields makes possible to keep the new configuration and only rollback the connector. +ignoreUnknownConfigFields: false + ########################### # Arbitrary Configuration ########################### diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java index 1a89505d9bb..fcee6d734d6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java @@ -48,6 +48,7 @@ public class InstanceConfig { private boolean exposePulsarAdminClientEnabled = false; private int metricsPort; private List<String> additionalJavaRuntimeArguments = Collections.emptyList(); + private boolean ignoreUnknownConfigFields; /** * Get the string representation of {@link #getInstanceId()}. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index e2ad9e4c989..c3f36f754da 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -19,13 +19,20 @@ package org.apache.pulsar.functions.instance; import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.BeanDeserializer; import com.google.common.annotations.VisibleForTesting; import com.scurrilous.circe.checksum.Crc32cIntChecksum; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.TreeMap; @@ -34,6 +41,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.commons.lang3.StringUtils; @@ -59,6 +67,7 @@ import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -94,6 +103,7 @@ import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig; import org.apache.pulsar.functions.source.batch.BatchSourceExecutor; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; @@ -855,10 +865,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { if (sourceSpec.getConfigs().isEmpty()) { this.source.open(new HashMap<>(), contextImpl); } else { - this.source.open( - ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>() { - }).readValue(sourceSpec.getConfigs()) - , contextImpl); + this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl); } if (this.source instanceof PulsarSource) { contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers()); @@ -870,6 +877,83 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } + private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException { + return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType); + } + + static Map<String, Object> parseComponentConfig(String connectorConfigs, + InstanceConfig instanceConfig, + ClassLoader componentClassLoader, + org.apache.pulsar.functions.proto.Function + .FunctionDetails.ComponentType componentType) + throws IOException { + final Map<String, Object> config = ObjectMapperFactory + .getMapper() + .reader() + .forType(new TypeReference<Map<String, Object>>() {}) + .readValue(connectorConfigs); + if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) { + final String configClassName; + if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { + configClassName = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass(); + } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) { + configClassName = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass(); + } else { + return config; + } + if (configClassName != null) { + + Class<?> configClass; + try { + configClass = Class.forName(configClassName, + true, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Config class not found: " + configClassName, e); + } + final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass); + + for (String s : config.keySet()) { + if (!allFields.contains(s)) { + log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored", + s, + componentType, + configClass); + config.remove(s); + } + } + } + } + return config; + } + + static final class BeanPropertiesReader { + + private static final MapperBeanReader reader = new MapperBeanReader(); + + private static final class MapperBeanReader extends ObjectMapper { + @SneakyThrows + List<String> getBeanProperties(Class<?> valueType) { + final JsonParser parser = ObjectMapperFactory + .getMapper() + .getObjectMapper() + .createParser(""); + DeserializationConfig config = getDeserializationConfig(); + DeserializationContext ctxt = createDeserializationContext(parser, config); + BeanDeserializer deser = (BeanDeserializer) + _findRootDeserializer(ctxt, _typeFactory.constructType(valueType)); + List<String> list = new ArrayList<>(); + deser.properties().forEachRemaining(p -> list.add(p.getName())); + return list; + } + } + + static List<String> getBeanProperties(Class<?> valueType) { + return reader.getBeanProperties(valueType); + } + } + private void setupOutput(ContextImpl contextImpl) throws Exception { @@ -940,9 +1024,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec, contextImpl.toString()); } - this.sink.open(ObjectMapperFactory.getMapper().reader().forType( - new TypeReference<Map<String, Object>>() { - }).readValue(sinkSpec.getConfigs()), contextImpl); + final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs()); + this.sink.open(config, contextImpl); } } catch (Exception e) { log.error("Sink open produced uncaught exception: ", e); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index a36a3ca62d1..5fea8bcc9fd 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -18,19 +18,27 @@ */ package org.apache.pulsar.functions.instance; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; + import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.TreeSet; + +import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Getter; import lombok.Setter; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; @@ -38,11 +46,10 @@ import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder; -import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.jetbrains.annotations.NotNull; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class JavaInstanceRunnableTest { @@ -159,7 +166,7 @@ public class JavaInstanceRunnableTest { @NotNull private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck, - org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception { + org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception { FunctionDetails functionDetails = FunctionDetails.newBuilder() .setAutoAck(autoAck) .setProcessingGuarantees(processingGuarantees).build(); @@ -184,23 +191,90 @@ public class JavaInstanceRunnableTest { @Test public void testSinkConfigParsingPreservesOriginalType() throws Exception { - SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class); - when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}"); - Map<String, Object> parsedConfig = - new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() { - }); + final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig( + "{\"ttl\": 9223372036854775807}", + new InstanceConfig(), + null, + FunctionDetails.ComponentType.SINK + ); Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class); Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE); } @Test public void testSourceConfigParsingPreservesOriginalType() throws Exception { - SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class); - when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}"); - Map<String, Object> parsedConfig = - new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() { - }); + final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig( + "{\"ttl\": 9223372036854775807}", + new InstanceConfig(), + null, + FunctionDetails.ComponentType.SOURCE + ); Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class); Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE); } + + + public static class ConnectorTestConfig1 { + public String field1; + } + + @DataProvider(name = "configIgnoreUnknownFields") + public static Object[][] configIgnoreUnknownFields() { + return new Object[][]{ + {false, FunctionDetails.ComponentType.SINK}, + {true, FunctionDetails.ComponentType.SINK}, + {false, FunctionDetails.ComponentType.SOURCE}, + {true, FunctionDetails.ComponentType.SOURCE} + }; + } + + @Test(dataProvider = "configIgnoreUnknownFields") + public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields, + FunctionDetails.ComponentType type) throws Exception { + NarClassLoader narClassLoader = mock(NarClassLoader.class); + final ConnectorDefinition connectorDefinition = new ConnectorDefinition(); + if (type == FunctionDetails.ComponentType.SINK) { + connectorDefinition.setSinkConfigClass(ConnectorTestConfig1.class.getName()); + } else { + connectorDefinition.setSourceConfigClass(ConnectorTestConfig1.class.getName()); + } + when(narClassLoader.getServiceDefinition(any())).thenReturn(ObjectMapperFactory + .getMapper().writer().writeValueAsString(connectorDefinition)); + final InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields); + + final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig( + "{\"field1\": \"value\", \"field2\": \"value2\"}", + instanceConfig, + narClassLoader, + type + ); + if (ignoreUnknownConfigFields) { + Assert.assertEquals(parsedConfig.size(), 1); + Assert.assertEquals(parsedConfig.get("field1"), "value"); + } else { + Assert.assertEquals(parsedConfig.size(), 2); + Assert.assertEquals(parsedConfig.get("field1"), "value"); + Assert.assertEquals(parsedConfig.get("field2"), "value2"); + } + } + + public static class ConnectorTestConfig2 { + public static int constantField = 1; + public String field1; + private long withGetter; + @JsonIgnore + private ConnectorTestConfig1 ignore; + + public long getWithGetter() { + return withGetter; + } + } + + @Test + public void testBeanPropertiesReader() throws Exception { + final List<String> beanProperties = JavaInstanceRunnable.BeanPropertiesReader + .getBeanProperties(ConnectorTestConfig2.class); + Assert.assertEquals(new TreeSet<>(beanProperties), new TreeSet<>(Arrays.asList("field1", "withGetter"))); + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index deff690815d..c4f44be3df3 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable { + "exposed to function context, default is disabled.", required = false) public Boolean exposePulsarAdminClientEnabled = false; + @Parameter(names = "--ignore_unknown_config_fields", + description = "Whether to ignore unknown properties when deserializing the connector configuration.", + required = false) + public Boolean ignoreUnknownConfigFields = false; + + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; @@ -177,6 +183,7 @@ public class JavaInstanceStarter implements AutoCloseable { instanceConfig.setClusterName(clusterName); instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests); instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled); + instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields); Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); if (functionDetailsJsonString.charAt(0) == '\'') { functionDetailsJsonString = functionDetailsJsonString.substring(1); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 53ebfcbfaf0..5392697e928 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -435,10 +435,14 @@ public class RuntimeUtils { args.add("--metrics_port"); args.add(String.valueOf(instanceConfig.getMetricsPort())); - // only the Java instance supports --pending_async_requests right now. + // params supported only by the Java instance runtime. if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("--pending_async_requests"); args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests())); + + if (instanceConfig.isIgnoreUnknownConfigFields()) { + args.add("--ignore_unknown_config_fields"); + } } // state storage configs diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 3b8ddf774d1..0ed73953d7a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -738,6 +738,17 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { ) private List<String> additionalJavaRuntimeArguments = new ArrayList<>(); + @FieldContext( + category = CATEGORY_CONNECTORS, + doc = "Whether to ignore unknown properties when deserializing the connector configuration. " + + "After upgrading a connector to a new version with a new configuration, " + + "the new configuration may not be compatible with the old connector. " + + "In case of rollback, it's required to also rollback the connector configuration. " + + "Ignoring unknown fields makes possible to keep the new configuration and " + + "only rollback the connector." + ) + private boolean ignoreUnknownConfigFields = false; + public String getFunctionMetadataTopic() { return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index c587a8a7348..03c6eb79218 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -221,6 +221,7 @@ public class FunctionActioner { if (workerConfig.getAdditionalJavaRuntimeArguments() != null) { instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments()); } + instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields()); return instanceConfig; } diff --git a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml index 02314241f28..2fc04034f65 100644 --- a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml @@ -18,4 +18,5 @@ # name: alluxio description: Writes data into Alluxio -sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink \ No newline at end of file +sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink +sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig \ No newline at end of file diff --git a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml index dd6ba976adb..50cad346b3e 100644 --- a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: dynamodb description: DynamoDB connectors sourceClass: org.apache.pulsar.io.dynamodb.DynamoDBSource +sourceConfigClass: org.apache.pulsar.io.dynamodb.DynamoDBSourceConfig diff --git a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml index 1b45638dd4d..4b80907ee86 100644 --- a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: jdbc-clickhouse description: JDBC sink for ClickHouse sinkClass: org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink +sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig diff --git a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml index 463c8cba162..81dd7277e1f 100644 --- a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: jdbc-mariadb description: JDBC sink for MariaDB sinkClass: org.apache.pulsar.io.jdbc.MariadbJdbcAutoSchemaSink +sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig diff --git a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml index f6262df6997..38c120f0893 100644 --- a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: jdbc-openmldb description: JDBC sink for OpenMLDB sinkClass: org.apache.pulsar.io.jdbc.OpenMLDBJdbcAutoSchemaSink +sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig diff --git a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml index a8d192a4568..282ac8a0230 100644 --- a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: jdbc-postgres description: JDBC sink for PostgreSQL sinkClass: org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink +sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig \ No newline at end of file