This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f7c0b3c49c9 [improve][fn] Allow unknown fields in connectors config 
(#20116)
f7c0b3c49c9 is described below

commit f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04
Author: Nicolò Boschi <boschi1...@gmail.com>
AuthorDate: Wed Apr 26 17:36:56 2023 +0200

    [improve][fn] Allow unknown fields in connectors config (#20116)
---
 conf/functions_worker.yml                          |   6 ++
 .../pulsar/functions/instance/InstanceConfig.java  |   1 +
 .../functions/instance/JavaInstanceRunnable.java   |  97 +++++++++++++++++--
 .../instance/JavaInstanceRunnableTest.java         | 104 ++++++++++++++++++---
 .../functions/runtime/JavaInstanceStarter.java     |   7 ++
 .../pulsar/functions/runtime/RuntimeUtils.java     |   6 +-
 .../pulsar/functions/worker/WorkerConfig.java      |  11 +++
 .../pulsar/functions/worker/FunctionActioner.java  |   1 +
 .../resources/META-INF/services/pulsar-io.yaml     |   3 +-
 .../resources/META-INF/services/pulsar-io.yaml     |   1 +
 .../resources/META-INF/services/pulsar-io.yaml     |   1 +
 .../resources/META-INF/services/pulsar-io.yaml     |   1 +
 .../resources/META-INF/services/pulsar-io.yaml     |   1 +
 .../resources/META-INF/services/pulsar-io.yaml     |   1 +
 14 files changed, 217 insertions(+), 24 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index bb15e0ca416..4c5b6aab1b7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -407,6 +407,12 @@ validateConnectorConfig: false
 # If it is set to true, you must ensure that it has been initialized by 
"bin/pulsar initialize-cluster-metadata" command.
 initializedDlogMetadata: false
 
+# Whether to ignore unknown properties when deserializing the connector configuration.
+# After upgrading a connector to a new version with a new configuration, the new configuration may not be compatible with the old connector.
+# In case of a rollback, the connector configuration must be rolled back as well.
+# Ignoring unknown fields makes it possible to keep the new configuration and roll back only the connector.
+ignoreUnknownConfigFields: false
+
 ###########################
 # Arbitrary Configuration
 ###########################
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 1a89505d9bb..fcee6d734d6 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -48,6 +48,7 @@ public class InstanceConfig {
     private boolean exposePulsarAdminClientEnabled = false;
     private int metricsPort;
     private List<String> additionalJavaRuntimeArguments = 
Collections.emptyList();
+    private boolean ignoreUnknownConfigFields;
 
     /**
      * Get the string representation of {@link #getInstanceId()}.
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e2ad9e4c989..c3f36f754da 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,13 +19,20 @@
 package org.apache.pulsar.functions.instance;
 
 import static 
org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.BeanDeserializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
@@ -34,6 +41,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
@@ -59,6 +67,7 @@ import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -94,6 +103,7 @@ import 
org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
 import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
 import org.apache.pulsar.functions.utils.CryptoUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
@@ -855,10 +865,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             if (sourceSpec.getConfigs().isEmpty()) {
                 this.source.open(new HashMap<>(), contextImpl);
             } else {
-                this.source.open(
-                        ObjectMapperFactory.getMapper().reader().forType(new 
TypeReference<Map<String, Object>>() {
-                        }).readValue(sourceSpec.getConfigs())
-                        , contextImpl);
+                
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
             }
             if (this.source instanceof PulsarSource) {
                 contextImpl.setInputConsumers(((PulsarSource) 
this.source).getInputConsumers());
@@ -870,6 +877,83 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+    /**
+     * Parses the given connector configuration by delegating to the static overload with this
+     * instance's {@code instanceConfig}, {@code componentClassLoader} and {@code componentType}.
+     *
+     * @param connectorConfigs connector configuration string, forwarded unchanged
+     * @return the configuration map returned by the static overload
+     * @throws IOException propagated from the static overload
+     */
+    private Map<String, Object> parseComponentConfig(String connectorConfigs) 
throws IOException {
+        return parseComponentConfig(connectorConfigs, instanceConfig, 
componentClassLoader, componentType);
+    }
+
+    /**
+     * Parses the JSON connector configuration into a map and, when enabled, drops the entries
+     * that the connector's declared config class does not define.
+     *
+     * <p>Filtering only happens when {@code instanceConfig.isIgnoreUnknownConfigFields()} is true,
+     * {@code componentClassLoader} is a {@link NarClassLoader}, the component is a SOURCE or a SINK,
+     * and the connector definition declares a config class for that component type. In every other
+     * case the parsed map is returned as is.
+     *
+     * @param connectorConfigs JSON object holding the connector configuration
+     * @param instanceConfig instance settings; only {@code ignoreUnknownConfigFields} is read here
+     * @param componentClassLoader class loader of the component, used to look up the connector definition
+     * @param componentType type of the component the configuration belongs to
+     * @return the parsed configuration, without unknown fields when filtering applies
+     * @throws IOException if reading the configuration fails
+     * @throws RuntimeException if the declared config class cannot be loaded
+     */
+    static Map<String, Object> parseComponentConfig(String connectorConfigs,
+                                                    InstanceConfig instanceConfig,
+                                                    ClassLoader componentClassLoader,
+                                                    org.apache.pulsar.functions.proto.Function
+                                                            .FunctionDetails.ComponentType componentType)
+            throws IOException {
+        final Map<String, Object> config = ObjectMapperFactory
+                .getMapper()
+                .reader()
+                .forType(new TypeReference<Map<String, Object>>() {})
+                .readValue(connectorConfigs);
+        if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
+            final String configClassName;
+            if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                configClassName = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
+            } else if (componentType
+                    == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
+                configClassName = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
+            } else {
+                // Neither a source nor a sink: there is no config class to validate against.
+                return config;
+            }
+            if (configClassName != null) {
+                final Class<?> configClass;
+                try {
+                    configClass = Class.forName(configClassName,
+                            true, Thread.currentThread().getContextClassLoader());
+                } catch (ClassNotFoundException e) {
+                    throw new RuntimeException("Config class not found: " + configClassName, e);
+                }
+                final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass);
+
+                // Remove through the key-set view. Calling config.remove(..) while iterating
+                // config.keySet() throws ConcurrentModificationException as soon as the removed
+                // key is followed by another key.
+                config.keySet().removeIf(field -> {
+                    if (allFields.contains(field)) {
+                        return false;
+                    }
+                    log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored",
+                            field,
+                            componentType,
+                            configClass);
+                    return true;
+                });
+            }
+        }
+        return config;
+    }
+
+    /**
+     * Lists the property names of the Jackson deserializer resolved for a given class.
+     */
+    static final class BeanPropertiesReader {
+
+        // Single shared instance used by the static entry point below.
+        private static final MapperBeanReader reader = new MapperBeanReader();
+
+        // NOTE(review): extends ObjectMapper presumably to reach the non-public helpers used below
+        // (createDeserializationContext, _findRootDeserializer, _typeFactory) - confirm.
+        private static final class MapperBeanReader extends ObjectMapper {
+            /**
+             * Returns the names of the properties exposed by the deserializer found for
+             * {@code valueType}.
+             *
+             * <p>The resolved deserializer is cast to {@link BeanDeserializer}; a type resolved to
+             * any other deserializer makes the cast fail. Checked exceptions are rethrown by
+             * {@code @SneakyThrows}.
+             */
+            @SneakyThrows
+            List<String> getBeanProperties(Class<?> valueType) {
+                // The parser over an empty string is only handed to the deserialization context;
+                // nothing is read from it in this method.
+                final JsonParser parser = ObjectMapperFactory
+                        .getMapper()
+                        .getObjectMapper()
+                        .createParser("");
+                DeserializationConfig config = getDeserializationConfig();
+                DeserializationContext ctxt = 
createDeserializationContext(parser, config);
+                BeanDeserializer deser = (BeanDeserializer)
+                        _findRootDeserializer(ctxt, 
_typeFactory.constructType(valueType));
+                List<String> list = new ArrayList<>();
+                deser.properties().forEachRemaining(p -> 
list.add(p.getName()));
+                return list;
+            }
+        }
+
+        /**
+         * Returns the property names reported for {@code valueType} by the shared reader.
+         */
+        static List<String> getBeanProperties(Class<?> valueType) {
+            return reader.getBeanProperties(valueType);
+        }
+    }
+
 
     private void setupOutput(ContextImpl contextImpl) throws Exception {
 
@@ -940,9 +1024,8 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     log.debug("Opening Sink with SinkSpec {} and contextImpl: 
{} ", sinkSpec,
                             contextImpl.toString());
                 }
-                
this.sink.open(ObjectMapperFactory.getMapper().reader().forType(
-                        new TypeReference<Map<String, Object>>() {
-                        }).readValue(sinkSpec.getConfigs()), contextImpl);
+                final Map<String, Object> config = 
parseComponentConfig(sinkSpec.getConfigs());
+                this.sink.open(config, contextImpl);
             }
         } catch (Exception e) {
             log.error("Sink open produced uncaught exception: ", e);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index a36a3ca62d1..5fea8bcc9fd 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -18,19 +18,27 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
@@ -38,11 +46,10 @@ import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder;
-import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.jetbrains.annotations.NotNull;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class JavaInstanceRunnableTest {
@@ -159,7 +166,7 @@ public class JavaInstanceRunnableTest {
 
     @NotNull
     private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
-            org.apache.pulsar.functions.proto.Function.ProcessingGuarantees 
processingGuarantees) throws Exception {
+                                                         
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees 
processingGuarantees) throws Exception {
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setAutoAck(autoAck)
                 .setProcessingGuarantees(processingGuarantees).build();
@@ -184,23 +191,90 @@ public class JavaInstanceRunnableTest {
 
     @Test
     public void testSinkConfigParsingPreservesOriginalType() throws Exception {
-        SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
-        when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 
9223372036854775807}");
-        Map<String, Object> parsedConfig =
-                new ObjectMapper().readValue(sinkSpec.getConfigs(), new 
TypeReference<Map<String, Object>>() {
-                });
+        final Map<String, Object> parsedConfig = 
JavaInstanceRunnable.parseComponentConfig(
+                "{\"ttl\": 9223372036854775807}",
+                new InstanceConfig(),
+                null,
+                FunctionDetails.ComponentType.SINK
+        );
         Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
         Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
     }
 
     @Test
     public void testSourceConfigParsingPreservesOriginalType() throws 
Exception {
-        SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
-        when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 
9223372036854775807}");
-        Map<String, Object> parsedConfig =
-                new ObjectMapper().readValue(sourceSpec.getConfigs(), new 
TypeReference<Map<String, Object>>() {
-                });
+        final Map<String, Object> parsedConfig = 
JavaInstanceRunnable.parseComponentConfig(
+                "{\"ttl\": 9223372036854775807}",
+                new InstanceConfig(),
+                null,
+                FunctionDetails.ComponentType.SOURCE
+        );
         Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
         Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
     }
+
+
+    /**
+     * Config fixture with a single public field; registered as the sink/source config class in
+     * {@code testSinkConfigIgnoreUnknownFields}.
+     */
+    public static class ConnectorTestConfig1 {
+        public String field1;
+    }
+
+    /**
+     * Supplies every combination of the ignore-unknown-fields flag (first column) and the
+     * SINK/SOURCE component type (second column).
+     */
+    @DataProvider(name = "configIgnoreUnknownFields")
+    public static Object[][] configIgnoreUnknownFields() {
+        return new Object[][]{
+                {false, FunctionDetails.ComponentType.SINK},
+                {true, FunctionDetails.ComponentType.SINK},
+                {false, FunctionDetails.ComponentType.SOURCE},
+                {true, FunctionDetails.ComponentType.SOURCE}
+        };
+    }
+
+    /**
+     * Checks that {@code parseComponentConfig} drops the key not declared by
+     * {@link ConnectorTestConfig1} only when {@code ignoreUnknownConfigFields} is enabled.
+     * Despite its name, the data provider runs this test for both SINK and SOURCE components.
+     *
+     * <p>NOTE(review): the unknown key ("field2") is the last entry of the JSON object, so this
+     * test does not cover an unknown key that is followed by other keys - consider adding one.
+     */
+    @Test(dataProvider = "configIgnoreUnknownFields")
+    public void testSinkConfigIgnoreUnknownFields(boolean 
ignoreUnknownConfigFields,
+                                                  
FunctionDetails.ComponentType type) throws Exception {
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        final ConnectorDefinition connectorDefinition = new 
ConnectorDefinition();
+        // Declare ConnectorTestConfig1 as the config class for the component type under test.
+        if (type == FunctionDetails.ComponentType.SINK) {
+            
connectorDefinition.setSinkConfigClass(ConnectorTestConfig1.class.getName());
+        } else {
+            
connectorDefinition.setSourceConfigClass(ConnectorTestConfig1.class.getName());
+        }
+        // The mocked class loader returns the serialized definition for any service lookup.
+        
when(narClassLoader.getServiceDefinition(any())).thenReturn(ObjectMapperFactory
+                .getMapper().writer().writeValueAsString(connectorDefinition));
+        final InstanceConfig instanceConfig = new InstanceConfig();
+        instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
+
+        final Map<String, Object> parsedConfig = 
JavaInstanceRunnable.parseComponentConfig(
+                "{\"field1\": \"value\", \"field2\": \"value2\"}",
+                instanceConfig,
+                narClassLoader,
+                type
+        );
+        if (ignoreUnknownConfigFields) {
+            Assert.assertEquals(parsedConfig.size(), 1);
+            Assert.assertEquals(parsedConfig.get("field1"), "value");
+        } else {
+            Assert.assertEquals(parsedConfig.size(), 2);
+            Assert.assertEquals(parsedConfig.get("field1"), "value");
+            Assert.assertEquals(parsedConfig.get("field2"), "value2");
+        }
+    }
+
+    /**
+     * Config fixture mixing a static field, a public field, a private field with a getter and a
+     * {@code @JsonIgnore} private field; {@code testBeanPropertiesReader} expects only
+     * {@code field1} and {@code withGetter} to be reported for it.
+     */
+    public static class ConnectorTestConfig2 {
+        public static int constantField = 1;
+        public String field1;
+        private long withGetter;
+        @JsonIgnore
+        private ConnectorTestConfig1 ignore;
+
+        public long getWithGetter() {
+            return withGetter;
+        }
+    }
+
+    /**
+     * Checks that {@code BeanPropertiesReader} reports exactly {@code field1} and
+     * {@code withGetter} for {@link ConnectorTestConfig2}; the comparison ignores ordering and
+     * duplicates by going through sorted sets.
+     */
+    @Test
+    public void testBeanPropertiesReader() throws Exception {
+        final List<String> beanProperties = 
JavaInstanceRunnable.BeanPropertiesReader
+                .getBeanProperties(ConnectorTestConfig2.class);
+        Assert.assertEquals(new TreeSet<>(beanProperties), new 
TreeSet<>(Arrays.asList("field1", "withGetter")));
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index deff690815d..c4f44be3df3 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable {
             + "exposed to function context, default is disabled.", required = 
false)
     public Boolean exposePulsarAdminClientEnabled = false;
 
+    @Parameter(names = "--ignore_unknown_config_fields",
+            description = "Whether to ignore unknown properties when 
deserializing the connector configuration.",
+            required = false)
+    public Boolean ignoreUnknownConfigFields = false;
+
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -177,6 +183,7 @@ public class JavaInstanceStarter implements AutoCloseable {
         instanceConfig.setClusterName(clusterName);
         instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
         
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
+        instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
         Function.FunctionDetails.Builder functionDetailsBuilder = 
Function.FunctionDetails.newBuilder();
         if (functionDetailsJsonString.charAt(0) == '\'') {
             functionDetailsJsonString = functionDetailsJsonString.substring(1);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 53ebfcbfaf0..5392697e928 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -435,10 +435,14 @@ public class RuntimeUtils {
         args.add("--metrics_port");
         args.add(String.valueOf(instanceConfig.getMetricsPort()));
 
-        // only the Java instance supports --pending_async_requests right now.
+        // params supported only by the Java instance runtime.
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("--pending_async_requests");
             
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
+
+            if (instanceConfig.isIgnoreUnknownConfigFields()) {
+                args.add("--ignore_unknown_config_fields");
+            }
         }
 
         // state storage configs
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 3b8ddf774d1..0ed73953d7a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -738,6 +738,17 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     )
     private List<String> additionalJavaRuntimeArguments = new ArrayList<>();
 
+    @FieldContext(
+            category = CATEGORY_CONNECTORS,
+            doc = "Whether to ignore unknown properties when deserializing the 
connector configuration. "
+                    + "After upgrading a connector to a new version with a new 
configuration, "
+                    + "the new configuration may not be compatible with the 
old connector. "
+                    + "In case of rollback, it's required to also rollback the 
connector configuration. "
+                    + "Ignoring unknown fields makes possible to keep the new 
configuration and "
+                    + "only rollback the connector."
+    )
+    private boolean ignoreUnknownConfigFields = false;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, 
functionMetadataTopicName);
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index c587a8a7348..03c6eb79218 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -221,6 +221,7 @@ public class FunctionActioner {
         if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
             
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
         }
+        
instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields());
         return instanceConfig;
     }
 
diff --git 
a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
index 02314241f28..2fc04034f65 100644
--- a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -18,4 +18,5 @@
 #
 name: alluxio
 description: Writes data into Alluxio
-sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
+sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig
\ No newline at end of file
diff --git 
a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
index dd6ba976adb..50cad346b3e 100644
--- a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: dynamodb
 description: DynamoDB connectors
 sourceClass: org.apache.pulsar.io.dynamodb.DynamoDBSource
+sourceConfigClass: org.apache.pulsar.io.dynamodb.DynamoDBSourceConfig
diff --git 
a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
index 1b45638dd4d..4b80907ee86 100644
--- 
a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: jdbc-clickhouse
 description: JDBC sink for ClickHouse
 sinkClass: org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git 
a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
index 463c8cba162..81dd7277e1f 100644
--- a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: jdbc-mariadb
 description: JDBC sink for MariaDB
 sinkClass: org.apache.pulsar.io.jdbc.MariadbJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git 
a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
index f6262df6997..38c120f0893 100644
--- 
a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: jdbc-openmldb
 description: JDBC sink for OpenMLDB
 sinkClass: org.apache.pulsar.io.jdbc.OpenMLDBJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git 
a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
index a8d192a4568..282ac8a0230 100644
--- 
a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: jdbc-postgres
 description: JDBC sink for PostgreSQL
 sinkClass: org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
\ No newline at end of file

Reply via email to