This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 630948191ce Remove PipelineDataSourceConfiguration.appendJDBCQueryProperties and add JdbcQueryPropertiesExtension (#18047)
630948191ce is described below

commit 630948191cec7f9cb7b233b366b46dbafc73aa19
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun May 29 17:02:45 2022 +0800

    Remove PipelineDataSourceConfiguration.appendJDBCQueryProperties and add JdbcQueryPropertiesExtension (#18047)
---
 .../config/PipelineDataSourceConfiguration.java    |  9 -----
 ...rdingSpherePipelineDataSourceConfiguration.java | 35 ++++++++++------
 .../StandardPipelineDataSourceConfiguration.java   | 21 +++++++---
 .../data/pipeline/api/job/JobOperationType.java    |  2 +-
 .../datasource/JdbcQueryPropertiesExtension.java}  | 19 +++++++--
 .../JdbcQueryPropertiesExtensionFactory.java       | 46 ++++++++++++++++++++++
 ...gSpherePipelineDataSourceConfigurationTest.java | 36 ++++++++---------
 ...tandardPipelineDataSourceConfigurationTest.java | 14 +------
 .../check/consistency/DataConsistencyChecker.java  | 11 ------
 .../datasource/AbstractDataSourcePreparer.java     |  1 +
 .../rulealtered/RuleAlteredJobPreparer.java        |  2 -
 .../scenario/rulealtered/RuleAlteredJobWorker.java | 25 ------------
 .../MySQLJdbcQueryPropertiesExtension.java}        | 29 ++++++++------
 .../pipeline/mysql/importer/MySQLImporter.java     |  5 ---
 .../mysql/ingest/MySQLInventoryDumper.java         |  4 --
 ...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
 .../OpenGaussJdbcQueryPropertiesExtension.java}    | 22 +++++++++--
 ...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
 .../PostgreSQLJdbcQueryPropertiesExtension.java}   | 22 +++++++++--
 ...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
 20 files changed, 225 insertions(+), 132 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
index 4bd03a5e9ff..b852e3fac39 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
@@ -19,8 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.api.datasource.config;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
-import java.util.Properties;
-
 /**
  * Pipeline data source configuration.
  */
@@ -47,13 +45,6 @@ public interface PipelineDataSourceConfiguration {
      */
     Object getDataSourceConfiguration();
     
-    /**
-     * Append JDBC queryProps.
-     *
-     * @param queryProps JDBC query properties
-     */
-    void appendJDBCQueryProperties(Properties queryProps);
-    
     /**
      * Get database type.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index d69a5584944..3f2204a2f00 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -24,6 +24,8 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtensionFactory;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -36,6 +38,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -58,14 +61,16 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
         rootConfig = YamlEngine.unmarshal(parameter, 
YamlRootConfiguration.class, true);
         Map<String, Object> props = 
rootConfig.getDataSources().values().iterator().next();
         databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
+        appendJdbcQueryProperties(databaseType.getType());
     }
     
     public ShardingSpherePipelineDataSourceConfiguration(final 
YamlRootConfiguration rootConfig) {
-        YamlParameterConfiguration parameterConfig = new 
YamlParameterConfiguration(rootConfig.getDatabaseName(), 
rootConfig.getDataSources(), rootConfig.getRules());
+        YamlParameterConfiguration parameterConfig = new 
YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
         this.parameter = YamlEngine.marshal(parameterConfig);
         this.rootConfig = rootConfig;
         Map<String, Object> props = 
rootConfig.getDataSources().values().iterator().next();
         databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
+        appendJdbcQueryProperties(databaseType.getType());
     }
     
     private String getJdbcUrl(final Map<String, Object> props) {
@@ -74,6 +79,22 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
         return result.toString();
     }
     
+    private void appendJdbcQueryProperties(final String databaseType) {
+        Optional<JdbcQueryPropertiesExtension> extension = 
JdbcQueryPropertiesExtensionFactory.getInstance(databaseType);
+        if (!extension.isPresent()) {
+            return;
+        }
+        Properties queryProps = extension.get().extendQueryProperties();
+        if (queryProps.isEmpty()) {
+            return;
+        }
+        rootConfig.getDataSources()
+                .forEach((key, value) -> {
+                    String jdbcUrlKey = value.containsKey("url") ? "url" : 
"jdbcUrl";
+                    value.replace(jdbcUrlKey, new 
JdbcUrlAppender().appendQueryProperties(value.get(jdbcUrlKey).toString(), 
queryProps));
+                });
+    }
+    
     @Override
     public String getType() {
         return TYPE;
@@ -84,21 +105,13 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
         return rootConfig;
     }
     
-    @Override
-    public void appendJDBCQueryProperties(final Properties queryProps) {
-        rootConfig.getDataSources()
-                .forEach((key, value) -> {
-                    String jdbcUrlKey = value.containsKey("url") ? "url" : 
"jdbcUrl";
-                    value.replace(jdbcUrlKey, new 
JdbcUrlAppender().appendQueryProperties(value.get(jdbcUrlKey).toString(), 
queryProps));
-                });
-    }
-    
     /**
      * Get actual data source configuration.
      *
      * @param actualDataSourceName actual data source name
      * @return actual data source configuration
      */
+    // TODO The invocation is disabled for now; it may be used again by an 
upcoming feature
     public StandardPipelineDataSourceConfiguration 
getActualDataSourceConfig(final String actualDataSourceName) {
         Map<String, Object> yamlDataSourceConfig = 
rootConfig.getDataSources().get(actualDataSourceName);
         Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName 
'{}' does not exist", actualDataSourceName);
@@ -114,8 +127,6 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
     @Setter
     private static class YamlParameterConfiguration implements 
YamlConfiguration {
         
-        private String databaseName;
-        
         private Map<String, Map<String, Object>> dataSources = new HashMap<>();
         
         private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 1af802884d4..5c51fe29de7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -21,6 +21,8 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtensionFactory;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -31,6 +33,7 @@ import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -72,6 +75,7 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
         yamlConfig.remove(DATA_SOURCE_CLASS_NAME);
         jdbcConfig = YamlEngine.unmarshal(YamlEngine.marshal(yamlConfig), 
YamlJdbcConfiguration.class, true);
         databaseType = 
DatabaseTypeEngine.getDatabaseType(jdbcConfig.getJdbcUrl());
+        appendJdbcQueryProperties(databaseType.getType());
     }
     
     public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final 
String username, final String password) {
@@ -87,6 +91,18 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
         return result;
     }
     
+    private void appendJdbcQueryProperties(final String databaseType) {
+        Optional<JdbcQueryPropertiesExtension> extension = 
JdbcQueryPropertiesExtensionFactory.getInstance(databaseType);
+        if (!extension.isPresent()) {
+            return;
+        }
+        Properties queryProps = extension.get().extendQueryProperties();
+        if (queryProps.isEmpty()) {
+            return;
+        }
+        jdbcConfig.setJdbcUrl(new 
JdbcUrlAppender().appendQueryProperties(jdbcConfig.getJdbcUrl(), queryProps));
+    }
+    
     @Override
     public String getType() {
         return TYPE;
@@ -97,10 +113,5 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
         return dataSourceProperties;
     }
     
-    @Override
-    public void appendJDBCQueryProperties(final Properties queryProps) {
-        jdbcConfig.setJdbcUrl(new 
JdbcUrlAppender().appendQueryProperties(jdbcConfig.getJdbcUrl(), queryProps));
-    }
-    
     // TODO toShardingSphereJDBCDataSource(final String actualDataSourceName, 
final String logicTableName, final String actualTableName)
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
index bb02d209927..62a8f79bc0c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.data.pipeline.api.job;
  */
 public enum JobOperationType {
     
-    INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+    INSERT, DELETE, UPDATE, SELECT,
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
similarity index 63%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
index bb02d209927..b3301a0b07a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
@@ -15,12 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.spi.datasource;
+
+import org.apache.shardingsphere.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.spi.type.typed.TypedSPI;
+
+import java.util.Properties;
 
 /**
- * Job operation type.
+ * JDBC query properties extension.
  */
-public enum JobOperationType {
+@SingletonSPI
+public interface JdbcQueryPropertiesExtension extends TypedSPI {
     
-    INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+    /**
+     * Extend query properties.
+     *
+     * @return JDBC query properties for extension
+     */
+    Properties extendQueryProperties();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
new file mode 100644
index 00000000000..42fccdd5e6c
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.datasource;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
+
+import java.util.Optional;
+
+/**
+ * JDBC query properties extension factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class JdbcQueryPropertiesExtensionFactory {
+    
+    static {
+        
ShardingSphereServiceLoader.register(JdbcQueryPropertiesExtension.class);
+    }
+    
+    /**
+     * Get instance of JDBC query properties extension.
+     *
+     * @param databaseType database type
+     * @return JDBC query properties extension, if one is registered for the database type
+     */
+    public static Optional<JdbcQueryPropertiesExtension> getInstance(final 
String databaseType) {
+        return 
TypedSPIRegistry.findRegisteredService(JdbcQueryPropertiesExtension.class, 
databaseType);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
index c522bb89e19..2da52c21aa0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
@@ -17,30 +17,32 @@
 
 package org.apache.shardingsphere.data.pipeline.api.datasource.config.impl;
 
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.junit.Test;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.LinkedHashMap;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class ShardingSpherePipelineDataSourceConfigurationTest {
     
     @Test
-    public void assertAppendJDBCParameters() {
-        ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new 
ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
-        Properties queryProps = new Properties();
-        queryProps.setProperty("rewriteBatchedStatements", 
Boolean.TRUE.toString());
-        dataSourceConfig.appendJDBCQueryProperties(queryProps);
-        List<DataSourceProperties> actual = new 
LinkedList<>(getDataSourcePropertiesMap(dataSourceConfig.getRootConfig().getDataSources()).values());
-        assertThat(actual.get(0).getAllLocalProperties().get("jdbcUrl"), 
is("jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
-        assertThat(actual.get(1).getAllLocalProperties().get("jdbcUrl"), 
is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+    public void assertCreate() {
+        ShardingSpherePipelineDataSourceConfiguration actual = new 
ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
+        assertGetConfig(actual);
+    }
+    
+    private void assertGetConfig(final 
ShardingSpherePipelineDataSourceConfiguration actual) {
+        assertThat(actual.getDatabaseType().getType(), is("MySQL"));
+        assertThat(actual.getType(), 
is(ShardingSpherePipelineDataSourceConfiguration.TYPE));
+        assertThat(actual.getDataSourceConfiguration(), 
instanceOf(YamlRootConfiguration.class));
+        Map<String, Map<String, Object>> dataSources = 
actual.getRootConfig().getDataSources();
+        assertThat(dataSources.size(), is(2));
+        assertTrue(dataSources.containsKey("ds_0"));
+        assertTrue(dataSources.containsKey("ds_1"));
     }
     
     private String getDataSourceYaml() {
@@ -52,10 +54,4 @@ public final class 
ShardingSpherePipelineDataSourceConfigurationTest {
                 + "    dataSourceClassName: 
com.zaxxer.hikari.HikariDataSource\n"
                 + "    url: 
jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false\n";
     }
-    
-    private static Map<String, DataSourceProperties> 
getDataSourcePropertiesMap(final Map<String, Map<String, Object>> 
yamlDataSourceConfigs) {
-        Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(yamlDataSourceConfigs.size());
-        yamlDataSourceConfigs.forEach((key, value) -> result.put(key, new 
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(value)));
-        return result;
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
index 56cc562842f..6d3a402b8d2 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
@@ -21,15 +21,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJd
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.junit.Test;
 
-import java.util.Properties;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 public final class StandardPipelineDataSourceConfigurationTest {
     
-    private static final String NEW_JDBC_URL = 
"jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=true";
-    
     private static final String JDBC_URL = 
"jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false";
     
     private static final String USERNAME = "userName";
@@ -42,12 +38,11 @@ public final class 
StandardPipelineDataSourceConfigurationTest {
         assertGetConfig(actual);
         actual = new 
StandardPipelineDataSourceConfiguration(actual.getParameter());
         assertGetConfig(actual);
-        assertAppendJDBCQueryProperties(actual);
     }
     
     private void assertGetConfig(final StandardPipelineDataSourceConfiguration 
actual) {
         assertThat(actual.getDatabaseType().getType(), is("MySQL"));
-        assertThat(actual.getType(), is("JDBC"));
+        assertThat(actual.getType(), 
is(StandardPipelineDataSourceConfiguration.TYPE));
         assertThat(((DataSourceProperties) 
actual.getDataSourceConfiguration()).getDataSourceClassName(), 
is("com.zaxxer.hikari.HikariDataSource"));
         assertGetJdbcConfig(actual.getJdbcConfig());
     }
@@ -57,11 +52,4 @@ public final class 
StandardPipelineDataSourceConfigurationTest {
         assertThat(actual.getUsername(), is(USERNAME));
         assertThat(actual.getPassword(), is(PASSWORD));
     }
-    
-    private void assertAppendJDBCQueryProperties(final 
StandardPipelineDataSourceConfiguration actual) {
-        Properties props = new Properties();
-        props.setProperty("useSSL", Boolean.TRUE.toString());
-        actual.appendJDBCQueryProperties(props);
-        assertThat(actual.getJdbcConfig().getJdbcUrl(), is(NEW_JDBC_URL));
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index a140c2fcb6d..e58a79a9317 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -36,7 +36,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
@@ -56,7 +55,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -205,7 +203,6 @@ public final class DataConsistencyChecker {
     
     private void decoratePipelineDataSourceConfiguration(final 
DataConsistencyCalculateAlgorithm calculator, final 
PipelineDataSourceConfiguration dataSourceConfig) {
         checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), 
dataSourceConfig.getDatabaseType().getType());
-        addMySQLDataSourceConfig(dataSourceConfig);
     }
     
     private void checkDatabaseTypeSupported(final Collection<String> 
supportedDatabaseTypes, final String databaseType) {
@@ -214,14 +211,6 @@ public final class DataConsistencyChecker {
         }
     }
     
-    private void addMySQLDataSourceConfig(final 
PipelineDataSourceConfiguration dataSourceConfig) {
-        if (dataSourceConfig.getDatabaseType().getType().equalsIgnoreCase(new 
MySQLDatabaseType().getType())) {
-            Properties queryProps = new Properties();
-            queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
-            dataSourceConfig.appendJDBCQueryProperties(queryProps);
-        }
-    }
-    
     private ShardingSphereTable getTableMetaData(final String databaseName, 
final String logicTableName) {
         ContextManager contextManager = PipelineContext.getContextManager();
         Preconditions.checkNotNull(contextManager, "ContextManager null");
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 0436ecd08aa..bdc8667cdc9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -100,6 +100,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         return result;
     }
     
+    // TODO The invocation is disabled for now; it may be used again by an 
upcoming feature
     protected final PipelineDataSourceWrapper getSourceCachedDataSource(final 
RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager 
dataSourceManager) {
         return 
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter()));
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 358392f8f64..6455944a0fe 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -75,8 +75,6 @@ public final class RuleAlteredJobPreparer {
      * @param jobContext job context
      */
     public void prepare(final RuleAlteredJobContext jobContext) {
-        // TODO Initialize source and target data source after tasks 
initialization, since dumper and importer constructor might call 
appendJDBCQueryProperties.
-        // But InventoryTaskSplitter need to check target tables. It need to 
do some refactoring for appendJDBCQueryProperties vocations.
         checkSourceDataSource(jobContext);
         if (jobContext.isStopping()) {
             throw new PipelineJobPrepareFailedException("Job stopping, jobId=" 
+ jobContext.getJobId());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index fbccab73eec..d65e651de40 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -42,10 +42,6 @@ import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetect
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -63,7 +59,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -246,32 +241,12 @@ public final class RuleAlteredJobWorker {
         YamlRootConfiguration result = new YamlRootConfiguration();
         result.setDatabaseName(databaseName);
         Map<String, Map<String, Object>> yamlDataSources = 
YamlEngine.unmarshal(dataSources, Map.class);
-        disableSSLForMySQL(yamlDataSources);
         result.setDataSources(yamlDataSources);
         Collection<YamlRuleConfiguration> yamlRuleConfigs = 
YamlEngine.unmarshal(rules, Collection.class, true);
         result.setRules(yamlRuleConfigs);
         return result;
     }
     
-    private void disableSSLForMySQL(final Map<String, Map<String, Object>> 
yamlDataSources) {
-        Map<String, Object> firstDataSourceProps = 
yamlDataSources.entrySet().iterator().next().getValue();
-        String jdbcUrlKey = firstDataSourceProps.containsKey("url") ? "url" : 
"jdbcUrl";
-        String jdbcUrl = (String) firstDataSourceProps.get(jdbcUrlKey);
-        if (null == jdbcUrl) {
-            log.warn("disableSSLForMySQL, could not get jdbcUrl, 
jdbcUrlKey={}", jdbcUrlKey);
-            return;
-        }
-        DatabaseType databaseType = 
DatabaseTypeEngine.getDatabaseType(jdbcUrl);
-        if (!(databaseType instanceof MySQLDatabaseType)) {
-            return;
-        }
-        Properties queryProps = new Properties();
-        queryProps.setProperty("useSSL", Boolean.FALSE.toString());
-        for (Entry<String, Map<String, Object>> entry : 
yamlDataSources.entrySet()) {
-            entry.getValue().put(jdbcUrlKey, new 
JdbcUrlAppender().appendQueryProperties((String) 
entry.getValue().get(jdbcUrlKey), queryProps));
-        }
-    }
-    
     /**
      * Build task configuration.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
similarity index 51%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
index c0b295a962e..8f7401f62bd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
@@ -15,29 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
 
 import java.util.Properties;
 
 /**
- * MySQL importer.
+ * MySQL JDBC query properties extension.
  */
-public final class MySQLImporter extends AbstractImporter {
+public final class MySQLJdbcQueryPropertiesExtension implements 
JdbcQueryPropertiesExtension {
     
-    public MySQLImporter(final ImporterConfiguration importerConfig, final 
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(importerConfig, dataSourceManager, channel);
-        Properties queryProps = new Properties();
+    private final Properties queryProps = new Properties();
+    
+    public MySQLJdbcQueryPropertiesExtension() {
+        queryProps.setProperty("useSSL", Boolean.FALSE.toString());
         queryProps.setProperty("rewriteBatchedStatements", 
Boolean.TRUE.toString());
-        
importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
+        queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
+    }
+    
+    @Override
+    public Properties extendQueryProperties() {
+        return queryProps;
     }
     
     @Override
-    protected String getSchemaName(final String logicTableName) {
-        return null;
+    public String getType() {
+        return "MySQL";
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index c0b295a962e..843c6448057 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -22,8 +22,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 
-import java.util.Properties;
-
 /**
  * MySQL importer.
  */
@@ -31,9 +29,6 @@ public final class MySQLImporter extends AbstractImporter {
     
     public MySQLImporter(final ImporterConfiguration importerConfig, final 
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
         super(importerConfig, dataSourceManager, channel);
-        Properties queryProps = new Properties();
-        queryProps.setProperty("rewriteBatchedStatements", 
Boolean.TRUE.toString());
-        
importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 5a22aec8962..0fbfb35ca87 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -28,7 +28,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Properties;
 
 /**
  * MySQL JDBC Dumper.
@@ -40,9 +39,6 @@ public final class MySQLInventoryDumper extends 
AbstractInventoryDumper {
     public MySQLInventoryDumper(final InventoryDumperConfiguration 
inventoryDumperConfig, final PipelineChannel channel,
                                 final DataSource dataSource, final 
PipelineTableMetaDataLoader metaDataLoader) {
         super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
-        Properties queryProps = new Properties();
-        queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
-        
inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..91f401b95df
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.datasource.MySQLJdbcQueryPropertiesExtension
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
similarity index 58%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
index bb02d209927..c2952349a32 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
@@ -15,12 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.opengauss.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
 
 /**
- * Job operation type.
+ * JDBC query properties extension of openGauss.
  */
-public enum JobOperationType {
+public final class OpenGaussJdbcQueryPropertiesExtension implements 
JdbcQueryPropertiesExtension {
+    
+    private final Properties queryProps = new Properties();
+    
+    @Override
+    public Properties extendQueryProperties() {
+        return queryProps;
+    }
     
-    INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+    @Override
+    public String getType() {
+        return "openGauss";
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..402db058030
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.datasource.OpenGaussJdbcQueryPropertiesExtension
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
similarity index 58%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
index bb02d209927..96e639dde87 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
@@ -15,12 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.postgresql.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
 
 /**
- * Job operation type.
+ * PostgreSQL JDBC query properties extension.
  */
-public enum JobOperationType {
+public final class PostgreSQLJdbcQueryPropertiesExtension implements 
JdbcQueryPropertiesExtension {
+    
+    private final Properties queryProps = new Properties();
+    
+    @Override
+    public Properties extendQueryProperties() {
+        return queryProps;
+    }
     
-    INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+    @Override
+    public String getType() {
+        return "PostgreSQL";
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..4d9c8aeb216
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.postgresql.datasource.PostgreSQLJdbcQueryPropertiesExtension

Reply via email to