This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 630948191ce Remove
PipelineDataSourceConfiguration.appendJDBCQueryProperties and add
JdbcQueryPropertiesExtension (#18047)
630948191ce is described below
commit 630948191cec7f9cb7b233b366b46dbafc73aa19
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun May 29 17:02:45 2022 +0800
Remove PipelineDataSourceConfiguration.appendJDBCQueryProperties and add
JdbcQueryPropertiesExtension (#18047)
---
.../config/PipelineDataSourceConfiguration.java | 9 -----
...rdingSpherePipelineDataSourceConfiguration.java | 35 ++++++++++------
.../StandardPipelineDataSourceConfiguration.java | 21 +++++++---
.../data/pipeline/api/job/JobOperationType.java | 2 +-
.../datasource/JdbcQueryPropertiesExtension.java} | 19 +++++++--
.../JdbcQueryPropertiesExtensionFactory.java | 46 ++++++++++++++++++++++
...gSpherePipelineDataSourceConfigurationTest.java | 36 ++++++++---------
...tandardPipelineDataSourceConfigurationTest.java | 14 +------
.../check/consistency/DataConsistencyChecker.java | 11 ------
.../datasource/AbstractDataSourcePreparer.java | 1 +
.../rulealtered/RuleAlteredJobPreparer.java | 2 -
.../scenario/rulealtered/RuleAlteredJobWorker.java | 25 ------------
.../MySQLJdbcQueryPropertiesExtension.java} | 29 ++++++++------
.../pipeline/mysql/importer/MySQLImporter.java | 5 ---
.../mysql/ingest/MySQLInventoryDumper.java | 4 --
...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
.../OpenGaussJdbcQueryPropertiesExtension.java} | 22 +++++++++--
...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
.../PostgreSQLJdbcQueryPropertiesExtension.java} | 22 +++++++++--
...ine.spi.datasource.JdbcQueryPropertiesExtension | 18 +++++++++
20 files changed, 225 insertions(+), 132 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
index 4bd03a5e9ff..b852e3fac39 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
@@ -19,8 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.api.datasource.config;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import java.util.Properties;
-
/**
* Pipeline data source configuration.
*/
@@ -47,13 +45,6 @@ public interface PipelineDataSourceConfiguration {
*/
Object getDataSourceConfiguration();
- /**
- * Append JDBC queryProps.
- *
- * @param queryProps JDBC query properties
- */
- void appendJDBCQueryProperties(Properties queryProps);
-
/**
* Get database type.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index d69a5584944..3f2204a2f00 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -24,6 +24,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtensionFactory;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -36,6 +38,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -58,14 +61,16 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
rootConfig = YamlEngine.unmarshal(parameter,
YamlRootConfiguration.class, true);
Map<String, Object> props =
rootConfig.getDataSources().values().iterator().next();
databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
+ appendJdbcQueryProperties(databaseType.getType());
}
public ShardingSpherePipelineDataSourceConfiguration(final
YamlRootConfiguration rootConfig) {
- YamlParameterConfiguration parameterConfig = new
YamlParameterConfiguration(rootConfig.getDatabaseName(),
rootConfig.getDataSources(), rootConfig.getRules());
+ YamlParameterConfiguration parameterConfig = new
YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
this.parameter = YamlEngine.marshal(parameterConfig);
this.rootConfig = rootConfig;
Map<String, Object> props =
rootConfig.getDataSources().values().iterator().next();
databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
+ appendJdbcQueryProperties(databaseType.getType());
}
private String getJdbcUrl(final Map<String, Object> props) {
@@ -74,6 +79,22 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
return result.toString();
}
+ private void appendJdbcQueryProperties(final String databaseType) {
+ Optional<JdbcQueryPropertiesExtension> extension =
JdbcQueryPropertiesExtensionFactory.getInstance(databaseType);
+ if (!extension.isPresent()) {
+ return;
+ }
+ Properties queryProps = extension.get().extendQueryProperties();
+ if (queryProps.isEmpty()) {
+ return;
+ }
+ rootConfig.getDataSources()
+ .forEach((key, value) -> {
+ String jdbcUrlKey = value.containsKey("url") ? "url" :
"jdbcUrl";
+ value.replace(jdbcUrlKey, new
JdbcUrlAppender().appendQueryProperties(value.get(jdbcUrlKey).toString(),
queryProps));
+ });
+ }
+
@Override
public String getType() {
return TYPE;
@@ -84,21 +105,13 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
return rootConfig;
}
- @Override
- public void appendJDBCQueryProperties(final Properties queryProps) {
- rootConfig.getDataSources()
- .forEach((key, value) -> {
- String jdbcUrlKey = value.containsKey("url") ? "url" :
"jdbcUrl";
- value.replace(jdbcUrlKey, new
JdbcUrlAppender().appendQueryProperties(value.get(jdbcUrlKey).toString(),
queryProps));
- });
- }
-
/**
* Get actual data source configuration.
*
* @param actualDataSourceName actual data source name
* @return actual data source configuration
*/
+ // TODO the invocation is disabled for now, it might be used again for
next new feature
public StandardPipelineDataSourceConfiguration
getActualDataSourceConfig(final String actualDataSourceName) {
Map<String, Object> yamlDataSourceConfig =
rootConfig.getDataSources().get(actualDataSourceName);
Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName
'{}' does not exist", actualDataSourceName);
@@ -114,8 +127,6 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
@Setter
private static class YamlParameterConfiguration implements
YamlConfiguration {
- private String databaseName;
-
private Map<String, Map<String, Object>> dataSources = new HashMap<>();
private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 1af802884d4..5c51fe29de7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -21,6 +21,8 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtensionFactory;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -31,6 +33,7 @@ import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -72,6 +75,7 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
yamlConfig.remove(DATA_SOURCE_CLASS_NAME);
jdbcConfig = YamlEngine.unmarshal(YamlEngine.marshal(yamlConfig),
YamlJdbcConfiguration.class, true);
databaseType =
DatabaseTypeEngine.getDatabaseType(jdbcConfig.getJdbcUrl());
+ appendJdbcQueryProperties(databaseType.getType());
}
public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final
String username, final String password) {
@@ -87,6 +91,18 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
return result;
}
+ private void appendJdbcQueryProperties(final String databaseType) {
+ Optional<JdbcQueryPropertiesExtension> extension =
JdbcQueryPropertiesExtensionFactory.getInstance(databaseType);
+ if (!extension.isPresent()) {
+ return;
+ }
+ Properties queryProps = extension.get().extendQueryProperties();
+ if (queryProps.isEmpty()) {
+ return;
+ }
+ jdbcConfig.setJdbcUrl(new
JdbcUrlAppender().appendQueryProperties(jdbcConfig.getJdbcUrl(), queryProps));
+ }
+
@Override
public String getType() {
return TYPE;
@@ -97,10 +113,5 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
return dataSourceProperties;
}
- @Override
- public void appendJDBCQueryProperties(final Properties queryProps) {
- jdbcConfig.setJdbcUrl(new
JdbcUrlAppender().appendQueryProperties(jdbcConfig.getJdbcUrl(), queryProps));
- }
-
// TODO toShardingSphereJDBCDataSource(final String actualDataSourceName,
final String logicTableName, final String actualTableName)
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
index bb02d209927..62a8f79bc0c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.data.pipeline.api.job;
*/
public enum JobOperationType {
- INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+ INSERT, DELETE, UPDATE, SELECT,
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
similarity index 63%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
index bb02d209927..b3301a0b07a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
@@ -15,12 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.spi.datasource;
+
+import org.apache.shardingsphere.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.spi.type.typed.TypedSPI;
+
+import java.util.Properties;
/**
- * Job operation type.
+ * JDBC query properties extension.
*/
-public enum JobOperationType {
+@SingletonSPI
+public interface JdbcQueryPropertiesExtension extends TypedSPI {
- INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+ /**
+ * Extend query properties.
+ *
+ * @return JDBC query properties for extension
+ */
+ Properties extendQueryProperties();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
new file mode 100644
index 00000000000..42fccdd5e6c
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtensionFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.datasource;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
+
+import java.util.Optional;
+
+/**
+ * JDBC query properties extension factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class JdbcQueryPropertiesExtensionFactory {
+
+ static {
+
ShardingSphereServiceLoader.register(JdbcQueryPropertiesExtension.class);
+ }
+
+ /**
+ * Get instance of JDBC query properties extension.
+ *
+ * @param databaseType database type
+ * @return got instance
+ */
+ public static Optional<JdbcQueryPropertiesExtension> getInstance(final
String databaseType) {
+ return
TypedSPIRegistry.findRegisteredService(JdbcQueryPropertiesExtension.class,
databaseType);
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
index c522bb89e19..2da52c21aa0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
@@ -17,30 +17,32 @@
package org.apache.shardingsphere.data.pipeline.api.datasource.config.impl;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.junit.Test;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.LinkedHashMap;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class ShardingSpherePipelineDataSourceConfigurationTest {
@Test
- public void assertAppendJDBCParameters() {
- ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new
ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
- Properties queryProps = new Properties();
- queryProps.setProperty("rewriteBatchedStatements",
Boolean.TRUE.toString());
- dataSourceConfig.appendJDBCQueryProperties(queryProps);
- List<DataSourceProperties> actual = new
LinkedList<>(getDataSourcePropertiesMap(dataSourceConfig.getRootConfig().getDataSources()).values());
- assertThat(actual.get(0).getAllLocalProperties().get("jdbcUrl"),
is("jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
- assertThat(actual.get(1).getAllLocalProperties().get("jdbcUrl"),
is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+ public void assertCreate() {
+ ShardingSpherePipelineDataSourceConfiguration actual = new
ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
+ assertGetConfig(actual);
+ }
+
+ private void assertGetConfig(final
ShardingSpherePipelineDataSourceConfiguration actual) {
+ assertThat(actual.getDatabaseType().getType(), is("MySQL"));
+ assertThat(actual.getType(),
is(ShardingSpherePipelineDataSourceConfiguration.TYPE));
+ assertThat(actual.getDataSourceConfiguration(),
instanceOf(YamlRootConfiguration.class));
+ Map<String, Map<String, Object>> dataSources =
actual.getRootConfig().getDataSources();
+ assertThat(dataSources.size(), is(2));
+ assertTrue(dataSources.containsKey("ds_0"));
+ assertTrue(dataSources.containsKey("ds_1"));
}
private String getDataSourceYaml() {
@@ -52,10 +54,4 @@ public final class
ShardingSpherePipelineDataSourceConfigurationTest {
+ " dataSourceClassName:
com.zaxxer.hikari.HikariDataSource\n"
+ " url:
jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false\n";
}
-
- private static Map<String, DataSourceProperties>
getDataSourcePropertiesMap(final Map<String, Map<String, Object>>
yamlDataSourceConfigs) {
- Map<String, DataSourceProperties> result = new
LinkedHashMap<>(yamlDataSourceConfigs.size());
- yamlDataSourceConfigs.forEach((key, value) -> result.put(key, new
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(value)));
- return result;
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
index 56cc562842f..6d3a402b8d2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
@@ -21,15 +21,11 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJd
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.junit.Test;
-import java.util.Properties;
-
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class StandardPipelineDataSourceConfigurationTest {
- private static final String NEW_JDBC_URL =
"jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=true";
-
private static final String JDBC_URL =
"jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false";
private static final String USERNAME = "userName";
@@ -42,12 +38,11 @@ public final class
StandardPipelineDataSourceConfigurationTest {
assertGetConfig(actual);
actual = new
StandardPipelineDataSourceConfiguration(actual.getParameter());
assertGetConfig(actual);
- assertAppendJDBCQueryProperties(actual);
}
private void assertGetConfig(final StandardPipelineDataSourceConfiguration
actual) {
assertThat(actual.getDatabaseType().getType(), is("MySQL"));
- assertThat(actual.getType(), is("JDBC"));
+ assertThat(actual.getType(),
is(StandardPipelineDataSourceConfiguration.TYPE));
assertThat(((DataSourceProperties)
actual.getDataSourceConfiguration()).getDataSourceClassName(),
is("com.zaxxer.hikari.HikariDataSource"));
assertGetJdbcConfig(actual.getJdbcConfig());
}
@@ -57,11 +52,4 @@ public final class
StandardPipelineDataSourceConfigurationTest {
assertThat(actual.getUsername(), is(USERNAME));
assertThat(actual.getPassword(), is(PASSWORD));
}
-
- private void assertAppendJDBCQueryProperties(final
StandardPipelineDataSourceConfiguration actual) {
- Properties props = new Properties();
- props.setProperty("useSSL", Boolean.TRUE.toString());
- actual.appendJDBCQueryProperties(props);
- assertThat(actual.getJdbcConfig().getJdbcUrl(), is(NEW_JDBC_URL));
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index a140c2fcb6d..e58a79a9317 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
@@ -56,7 +55,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -205,7 +203,6 @@ public final class DataConsistencyChecker {
private void decoratePipelineDataSourceConfiguration(final
DataConsistencyCalculateAlgorithm calculator, final
PipelineDataSourceConfiguration dataSourceConfig) {
checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(),
dataSourceConfig.getDatabaseType().getType());
- addMySQLDataSourceConfig(dataSourceConfig);
}
private void checkDatabaseTypeSupported(final Collection<String>
supportedDatabaseTypes, final String databaseType) {
@@ -214,14 +211,6 @@ public final class DataConsistencyChecker {
}
}
- private void addMySQLDataSourceConfig(final
PipelineDataSourceConfiguration dataSourceConfig) {
- if (dataSourceConfig.getDatabaseType().getType().equalsIgnoreCase(new
MySQLDatabaseType().getType())) {
- Properties queryProps = new Properties();
- queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
- dataSourceConfig.appendJDBCQueryProperties(queryProps);
- }
- }
-
private ShardingSphereTable getTableMetaData(final String databaseName,
final String logicTableName) {
ContextManager contextManager = PipelineContext.getContextManager();
Preconditions.checkNotNull(contextManager, "ContextManager null");
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 0436ecd08aa..bdc8667cdc9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -100,6 +100,7 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
return result;
}
+ // TODO the invocation is disabled for now, it might be used again for
next new feature
protected final PipelineDataSourceWrapper getSourceCachedDataSource(final
RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager
dataSourceManager) {
return
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter()));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 358392f8f64..6455944a0fe 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -75,8 +75,6 @@ public final class RuleAlteredJobPreparer {
* @param jobContext job context
*/
public void prepare(final RuleAlteredJobContext jobContext) {
- // TODO Initialize source and target data source after tasks
initialization, since dumper and importer constructor might call
appendJDBCQueryProperties.
- // But InventoryTaskSplitter need to check target tables. It need to
do some refactoring for appendJDBCQueryProperties vocations.
checkSourceDataSource(jobContext);
if (jobContext.isStopping()) {
throw new PipelineJobPrepareFailedException("Job stopping, jobId="
+ jobContext.getJobId());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index fbccab73eec..d65e651de40 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -42,10 +42,6 @@ import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetect
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -63,7 +59,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -246,32 +241,12 @@ public final class RuleAlteredJobWorker {
YamlRootConfiguration result = new YamlRootConfiguration();
result.setDatabaseName(databaseName);
Map<String, Map<String, Object>> yamlDataSources =
YamlEngine.unmarshal(dataSources, Map.class);
- disableSSLForMySQL(yamlDataSources);
result.setDataSources(yamlDataSources);
Collection<YamlRuleConfiguration> yamlRuleConfigs =
YamlEngine.unmarshal(rules, Collection.class, true);
result.setRules(yamlRuleConfigs);
return result;
}
- private void disableSSLForMySQL(final Map<String, Map<String, Object>>
yamlDataSources) {
- Map<String, Object> firstDataSourceProps =
yamlDataSources.entrySet().iterator().next().getValue();
- String jdbcUrlKey = firstDataSourceProps.containsKey("url") ? "url" :
"jdbcUrl";
- String jdbcUrl = (String) firstDataSourceProps.get(jdbcUrlKey);
- if (null == jdbcUrl) {
- log.warn("disableSSLForMySQL, could not get jdbcUrl,
jdbcUrlKey={}", jdbcUrlKey);
- return;
- }
- DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(jdbcUrl);
- if (!(databaseType instanceof MySQLDatabaseType)) {
- return;
- }
- Properties queryProps = new Properties();
- queryProps.setProperty("useSSL", Boolean.FALSE.toString());
- for (Entry<String, Map<String, Object>> entry :
yamlDataSources.entrySet()) {
- entry.getValue().put(jdbcUrlKey, new
JdbcUrlAppender().appendQueryProperties((String)
entry.getValue().get(jdbcUrlKey), queryProps));
- }
- }
-
/**
* Build task configuration.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
similarity index 51%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
index c0b295a962e..8f7401f62bd 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
@@ -15,29 +15,32 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
import java.util.Properties;
/**
- * MySQL importer.
+ * MySQL JDBC query properties extension.
*/
-public final class MySQLImporter extends AbstractImporter {
+public final class MySQLJdbcQueryPropertiesExtension implements
JdbcQueryPropertiesExtension {
- public MySQLImporter(final ImporterConfiguration importerConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(importerConfig, dataSourceManager, channel);
- Properties queryProps = new Properties();
+ private final Properties queryProps = new Properties();
+
+ public MySQLJdbcQueryPropertiesExtension() {
+ queryProps.setProperty("useSSL", Boolean.FALSE.toString());
queryProps.setProperty("rewriteBatchedStatements",
Boolean.TRUE.toString());
-
importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
+ queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
+ }
+
+ @Override
+ public Properties extendQueryProperties() {
+ return queryProps;
}
@Override
- protected String getSchemaName(final String logicTableName) {
- return null;
+ public String getType() {
+ return "MySQL";
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index c0b295a962e..843c6448057 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -22,8 +22,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import java.util.Properties;
-
/**
* MySQL importer.
*/
@@ -31,9 +29,6 @@ public final class MySQLImporter extends AbstractImporter {
public MySQLImporter(final ImporterConfiguration importerConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
super(importerConfig, dataSourceManager, channel);
- Properties queryProps = new Properties();
- queryProps.setProperty("rewriteBatchedStatements",
Boolean.TRUE.toString());
-
importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 5a22aec8962..0fbfb35ca87 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -28,7 +28,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
-import java.util.Properties;
/**
* MySQL JDBC Dumper.
@@ -40,9 +39,6 @@ public final class MySQLInventoryDumper extends
AbstractInventoryDumper {
public MySQLInventoryDumper(final InventoryDumperConfiguration
inventoryDumperConfig, final PipelineChannel channel,
final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader) {
super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
- Properties queryProps = new Properties();
- queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
-
inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..91f401b95df
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.datasource.MySQLJdbcQueryPropertiesExtension
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
similarity index 58%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
index bb02d209927..c2952349a32 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
@@ -15,12 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.opengauss.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
/**
- * Job operation type.
+ * JDBC query properties extension of openGauss.
*/
-public enum JobOperationType {
+public final class OpenGaussJdbcQueryPropertiesExtension implements
JdbcQueryPropertiesExtension {
+
+ private final Properties queryProps = new Properties();
+
+ @Override
+ public Properties extendQueryProperties() {
+ return queryProps;
+ }
- INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+ @Override
+ public String getType() {
+ return "openGauss";
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..402db058030
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.datasource.OpenGaussJdbcQueryPropertiesExtension
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
similarity index 58%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
index bb02d209927..96e639dde87 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/datasource/PostgreSQLJdbcQueryPropertiesExtension.java
@@ -15,12 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.postgresql.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
/**
- * Job operation type.
+ * PostgreSQL JDBC query properties extension.
*/
-public enum JobOperationType {
+public final class PostgreSQLJdbcQueryPropertiesExtension implements
JdbcQueryPropertiesExtension {
+
+ private final Properties queryProps = new Properties();
+
+ @Override
+ public Properties extendQueryProperties() {
+ return queryProps;
+ }
- INSERT, DELETE, UPDATE, SELECT, SYSTEM_LOAD, CPU_USAGE
+ @Override
+ public String getType() {
+ return "PostgreSQL";
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
new file mode 100644
index 00000000000..4d9c8aeb216
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.postgresql.datasource.PostgreSQLJdbcQueryPropertiesExtension