This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cc0210d55e Refactor DataSourcePreparer and DataSourcePrepareOption 
(#29423)
9cc0210d55e is described below

commit 9cc0210d55e4cbd6e4c581c8dc23f0d413889302
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 17 00:10:36 2023 +0800

    Refactor DataSourcePreparer and DataSourcePrepareOption (#29423)
    
    * Refactor AbstractDataSourcePreparer
    
    * Refactor DataSourcePrepareEngine
    
    * Refactor DataSourcePreparer and DataSourcePrepareOption
---
 .../core/preparer/PipelineJobPreparerUtils.java    |  23 +++--
 .../datasource/AbstractDataSourcePreparer.java     | 112 --------------------
 ...ePreparer.java => DataSourcePrepareOption.java} |  34 +++----
 .../preparer/datasource/DataSourcePreparer.java    | 113 ++++++++++++++++++---
 .../datasource/AbstractDataSourcePreparerTest.java |  77 --------------
 .../datasource/MySQLDataSourcePrepareOption.java   |  31 ++++++
 .../datasource/MySQLDataSourcePreparer.java        |  48 ---------
 ...re.preparer.datasource.DataSourcePrepareOption} |   2 +-
 .../OpenGaussDataSourcePrepareOption.java          |  46 +++++++++
 .../datasource/OpenGaussDataSourcePreparer.java    |  74 --------------
 ...re.preparer.datasource.DataSourcePrepareOption} |   2 +-
 .../PostgreSQLDataSourcePrepareOption.java         |  31 ++++++
 .../datasource/PostgreSQLDataSourcePreparer.java   |  51 ----------
 ...re.preparer.datasource.DataSourcePrepareOption} |   2 +-
 .../core/fixture/H2DataSourcePrepareOption.java    |  31 ++++++
 .../core/fixture/H2DataSourcePreparer.java         |  48 ---------
 ...re.preparer.datasource.DataSourcePrepareOption} |   2 +-
 17 files changed, 268 insertions(+), 459 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index d17716d8517..ebb18af17eb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -20,17 +20,18 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
@@ -75,12 +76,12 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException if prepare target schema fail
      */
     public static void prepareTargetSchema(final DatabaseType databaseType, 
final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws 
SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = 
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
-        if (!dataSourcePreparer.isPresent()) {
-            log.info("dataSourcePreparer null, ignore prepare target");
+        Optional<DataSourcePrepareOption> option = 
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType);
+        if (!option.isPresent()) {
+            log.info("Data source preparer option null, ignore prepare 
target");
             return;
         }
-        
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParam);
+        new 
DataSourcePreparer(option.get()).prepareTargetSchemas(prepareTargetSchemasParam);
     }
     
     /**
@@ -104,13 +105,13 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException SQL exception
      */
     public static void prepareTargetTables(final DatabaseType databaseType, 
final PrepareTargetTablesParameter prepareTargetTablesParam) throws 
SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = 
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
-        if (!dataSourcePreparer.isPresent()) {
-            log.info("dataSourcePreparer null, ignore prepare target");
+        Optional<DataSourcePrepareOption> option = 
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType);
+        if (!option.isPresent()) {
+            log.info("Data source preparer option null, ignore prepare 
target");
             return;
         }
         long startTimeMillis = System.currentTimeMillis();
-        dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParam);
+        new 
DataSourcePreparer(option.get()).prepareTargetTables(prepareTargetTablesParam);
         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() 
- startTimeMillis);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
deleted file mode 100644
index 9988ce6e3ca..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.regex.Pattern;
-
-/**
- * Abstract data source preparer.
- */
-@Slf4j
-public abstract class AbstractDataSourcePreparer implements DataSourcePreparer 
{
-    
-    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = 
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", 
Pattern.CASE_INSENSITIVE);
-    
-    private static final Pattern PATTERN_CREATE_TABLE = 
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
-    
-    @Override
-    public final void prepareTargetSchemas(final PrepareTargetSchemasParameter 
param) throws SQLException {
-        DatabaseType targetDatabaseType = param.getTargetDatabaseType();
-        DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
-        if (!dialectDatabaseMetaData.isSchemaAvailable()) {
-            return;
-        }
-        String defaultSchema = 
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(targetDatabaseType);
-        Collection<String> createdSchemaNames = new HashSet<>();
-        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String targetSchemaName = 
each.getTargetName().getSchemaName().toString();
-            if (null == targetSchemaName || 
targetSchemaName.equalsIgnoreCase(defaultSchema) || 
createdSchemaNames.contains(targetSchemaName)) {
-                continue;
-            }
-            Optional<String> sql = 
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
-            if (sql.isPresent()) {
-                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get());
-                createdSchemaNames.add(targetSchemaName);
-            }
-        }
-    }
-    
-    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager, final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) throws SQLException {
-        log.info("Prepare target schemas SQL: {}", sql);
-        try (
-                Connection connection = 
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute(sql);
-        } catch (final SQLException ex) {
-            if (isSupportIfNotExistsOnCreateSchema()) {
-                throw ex;
-            }
-            log.warn("create schema failed", ex);
-        }
-    }
-    
-    protected void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
-        log.info("Execute target table SQL: {}", sql);
-        try (Statement statement = targetConnection.createStatement()) {
-            statement.execute(sql);
-        }
-    }
-    
-    protected final String addIfNotExistsForCreateTableSQL(final String 
createTableSQL) {
-        if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) 
{
-            return createTableSQL;
-        }
-        return 
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT 
EXISTS ");
-    }
-    
-    protected final String getCreateTargetTableSQL(final 
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager 
dataSourceManager,
-                                                   final SQLParserEngine 
sqlParserEngine) throws SQLException {
-        DatabaseType databaseType = 
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
-        DataSource sourceDataSource = 
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
-        String schemaName = 
createTableConfig.getSourceName().getSchemaName().toString();
-        String sourceTableName = 
createTableConfig.getSourceName().getTableName().toString();
-        String targetTableName = 
createTableConfig.getTargetName().getTableName().toString();
-        PipelineDDLGenerator generator = new PipelineDDLGenerator();
-        return generator.generateLogicDDL(databaseType, sourceDataSource, 
schemaName, sourceTableName, targetTableName, sqlParserEngine);
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
similarity index 61%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
index 3219b840767..52bc1962bcd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
@@ -17,34 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
-import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
- * Data source preparer.
+ * Data source prepare option.
  */
 @SingletonSPI
-public interface DataSourcePreparer extends DatabaseTypedSPI {
-    
-    /**
-     * Prepare target schemas.
-     *
-     * @param param prepare target schemas parameter
-     * @throws SQLException if prepare target schema fail
-     */
-    void prepareTargetSchemas(PrepareTargetSchemasParameter param) throws 
SQLException;
-    
-    /**
-     * Prepare target tables.
-     *
-     * @param param prepare target tables parameter
-     * @throws SQLException SQL exception
-     */
-    void prepareTargetTables(PrepareTargetTablesParameter param) throws 
SQLException;
+public interface DataSourcePrepareOption extends DatabaseTypedSPI {
     
     /**
      * Is support if not exists on create schema SQL.
@@ -54,4 +37,13 @@ public interface DataSourcePreparer extends DatabaseTypedSPI 
{
     default boolean isSupportIfNotExistsOnCreateSchema() {
         return true;
     }
+    
+    /**
+     * Get ignored exception messages.
+     *
+     * @return ignored exception messages
+     */
+    default Collection<String> getIgnoredExceptionMessages() {
+        return Collections.emptyList();
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
index 3219b840767..d738edeba8e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
@@ -17,18 +17,42 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import com.google.common.base.Splitter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.regex.Pattern;
 
 /**
  * Data source preparer.
  */
-@SingletonSPI
-public interface DataSourcePreparer extends DatabaseTypedSPI {
+@RequiredArgsConstructor
+@Slf4j
+public final class DataSourcePreparer {
+    
+    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = 
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", 
Pattern.CASE_INSENSITIVE);
+    
+    private static final Pattern PATTERN_CREATE_TABLE = 
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+    
+    private final DataSourcePrepareOption option;
     
     /**
      * Prepare target schemas.
@@ -36,7 +60,41 @@ public interface DataSourcePreparer extends DatabaseTypedSPI 
{
      * @param param prepare target schemas parameter
      * @throws SQLException if prepare target schema fail
      */
-    void prepareTargetSchemas(PrepareTargetSchemasParameter param) throws 
SQLException;
+    public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
param) throws SQLException {
+        DatabaseType targetDatabaseType = param.getTargetDatabaseType();
+        DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
+        if (!dialectDatabaseMetaData.isSchemaAvailable()) {
+            return;
+        }
+        String defaultSchema = 
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
+        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(targetDatabaseType);
+        Collection<String> createdSchemaNames = new HashSet<>();
+        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
+            String targetSchemaName = 
each.getTargetName().getSchemaName().toString();
+            if (null == targetSchemaName || 
targetSchemaName.equalsIgnoreCase(defaultSchema) || 
createdSchemaNames.contains(targetSchemaName)) {
+                continue;
+            }
+            Optional<String> sql = 
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
+            if (sql.isPresent()) {
+                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get());
+                createdSchemaNames.add(targetSchemaName);
+            }
+        }
+    }
+    
+    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager, final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) throws SQLException {
+        log.info("Prepare target schemas SQL: {}", sql);
+        try (
+                Connection connection = 
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        } catch (final SQLException ex) {
+            if (option.isSupportIfNotExistsOnCreateSchema()) {
+                throw ex;
+            }
+            log.warn("Create schema failed", ex);
+        }
+    }
     
     /**
      * Prepare target tables.
@@ -44,14 +102,43 @@ public interface DataSourcePreparer extends 
DatabaseTypedSPI {
      * @param param prepare target tables parameter
      * @throws SQLException SQL exception
      */
-    void prepareTargetTables(PrepareTargetTablesParameter param) throws 
SQLException;
+    public void prepareTargetTables(final PrepareTargetTablesParameter param) 
throws SQLException {
+        PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
+        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
+            String createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine());
+            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
+                for (String sql : 
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
 {
+                    executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(sql));
+                }
+            }
+        }
+    }
     
-    /**
-     * Is support if not exists on create schema SQL.
-     * 
-     * @return supported or not
-     */
-    default boolean isSupportIfNotExistsOnCreateSchema() {
-        return true;
+    private void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
+        log.info("Execute target table SQL: {}", sql);
+        try (Statement statement = targetConnection.createStatement()) {
+            statement.execute(sql);
+        } catch (final SQLException ex) {
+            for (String each : option.getIgnoredExceptionMessages()) {
+                if (ex.getMessage().contains(each)) {
+                    return;
+                }
+            }
+            throw ex;
+        }
+    }
+    
+    private String addIfNotExistsForCreateTableSQL(final String 
createTableSQL) {
+        return 
PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ? 
createTableSQL : 
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT 
EXISTS ");
+    }
+    
+    private String getCreateTargetTableSQL(final CreateTableConfiguration 
createTableConfig,
+                                           final PipelineDataSourceManager 
dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException {
+        DatabaseType databaseType = 
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
+        DataSource sourceDataSource = 
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
+        String schemaName = 
createTableConfig.getSourceName().getSchemaName().toString();
+        String sourceTableName = 
createTableConfig.getSourceName().getTableName().toString();
+        String targetTableName = 
createTableConfig.getTargetName().getTableName().toString();
+        return new PipelineDDLGenerator().generateLogicDDL(databaseType, 
sourceDataSource, schemaName, sourceTableName, targetTableName, 
sqlParserEngine);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
deleted file mode 100644
index d736c7286cf..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import org.junit.jupiter.api.Test;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-class AbstractDataSourcePreparerTest {
-    
-    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = 
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", 
Pattern.CASE_INSENSITIVE);
-    
-    private final AbstractDataSourcePreparer preparer = new 
AbstractDataSourcePreparer() {
-        
-        @Override
-        public void prepareTargetTables(final PrepareTargetTablesParameter 
param) {
-        }
-        
-        @Override
-        public String getDatabaseType() {
-            return "FIXTURE";
-        }
-    };
-    
-    @Test
-    void assertExecuteTargetTableSQL() throws SQLException {
-        Statement statement = mock(Statement.class);
-        Connection targetConnection = mock(Connection.class);
-        when(targetConnection.createStatement()).thenReturn(statement);
-        String sql = "CREATE TABLE t (id int)";
-        preparer.executeTargetTableSQL(targetConnection, sql);
-        verify(statement).execute(sql);
-    }
-    
-    @Test
-    void assertAddIfNotExistsForCreateTableSQL() {
-        Collection<String> createTableSQLs = Arrays.asList("CREATE TABLE IF 
NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
-                "CREATE  TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE 
t (id int)", "CREATE TABLE \tt_order (id bigint) WITH (orientation=row, 
compression=no);");
-        for (String each : createTableSQLs) {
-            String actual = preparer.addIfNotExistsForCreateTableSQL(each);
-            
assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(actual).find());
-        }
-        Collection<String> mismatchedSQLs = Arrays.asList("SET search_path = 
public", "UPDATE t_order SET id = 1");
-        for (String each : mismatchedSQLs) {
-            String actual = preparer.addIfNotExistsForCreateTableSQL(each);
-            assertThat(actual, is(each));
-        }
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
new file mode 100644
index 00000000000..ed9a05a053c
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for MySQL.
+ */
+public final class MySQLDataSourcePrepareOption implements 
DataSourcePrepareOption {
+    
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
deleted file mode 100644
index 044054b1311..00000000000
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for MySQL.
- */
-public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
-    
-    @Override
-    public void prepareTargetTables(final PrepareTargetTablesParameter param) 
throws SQLException {
-        PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
-        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine());
-            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
-                executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(createTargetTableSQL));
-            }
-        }
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "MySQL";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 96%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index a94249837e7..980f6a50028 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePrepareOption
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
new file mode 100644
index 00000000000..846a041b4d2
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Data source prepare option for openGauss.
+ */
+@Slf4j
+public final class OpenGaussDataSourcePrepareOption implements 
DataSourcePrepareOption {
+    
+    @Override
+    public boolean isSupportIfNotExistsOnCreateSchema() {
+        return false;
+    }
+    
+    @Override
+    public Collection<String> getIgnoredExceptionMessages() {
+        return Arrays.asList("multiple primary keys for table", "already 
exists");
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "openGauss";
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
deleted file mode 100644
index eb0b467d5a7..00000000000
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
-
-import com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for openGauss.
- */
-@Slf4j
-public final class OpenGaussDataSourcePreparer extends 
AbstractDataSourcePreparer {
-    
-    private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple 
primary keys for table", "already exists"};
-    
-    @Override
-    public void prepareTargetTables(final PrepareTargetTablesParameter param) 
throws SQLException {
-        PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
-        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine());
-            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
-                for (String sql : 
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
 {
-                    executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(sql));
-                }
-            }
-        }
-    }
-    
-    @Override
-    protected void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
-        try {
-            super.executeTargetTableSQL(targetConnection, sql);
-        } catch (final SQLException ex) {
-            for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) {
-                if (ex.getMessage().contains(ignoreMessage)) {
-                    return;
-                }
-            }
-            throw ex;
-        }
-    }
-    
-    @Override
-    public boolean isSupportIfNotExistsOnCreateSchema() {
-        return false;
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "openGauss";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 95%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index 2436cc09377..72923bbe665 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePrepareOption
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
new file mode 100644
index 00000000000..7deed376013
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for PostgreSQL.
+ */
+public final class PostgreSQLDataSourcePrepareOption implements 
DataSourcePrepareOption {
+    
+    @Override
+    public String getDatabaseType() {
+        return "PostgreSQL";
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
deleted file mode 100644
index 735cf423b84..00000000000
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
-
-import com.google.common.base.Splitter;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for PostgreSQL.
- */
-public final class PostgreSQLDataSourcePreparer extends 
AbstractDataSourcePreparer {
-    
-    @Override
-    public void prepareTargetTables(final PrepareTargetTablesParameter param) 
throws SQLException {
-        PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
-        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine());
-            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
-                for (String sql : 
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL))
 {
-                    executeTargetTableSQL(targetConnection, sql);
-                }
-            }
-        }
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "PostgreSQL";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 95%
rename from 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index 073139e15e2..d6f7f72a35e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource.PostgreSQLDataSourcePreparer
+org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource.PostgreSQLDataSourcePrepareOption
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
new file mode 100644
index 00000000000..7b1fc0f64bb
--- /dev/null
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePrepareOption.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+
+/**
+ * Data source prepare option for H2.
+ */
+public final class H2DataSourcePrepareOption implements 
DataSourcePrepareOption {
+    
+    @Override
+    public String getDatabaseType() {
+        return "H2";
+    }
+}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
deleted file mode 100644
index af256eed7fa..00000000000
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- * Data source preparer for H2.
- */
-public final class H2DataSourcePreparer extends AbstractDataSourcePreparer {
-    
-    @Override
-    public void prepareTargetTables(final PrepareTargetTablesParameter param) 
throws SQLException {
-        PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
-        for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine());
-            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
-                executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(createTargetTableSQL));
-            }
-        }
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "H2";
-    }
-}
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
similarity index 97%
rename from 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
rename to 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
index d2cc59ec453..fb6848a83c3 100644
--- 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer
+++ 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2DataSourcePreparer
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2DataSourcePrepareOption

Reply via email to