This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9cc0210d55e Refactor DataSourcePreparer and DataSourcePrepareOption
(#29423)
9cc0210d55e is described below
commit 9cc0210d55e4cbd6e4c581c8dc23f0d413889302
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 17 00:10:36 2023 +0800
Refactor DataSourcePreparer and DataSourcePrepareOption (#29423)
* Refactor AbstractDataSourcePreparer
* Refactor DataSourcePrepareEngine
* Refactor DataSourcePreparer and DataSourcePrepareOption
---
.../core/preparer/PipelineJobPreparerUtils.java | 23 +++--
.../datasource/AbstractDataSourcePreparer.java | 112 --------------------
...ePreparer.java => DataSourcePrepareOption.java} | 34 +++----
.../preparer/datasource/DataSourcePreparer.java | 113 ++++++++++++++++++---
.../datasource/AbstractDataSourcePreparerTest.java | 77 --------------
.../datasource/MySQLDataSourcePrepareOption.java | 31 ++++++
.../datasource/MySQLDataSourcePreparer.java | 48 ---------
...re.preparer.datasource.DataSourcePrepareOption} | 2 +-
.../OpenGaussDataSourcePrepareOption.java | 46 +++++++++
.../datasource/OpenGaussDataSourcePreparer.java | 74 --------------
...re.preparer.datasource.DataSourcePrepareOption} | 2 +-
.../PostgreSQLDataSourcePrepareOption.java | 31 ++++++
.../datasource/PostgreSQLDataSourcePreparer.java | 51 ----------
...re.preparer.datasource.DataSourcePrepareOption} | 2 +-
.../core/fixture/H2DataSourcePrepareOption.java | 31 ++++++
.../core/fixture/H2DataSourcePreparer.java | 48 ---------
...re.preparer.datasource.DataSourcePrepareOption} | 2 +-
17 files changed, 268 insertions(+), 459 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index d17716d8517..ebb18af17eb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -20,17 +20,18 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
@@ -75,12 +76,12 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException if prepare target schema fail
*/
public static void prepareTargetSchema(final DatabaseType databaseType,
final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws
SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer =
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
- if (!dataSourcePreparer.isPresent()) {
- log.info("dataSourcePreparer null, ignore prepare target");
+ Optional<DataSourcePrepareOption> option =
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType);
+ if (!option.isPresent()) {
+ log.info("Data source preparer option null, ignore prepare
target");
return;
}
-
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParam);
+ new
DataSourcePreparer(option.get()).prepareTargetSchemas(prepareTargetSchemasParam);
}
/**
@@ -104,13 +105,13 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException SQL exception
*/
public static void prepareTargetTables(final DatabaseType databaseType,
final PrepareTargetTablesParameter prepareTargetTablesParam) throws
SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer =
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
- if (!dataSourcePreparer.isPresent()) {
- log.info("dataSourcePreparer null, ignore prepare target");
+ Optional<DataSourcePrepareOption> option =
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType);
+ if (!option.isPresent()) {
+ log.info("Data source preparer option null, ignore prepare
target");
return;
}
long startTimeMillis = System.currentTimeMillis();
- dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParam);
+ new
DataSourcePreparer(option.get()).prepareTargetTables(prepareTargetTablesParam);
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
deleted file mode 100644
index 9988ce6e3ca..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
-import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.regex.Pattern;
-
-/**
- * Abstract data source preparer.
- */
-@Slf4j
-public abstract class AbstractDataSourcePreparer implements DataSourcePreparer
{
-
- private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS =
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+",
Pattern.CASE_INSENSITIVE);
-
- private static final Pattern PATTERN_CREATE_TABLE =
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
-
- @Override
- public final void prepareTargetSchemas(final PrepareTargetSchemasParameter
param) throws SQLException {
- DatabaseType targetDatabaseType = param.getTargetDatabaseType();
- DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
- if (!dialectDatabaseMetaData.isSchemaAvailable()) {
- return;
- }
- String defaultSchema =
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(targetDatabaseType);
- Collection<String> createdSchemaNames = new HashSet<>();
- for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- String targetSchemaName =
each.getTargetName().getSchemaName().toString();
- if (null == targetSchemaName ||
targetSchemaName.equalsIgnoreCase(defaultSchema) ||
createdSchemaNames.contains(targetSchemaName)) {
- continue;
- }
- Optional<String> sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
- if (sql.isPresent()) {
- executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
- createdSchemaNames.add(targetSchemaName);
- }
- }
- }
-
- private void executeCreateSchema(final PipelineDataSourceManager
dataSourceManager, final PipelineDataSourceConfiguration
targetDataSourceConfig, final String sql) throws SQLException {
- log.info("Prepare target schemas SQL: {}", sql);
- try (
- Connection connection =
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute(sql);
- } catch (final SQLException ex) {
- if (isSupportIfNotExistsOnCreateSchema()) {
- throw ex;
- }
- log.warn("create schema failed", ex);
- }
- }
-
- protected void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
- log.info("Execute target table SQL: {}", sql);
- try (Statement statement = targetConnection.createStatement()) {
- statement.execute(sql);
- }
- }
-
- protected final String addIfNotExistsForCreateTableSQL(final String
createTableSQL) {
- if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find())
{
- return createTableSQL;
- }
- return
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT
EXISTS ");
- }
-
- protected final String getCreateTargetTableSQL(final
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager
dataSourceManager,
- final SQLParserEngine
sqlParserEngine) throws SQLException {
- DatabaseType databaseType =
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
- DataSource sourceDataSource =
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
- String schemaName =
createTableConfig.getSourceName().getSchemaName().toString();
- String sourceTableName =
createTableConfig.getSourceName().getTableName().toString();
- String targetTableName =
createTableConfig.getTargetName().getTableName().toString();
- PipelineDDLGenerator generator = new PipelineDDLGenerator();
- return generator.generateLogicDDL(databaseType, sourceDataSource,
schemaName, sourceTableName, targetTableName, sqlParserEngine);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
similarity index 61%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
index 3219b840767..52bc1962bcd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
@@ -17,34 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
/**
- * Data source preparer.
+ * Data source prepare option.
*/
@SingletonSPI
-public interface DataSourcePreparer extends DatabaseTypedSPI {
-
- /**
- * Prepare target schemas.
- *
- * @param param prepare target schemas parameter
- * @throws SQLException if prepare target schema fail
- */
- void prepareTargetSchemas(PrepareTargetSchemasParameter param) throws
SQLException;
-
- /**
- * Prepare target tables.
- *
- * @param param prepare target tables parameter
- * @throws SQLException SQL exception
- */
- void prepareTargetTables(PrepareTargetTablesParameter param) throws
SQLException;
+public interface DataSourcePrepareOption extends DatabaseTypedSPI {
/**
* Is support if not exists on create schema SQL.
@@ -54,4 +37,13 @@ public interface DataSourcePreparer extends DatabaseTypedSPI
{
default boolean isSupportIfNotExistsOnCreateSchema() {
return true;
}
+
+ /**
+ * Get ignored exception messages.
+ *
+ * @return ignored exception messages
+ */
+ default Collection<String> getIgnoredExceptionMessages() {
+ return Collections.emptyList();
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
index 3219b840767..d738edeba8e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
@@ -17,18 +17,42 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+import com.google.common.base.Splitter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.SQLParserEngine;
+import javax.sql.DataSource;
+import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.regex.Pattern;
/**
* Data source preparer.
*/
-@SingletonSPI
-public interface DataSourcePreparer extends DatabaseTypedSPI {
+@RequiredArgsConstructor
+@Slf4j
+public final class DataSourcePreparer {
+
+ private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS =
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+",
Pattern.CASE_INSENSITIVE);
+
+ private static final Pattern PATTERN_CREATE_TABLE =
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+
+ private final DataSourcePrepareOption option;
/**
* Prepare target schemas.
@@ -36,7 +60,41 @@ public interface DataSourcePreparer extends DatabaseTypedSPI
{
* @param param prepare target schemas parameter
* @throws SQLException if prepare target schema fail
*/
- void prepareTargetSchemas(PrepareTargetSchemasParameter param) throws
SQLException;
+ public void prepareTargetSchemas(final PrepareTargetSchemasParameter
param) throws SQLException {
+ DatabaseType targetDatabaseType = param.getTargetDatabaseType();
+ DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
+ if (!dialectDatabaseMetaData.isSchemaAvailable()) {
+ return;
+ }
+ String defaultSchema =
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
+ PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(targetDatabaseType);
+ Collection<String> createdSchemaNames = new HashSet<>();
+ for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
+ String targetSchemaName =
each.getTargetName().getSchemaName().toString();
+ if (null == targetSchemaName ||
targetSchemaName.equalsIgnoreCase(defaultSchema) ||
createdSchemaNames.contains(targetSchemaName)) {
+ continue;
+ }
+ Optional<String> sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
+ if (sql.isPresent()) {
+ executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
+ createdSchemaNames.add(targetSchemaName);
+ }
+ }
+ }
+
+ private void executeCreateSchema(final PipelineDataSourceManager
dataSourceManager, final PipelineDataSourceConfiguration
targetDataSourceConfig, final String sql) throws SQLException {
+ log.info("Prepare target schemas SQL: {}", sql);
+ try (
+ Connection connection =
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException ex) {
+ if (option.isSupportIfNotExistsOnCreateSchema()) {
+ throw ex;
+ }
+ log.warn("Create schema failed", ex);
+ }
+ }
/**
* Prepare target tables.
@@ -44,14 +102,43 @@ public interface DataSourcePreparer extends
DatabaseTypedSPI {
* @param param prepare target tables parameter
* @throws SQLException SQL exception
*/
- void prepareTargetTables(PrepareTargetTablesParameter param) throws
SQLException;
+ public void prepareTargetTables(final PrepareTargetTablesParameter param)
throws SQLException {
+ PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
+ for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine());
+ try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
+ for (String sql :
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
{
+ executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(sql));
+ }
+ }
+ }
+ }
- /**
- * Is support if not exists on create schema SQL.
- *
- * @return supported or not
- */
- default boolean isSupportIfNotExistsOnCreateSchema() {
- return true;
+ private void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
+ log.info("Execute target table SQL: {}", sql);
+ try (Statement statement = targetConnection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException ex) {
+ for (String each : option.getIgnoredExceptionMessages()) {
+ if (ex.getMessage().contains(each)) {
+ return;
+ }
+ }
+ throw ex;
+ }
+ }
+
+ private String addIfNotExistsForCreateTableSQL(final String
createTableSQL) {
+ return
PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ?
createTableSQL :
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT
EXISTS ");
+ }
+
+ private String getCreateTargetTableSQL(final CreateTableConfiguration
createTableConfig,
+ final PipelineDataSourceManager
dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException {
+ DatabaseType databaseType =
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
+ DataSource sourceDataSource =
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
+ String schemaName =
createTableConfig.getSourceName().getSchemaName().toString();
+ String sourceTableName =
createTableConfig.getSourceName().getTableName().toString();
+ String targetTableName =
createTableConfig.getTargetName().getTableName().toString();
+ return new PipelineDDLGenerator().generateLogicDDL(databaseType,
sourceDataSource, schemaName, sourceTableName, targetTableName,
sqlParserEngine);
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
deleted file mode 100644
index d736c7286cf..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import org.junit.jupiter.api.Test;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-class AbstractDataSourcePreparerTest {
-
- private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS =
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+",
Pattern.CASE_INSENSITIVE);
-
- private final AbstractDataSourcePreparer preparer = new
AbstractDataSourcePreparer() {
-
- @Override
- public void prepareTargetTables(final PrepareTargetTablesParameter
param) {
- }
-
- @Override
- public String getDatabaseType() {
- return "FIXTURE";
- }
- };
-
- @Test
- void assertExecuteTargetTableSQL() throws SQLException {
- Statement statement = mock(Statement.class);
- Connection targetConnection = mock(Connection.class);
- when(targetConnection.createStatement()).thenReturn(statement);
- String sql = "CREATE TABLE t (id int)";
- preparer.executeTargetTableSQL(targetConnection, sql);
- verify(statement).execute(sql);
- }
-
- @Test
- void assertAddIfNotExistsForCreateTableSQL() {
- Collection<String> createTableSQLs = Arrays.asList("CREATE TABLE IF
NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
- "CREATE TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE
t (id int)", "CREATE TABLE \tt_order (id bigint) WITH (orientation=row,
compression=no);");
- for (String each : createTableSQLs) {
- String actual = preparer.addIfNotExistsForCreateTableSQL(each);
-
assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(actual).find());
- }
- Collection<String> mismatchedSQLs = Arrays.asList("SET search_path =
public", "UPDATE t_order SET id = 1");
- for (String each : mismatchedSQLs) {
- String actual = preparer.addIfNotExistsForCreateTableSQL(each);
- assertThat(actual, is(each));
- }
- }
-}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
new file mode 100644
index 00000000000..ed9a05a053c
--- /dev/null
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for MySQL.
+ */
+public final class MySQLDataSourcePrepareOption implements
DataSourcePrepareOption {
+
+ @Override
+ public String getDatabaseType() {
+ return "MySQL";
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
deleted file mode 100644
index 044054b1311..00000000000
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for MySQL.
- */
-public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
-
- @Override
- public void prepareTargetTables(final PrepareTargetTablesParameter param)
throws SQLException {
- PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
- for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine());
- try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(createTargetTableSQL));
- }
- }
- }
-
- @Override
- public String getDatabaseType() {
- return "MySQL";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 96%
rename from
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index a94249837e7..980f6a50028 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePrepareOption
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
new file mode 100644
index 00000000000..846a041b4d2
--- /dev/null
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
+
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Data source prepare option for openGauss.
+ */
+@Slf4j
+public final class OpenGaussDataSourcePrepareOption implements
DataSourcePrepareOption {
+
+ @Override
+ public boolean isSupportIfNotExistsOnCreateSchema() {
+ return false;
+ }
+
+ @Override
+ public Collection<String> getIgnoredExceptionMessages() {
+ return Arrays.asList("multiple primary keys for table", "already
exists");
+ }
+
+ @Override
+ public String getDatabaseType() {
+ return "openGauss";
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
deleted file mode 100644
index eb0b467d5a7..00000000000
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
-
-import com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for openGauss.
- */
-@Slf4j
-public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePreparer {
-
- private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple
primary keys for table", "already exists"};
-
- @Override
- public void prepareTargetTables(final PrepareTargetTablesParameter param)
throws SQLException {
- PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
- for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine());
- try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
- for (String sql :
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
{
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(sql));
- }
- }
- }
- }
-
- @Override
- protected void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
- try {
- super.executeTargetTableSQL(targetConnection, sql);
- } catch (final SQLException ex) {
- for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) {
- if (ex.getMessage().contains(ignoreMessage)) {
- return;
- }
- }
- throw ex;
- }
- }
-
- @Override
- public boolean isSupportIfNotExistsOnCreateSchema() {
- return false;
- }
-
- @Override
- public String getDatabaseType() {
- return "openGauss";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 95%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index 2436cc09377..72923bbe665 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePrepareOption
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
new file mode 100644
index 00000000000..7deed376013
--- /dev/null
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for PostgreSQL.
+ */
+public final class PostgreSQLDataSourcePrepareOption implements
DataSourcePrepareOption {
+
+ @Override
+ public String getDatabaseType() {
+ return "PostgreSQL";
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
deleted file mode 100644
index 735cf423b84..00000000000
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
-
-import com.google.common.base.Splitter;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for PostgreSQL.
- */
-public final class PostgreSQLDataSourcePreparer extends
AbstractDataSourcePreparer {
-
- @Override
- public void prepareTargetTables(final PrepareTargetTablesParameter param)
throws SQLException {
- PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
- for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine());
- try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
- for (String sql :
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
{
- executeTargetTableSQL(targetConnection, sql);
- }
- }
- }
- }
-
- @Override
- public String getDatabaseType() {
- return "PostgreSQL";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 95%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index 073139e15e2..d6f7f72a35e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource.PostgreSQLDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource.PostgreSQLDataSourcePrepareOption
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
new file mode 100644
index 00000000000..7b1fc0f64bb
--- /dev/null
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for H2.
+ */
+public final class H2DataSourcePrepareOption implements
DataSourcePrepareOption {
+
+ @Override
+ public String getDatabaseType() {
+ return "H2";
+ }
+}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
deleted file mode 100644
index af256eed7fa..00000000000
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for H2.
- */
-public final class H2DataSourcePreparer extends AbstractDataSourcePreparer {
-
- @Override
- public void prepareTargetTables(final PrepareTargetTablesParameter param)
throws SQLException {
- PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
- for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine());
- try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(createTargetTableSQL));
- }
- }
- }
-
- @Override
- public String getDatabaseType() {
- return "H2";
- }
-}
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 97%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index d2cc59ec453..fb6848a83c3 100644
---
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2DataSourcePreparer
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2DataSourcePrepareOption