This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aac47159d92 Refactor sqlbuilder package (#29481)
aac47159d92 is described below

commit aac47159d9296cf9c251ada84d2881b4f6f2f817
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 21 12:00:52 2023 +0800

    Refactor sqlbuilder package (#29481)
    
    * Rename DialectPipelineSQLBuilder.buildCheckEmptyTableSQL
    
    * Move DialectPipelineSQLBuilder
    
    * Refactor sql package
    
    * Merge DialectPipelineSQLBuilder and CreateTableSQLGenerator
    
    * Rename PipelinePrepareSQLBuilder
    
    * Refactor sqlbuilder package
    
    * Fix test case
---
 .../type/TestcontainersDatabaseTypeTest.java       | 12 +--
 .../core/checker/DataSourceCheckEngine.java        |  8 +-
 .../CRC32SingleTableInventoryCalculator.java       |  2 +-
 .../RecordSingleTableInventoryCalculator.java      |  2 +-
 .../core/importer/sink/PipelineDataSourceSink.java |  2 +-
 .../core/ingest/dumper/InventoryDumper.java        |  2 +-
 .../metadata/generator/PipelineDDLGenerator.java   |  4 +-
 .../datasource/PipelineJobDataSourcePreparer.java  |  4 +-
 .../inventory/InventoryRecordsCountCalculator.java |  4 +-
 .../preparer/inventory/InventoryTaskSplitter.java  |  4 +-
 .../core/spi/sql/CreateTableSQLGenerator.java      | 43 -----------
 .../dialect}/DialectPipelineSQLBuilder.java        | 20 ++++-
 .../{ => segment}/PipelineSQLSegmentBuilder.java   |  2 +-
 ...PipelineDataConsistencyCalculateSQLBuilder.java |  5 +-
 .../{ => sql}/PipelineImportSQLBuilder.java        |  5 +-
 .../{ => sql}/PipelineInventoryDumpSQLBuilder.java |  3 +-
 .../PipelinePrepareSQLBuilder.java}                | 17 ++--
 ...lineDataConsistencyCalculateSQLBuilderTest.java |  1 +
 .../sqlbuilder/PipelineImportSQLBuilderTest.java   |  1 +
 .../PipelineInventoryDumpSQLBuilderTest.java       |  1 +
 .../sqlbuilder/PipelineSQLSegmentBuilderTest.java  |  1 +
 .../fixture/FixturePipelineSQLBuilder.java         | 12 ++-
 ...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} |  0
 .../ddlgenerator/MySQLCreateTableSQLGenerator.java | 57 --------------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 27 ++++++-
 ...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
 ...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} |  0
 .../OpenGaussCreateTableSQLGenerator.java          | 60 ---------------
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 28 ++++++-
 ...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
 ...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} |  0
 .../PostgreSQLCreateTableSQLGenerator.java         | 90 ----------------------
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   | 66 +++++++++++++++-
 ...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
 ...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} |  0
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  4 +-
 .../scenario/migration/api/MigrationJobAPI.java    |  4 +-
 .../createtable/CreateTableSQLGeneratorIT.java     |  6 +-
 .../fixture/h2/sql/H2CreateTableSQLGenerator.java  | 45 -----------
 .../h2/sqlbuilder/H2PipelineSQLBuilder.java        | 17 +++-
 ...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
 ...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} |  0
 42 files changed, 205 insertions(+), 426 deletions(-)

diff --git a/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java b/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
index 91dcaac3307..6229c141e4b 100644
--- a/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
+++ b/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
@@ -30,11 +30,11 @@ class TestcontainersDatabaseTypeTest {
     
     @Test
     void assertGetJdbcUrlPrefixes() {
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersClickHouse").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:clickhouse:")));
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersMariaDB").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:mariadb:")));
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersMySQL").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:mysql:")));
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersOracle").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:oracle:")));
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersPostgreSQL").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:postgresql:")));
-        assertThat(TypedSPILoader.getService(DatabaseType.class, "TestContainersSQLServer").getJdbcUrlPrefixes(), is(Collections.singletonList("jdbc:tc:sqlserver:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-ClickHouse").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:clickhouse:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-MariaDB").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:mariadb:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-MySQL").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:mysql:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-Oracle").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:oracle:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-PostgreSQL").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:postgresql:")));
+        assertThat(TypedSPILoader.getService(DatabaseType.class, "TC-SQLServer").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:sqlserver:")));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 001a278fef2..6e5a82e8f0e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -40,11 +40,11 @@ public final class DataSourceCheckEngine {
     
     private final DialectDataSourceChecker checker;
     
-    private final PipelineCommonSQLBuilder sqlBuilder;
+    private final PipelinePrepareSQLBuilder sqlBuilder;
     
     public DataSourceCheckEngine(final DatabaseType databaseType) {
         checker = 
DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, 
databaseType).orElse(null);
-        sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
+        sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
     }
     
     /**
@@ -109,7 +109,7 @@ public final class DataSourceCheckEngine {
      * @throws SQLException if there's database operation failure
      */
     public boolean checkEmptyTable(final DataSource dataSource, final CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
-        String sql = sqlBuilder.buildCheckEmptySQL(qualifiedTable.getSchemaName().toString(), qualifiedTable.getTableName().toString());
+        String sql = sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().toString(), qualifiedTable.getTableName().toString());
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
index def1d4c6c94..14effc106d1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32SingleTableInventoryCalculatorException;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index c6c561ebec6..2b1c2f24f76 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderEngine;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 011187eab9d..a45828ba96e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -31,7 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineImportSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index ac0b876db8d..e6e0cc17c2d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -42,7 +42,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineInventoryDumpSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index b6a24d9e0fd..82656a3f608 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.generator;
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
@@ -76,7 +76,7 @@ public final class PipelineDDLGenerator {
                                    final String schemaName, final String 
sourceTableName, final String targetTableName, final SQLParserEngine 
parserEngine) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
         StringBuilder result = new StringBuilder();
-        for (String each : DatabaseTypedSPILoader.getService(CreateTableSQLGenerator.class, databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
+        for (String each : DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)) {
             Optional<String> queryContext = decorate(databaseType, 
sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(optional -> 
result.append(optional).append(DELIMITER).append(System.lineSeparator()));
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index 3f59a1f3f7d..b62427ccfea 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.D
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -68,7 +68,7 @@ public final class PipelineJobDataSourcePreparer {
             return;
         }
         String defaultSchema = 
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(targetDatabaseType);
+        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new HashSet<>();
         for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
             String targetSchemaName = 
each.getTargetName().getSchemaName().toString();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index 786a5f8ff62..db13ef1e697 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
@@ -53,7 +53,7 @@ public final class InventoryRecordsCountCalculator {
     public static long getTableRecordsCount(final InventoryDumperContext 
dumperContext, final PipelineDataSourceWrapper dataSource) {
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(dataSource.getDatabaseType());
+        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index 95d9c9e4eaa..7e84b31e8dd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -36,7 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.Stri
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.UnsupportedKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
@@ -202,7 +202,7 @@ public final class InventoryTaskSplitter {
     
     private Range<Long> getUniqueKeyValuesRange(final 
TransmissionJobItemContext jobItemContext, final DataSource dataSource, final 
InventoryDumperContext dumperContext) {
         String uniqueKey = 
dumperContext.getUniqueKeyColumns().get(0).getName();
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
+        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
         String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
                 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
 dumperContext.getActualTableName(), uniqueKey);
         try (
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
deleted file mode 100644
index 1ab890861a3..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.spi.sql;
-
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.Collection;
-
-/**
- * Create table SQL generator.
- */
-@SingletonSPI
-public interface CreateTableSQLGenerator extends DatabaseTypedSPI {
-    
-    /**
-    * Generate create table SQLs.
-    * 
-    * @param dataSource dataSource
-    * @param schemaName schema name
-    * @param tableName table name
-    * @return generated SQLs
-    * @throws SQLException SQL exception
-    */
-    Collection<String> generate(DataSource dataSource, String schemaName, String tableName) throws SQLException;
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
similarity index 78%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index b1f1afedec1..57897e8908d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.sql;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Optional;
 
 /**
@@ -48,12 +51,12 @@ public interface DialectPipelineSQLBuilder extends DatabaseTypedSPI {
     }
     
     /**
-     * Build check empty SQL.
+     * Build check empty table SQL.
      *
      * @param qualifiedTableName qualified table name
      * @return built SQL
      */
-    String buildCheckEmptySQL(String qualifiedTableName);
+    String buildCheckEmptyTableSQL(String qualifiedTableName);
     
     /**
      * Build estimated count SQL.
@@ -75,4 +78,15 @@ public interface DialectPipelineSQLBuilder extends DatabaseTypedSPI {
     default Optional<String> buildCRC32SQL(String qualifiedTableName, final String columnName) {
         return Optional.empty();
     }
+    
+    /**
+     * Build create table SQLs.
+     *
+     * @param dataSource dataSource
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return built SQLs
+     * @throws SQLException SQL exception
+     */
+    Collection<String> buildCreateTableSQLs(DataSource dataSource, String schemaName, String tableName) throws SQLException;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
similarity index 97%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
index 82be6b81951..8668fc48b13 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment;
 
 import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
similarity index 92%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index 1b37ef04114..c3a6f313fca 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
similarity index 95%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
index 341cc6d7af5..735d6d266d3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
similarity index 96%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
index e3d9d6db4fb..c53793fd917 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Collection;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
similarity index 83%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
index 25de1b1ab0a..6218a482e51 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
@@ -15,24 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 
 import java.util.Optional;
 
 /**
- * Pipeline common SQL builder.
+ * Pipeline prepare SQL builder.
  */
-public final class PipelineCommonSQLBuilder {
+public final class PipelinePrepareSQLBuilder {
     
     private final DialectPipelineSQLBuilder dialectSQLBuilder;
     
     private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
     
-    public PipelineCommonSQLBuilder(final DatabaseType databaseType) {
+    public PipelinePrepareSQLBuilder(final DatabaseType databaseType) {
         dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
         sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
     }
@@ -94,13 +95,13 @@ public final class PipelineCommonSQLBuilder {
     }
     
     /**
-     * Build check empty SQL.
+     * Build check empty table SQL.
      *
      * @param schemaName schema name
      * @param tableName table name
      * @return check SQL
      */
-    public String buildCheckEmptySQL(final String schemaName, final String tableName) {
-        return dialectSQLBuilder.buildCheckEmptySQL(sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
+    public String buildCheckEmptyTableSQL(final String schemaName, final String tableName) {
+        return dialectSQLBuilder.buildCheckEmptyTableSQL(sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 98addaafc98..6a28e997970 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
index ca7c8668403..66282a8dfa5 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderP
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
index d50563842b9..f765003f5fc 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
index 869244db9c7..0390dff9213 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 8c749701502..66087b5f63d 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -17,14 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 
+import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 
 public final class FixturePipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
-    public String buildCheckEmptySQL(final String qualifiedTableName) {
+    public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
@@ -33,6 +36,11 @@ public final class FixturePipelineSQLBuilder implements 
DialectPipelineSQLBuilde
         return Optional.of(String.format("SELECT CRC32(%s) FROM %s", columnName, qualifiedTableName));
     }
     
+    @Override
+    public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) {
+        return Collections.emptyList();
+    }
+    
     @Override
     public String getDatabaseType() {
         return "FIXTURE";
diff --git 
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
 
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
deleted file mode 100644
index 15bd028e803..00000000000
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.ddlgenerator;
-
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
-* Create table SQL generator for MySQL.
- */
-public final class MySQLCreateTableSQLGenerator implements 
CreateTableSQLGenerator {
-    
-    private static final String SHOW_CREATE_SQL = "SHOW CREATE TABLE %s";
-    
-    private static final String COLUMN_LABEL = "create table";
-    
-    @Override
-    public Collection<String> generate(final DataSource dataSource, final 
String schemaName, final String tableName) throws SQLException {
-        try (
-                Connection connection = dataSource.getConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = 
statement.executeQuery(String.format(SHOW_CREATE_SQL, tableName))) {
-            if (resultSet.next()) {
-                return 
Collections.singletonList(resultSet.getString(COLUMN_LABEL));
-            }
-        }
-        throw new CreateTableSQLGenerateException(tableName);
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "MySQL";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index a71dda64e09..cd4c69bdd6e 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,11 +17,19 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
+import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 
 /**
@@ -49,7 +57,7 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
     }
     
     @Override
-    public String buildCheckEmptySQL(final String qualifiedTableName) {
+    public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
@@ -63,6 +71,19 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
         return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", columnName, qualifiedTableName));
     }
     
+    @Override
+    public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(String.format("SHOW CREATE TABLE %s", tableName))) {
+            if (resultSet.next()) {
+                return Collections.singleton(resultSet.getString("create table"));
+            }
+        }
+        throw new CreateTableSQLGenerateException(tableName);
+    }
+    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index 4d8d8e80a06..00000000000
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.mysql.ddlgenerator.MySQLCreateTableSQLGenerator
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
deleted file mode 100644
index f98369ecf90..00000000000
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.opengauss.ddlgenerator;
-
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
-* Create table SQL generator for openGauss.
- */
-public final class OpenGaussCreateTableSQLGenerator implements 
CreateTableSQLGenerator {
-    
-    private static final String SELECT_TABLE_DEF_SQL = "SELECT * FROM 
pg_get_tabledef('%s.%s')";
-    
-    private static final String COLUMN_LABEL = "pg_get_tabledef";
-    
-    private static final String DELIMITER = ";";
-    
-    @Override
-    public Collection<String> generate(final DataSource dataSource, final 
String schemaName, final String tableName) throws SQLException {
-        try (
-                Connection connection = dataSource.getConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = 
statement.executeQuery(String.format(SELECT_TABLE_DEF_SQL, schemaName, 
tableName))) {
-            if (resultSet.next()) {
-                // TODO use ";" to split is not always correct
-                return 
Arrays.asList(resultSet.getString(COLUMN_LABEL).split(DELIMITER));
-            }
-        }
-        throw new CreateTableSQLGenerateException(tableName);
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "openGauss";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index a35310c2d49..9d191b093d9 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,10 +17,18 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
+import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -45,7 +53,7 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
     }
     
     @Override
-    public String buildCheckEmptySQL(final String qualifiedTableName) {
+    public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
@@ -54,6 +62,20 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
         return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
     
+    @Override
+    public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", schemaName, tableName))) {
+            if (resultSet.next()) {
+                // TODO use ";" to split is not always correct
+                return Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
+            }
+        }
+        throw new CreateTableSQLGenerateException(tableName);
+    }
+    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index a57974eebae..00000000000
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.opengauss.ddlgenerator.OpenGaussCreateTableSQLGenerator
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
deleted file mode 100644
index 79991a3a3ec..00000000000
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator;
-
-import 
org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Create table SQL generator for PostgreSQL.
- */
-public final class PostgreSQLCreateTableSQLGenerator implements 
CreateTableSQLGenerator {
-    
-    private static final String DELIMITER = ";";
-    
-    // TODO support partitions etc.
-    @Override
-    public Collection<String> generate(final DataSource dataSource, final 
String schemaName, final String tableName) throws SQLException {
-        try (Connection connection = dataSource.getConnection()) {
-            int majorVersion = 
connection.getMetaData().getDatabaseMajorVersion();
-            int minorVersion = 
connection.getMetaData().getDatabaseMinorVersion();
-            Map<String, Object> materials = loadMaterials(tableName, 
schemaName, connection, majorVersion, minorVersion);
-            String tableSQL = generateCreateTableSQL(majorVersion, 
minorVersion, materials);
-            String indexSQL = generateCreateIndexSQL(connection, majorVersion, 
minorVersion, materials);
-            // TODO use ";" to split is not always correct
-            return Arrays.asList((tableSQL + System.lineSeparator() + 
indexSQL).trim().split(DELIMITER));
-        }
-    }
-    
-    private Map<String, Object> loadMaterials(final String tableName, final 
String schemaName, final Connection connection, final int majorVersion, final 
int minorVersion) throws SQLException {
-        Map<String, Object> result = new 
PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, 
majorVersion, minorVersion).load();
-        new PostgreSQLColumnPropertiesAppender(connection, majorVersion, 
minorVersion).append(result);
-        new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, 
minorVersion).append(result);
-        formatColumns(result);
-        return result;
-    }
-    
-    private String generateCreateTableSQL(final int majorVersion, final int 
minorVersion, final Map<String, Object> materials) {
-        return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, 
"component/table/%s/create.ftl", majorVersion, minorVersion).trim();
-    }
-    
-    private String generateCreateIndexSQL(final Connection connection, final 
int majorVersion, final int minorVersion, final Map<String, Object> materials) 
throws SQLException {
-        return new PostgreSQLIndexSQLGenerator(connection, majorVersion, 
minorVersion).generate(materials);
-    }
-    
-    @SuppressWarnings("unchecked")
-    private void formatColumns(final Map<String, Object> context) {
-        Collection<Map<String, Object>> columns = (Collection<Map<String, 
Object>>) context.get("columns");
-        for (Map<String, Object> each : columns) {
-            if (each.containsKey("cltype")) {
-                typeFormatter(each, (String) each.get("cltype"));
-            }
-        }
-    }
-    
-    private void typeFormatter(final Map<String, Object> column, final String 
columnType) {
-        if (columnType.contains("[]")) {
-            column.put("cltype", columnType.substring(0, columnType.length() - 
2));
-            column.put("hasSqrBracket", true);
-        } else {
-            column.put("hasSqrBracket", false);
-        }
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "PostgreSQL";
-    }
-}
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index a38a0e050f0..3e15bfbba9e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -19,9 +19,20 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLColumnPropertiesAppender;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLConstraintsPropertiesAppender;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLIndexSQLGenerator;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLTablePropertiesLoader;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -52,7 +63,7 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
     }
     
     @Override
-    public String buildCheckEmptySQL(final String qualifiedTableName) {
+    public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
@@ -61,6 +72,55 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
         return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
     
+    // TODO support partitions etc.
+    @Override
+    public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) throws 
SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            int majorVersion = 
connection.getMetaData().getDatabaseMajorVersion();
+            int minorVersion = 
connection.getMetaData().getDatabaseMinorVersion();
+            Map<String, Object> materials = loadMaterials(tableName, 
schemaName, connection, majorVersion, minorVersion);
+            String tableSQL = generateCreateTableSQL(majorVersion, 
minorVersion, materials);
+            String indexSQL = generateCreateIndexSQL(connection, majorVersion, 
minorVersion, materials);
+            // TODO use ";" to split is not always correct
+            return Arrays.asList((tableSQL + System.lineSeparator() + 
indexSQL).trim().split(";"));
+        }
+    }
+    
+    private Map<String, Object> loadMaterials(final String tableName, final 
String schemaName, final Connection connection, final int majorVersion, final 
int minorVersion) throws SQLException {
+        Map<String, Object> result = new 
PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, 
majorVersion, minorVersion).load();
+        new PostgreSQLColumnPropertiesAppender(connection, majorVersion, 
minorVersion).append(result);
+        new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, 
minorVersion).append(result);
+        formatColumns(result);
+        return result;
+    }
+    
+    private String generateCreateTableSQL(final int majorVersion, final int 
minorVersion, final Map<String, Object> materials) {
+        return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, 
"component/table/%s/create.ftl", majorVersion, minorVersion).trim();
+    }
+    
+    private String generateCreateIndexSQL(final Connection connection, final 
int majorVersion, final int minorVersion, final Map<String, Object> materials) 
throws SQLException {
+        return new PostgreSQLIndexSQLGenerator(connection, majorVersion, 
minorVersion).generate(materials);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void formatColumns(final Map<String, Object> context) {
+        Collection<Map<String, Object>> columns = (Collection<Map<String, 
Object>>) context.get("columns");
+        for (Map<String, Object> each : columns) {
+            if (each.containsKey("cltype")) {
+                typeFormatter(each, (String) each.get("cltype"));
+            }
+        }
+    }
+    
+    private void typeFormatter(final Map<String, Object> column, final String 
columnType) {
+        if (columnType.contains("[]")) {
+            column.put("cltype", columnType.substring(0, columnType.length() - 
2));
+            column.put("hasSqrBracket", true);
+        } else {
+            column.put("hasSqrBracket", false);
+        }
+    }
+    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index e34da9d903c..00000000000
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLCreateTableSQLGenerator
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c527f4f9312..bb19d367c2a 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -46,9 +46,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.Incrementa
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -139,7 +139,7 @@ public final class CDCJobPreparer {
     }
     
     private boolean needSorting(final DatabaseType databaseType) {
-        return 
DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, 
databaseType).isSupportGlobalCSN();
+        return new 
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().isSupportGlobalCSN();
     }
     
     private void initIncrementalTask(final CDCJobItemContext jobItemContext, 
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> 
channelProgressPairs) {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 76cfef64a17..bc451127cd8 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
@@ -319,7 +319,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
         MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType())).getJobConfiguration(jobId);
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
+        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
index d7ccd4536a2..8c81a9bfa21 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.createtable;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -86,8 +86,8 @@ class CreateTableSQLGeneratorIT {
             int majorVersion = 
connection.getMetaData().getDatabaseMajorVersion();
             for (CreateTableSQLGeneratorAssertionEntity each : 
rootEntity.getAssertions()) {
                 statement.execute(each.getInput().getSql());
-                Collection<String> actualDDLs = 
DatabaseTypedSPILoader.getService(
-                        CreateTableSQLGenerator.class, 
testParam.getDatabaseType()).generate(dataSource, DEFAULT_SCHEMA, 
each.getInput().getTable());
+                DialectPipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
testParam.getDatabaseType());
+                Collection<String> actualDDLs = 
sqlBuilder.buildCreateTableSQLs(dataSource, DEFAULT_SCHEMA, 
each.getInput().getTable());
                 assertSQL(actualDDLs, getVersionOutput(each.getOutputs(), 
majorVersion));
             }
         }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
deleted file mode 100644
index 29a771630d8..00000000000
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sql;
-
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
-
-import javax.sql.DataSource;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
-* Create table SQL generator for H2.
- */
-public final class H2CreateTableSQLGenerator implements 
CreateTableSQLGenerator {
-    
-    @Override
-    public Collection<String> generate(final DataSource dataSource, final 
String schemaName, final String tableName) {
-        if ("t_order".equalsIgnoreCase(tableName)) {
-            return 
Collections.singletonList(PipelineContextUtils.getCreateOrderTableSchema());
-        }
-        throw new CreateTableSQLGenerateException(tableName);
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "H2";
-    }
-}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e848883d8cd..e0a8b73e81d 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -17,7 +17,14 @@
 
 package 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sqlbuilder;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
+
+import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Pipeline SQL builder for H2.
@@ -25,10 +32,16 @@ import 
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBu
 public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
     
     @Override
-    public String buildCheckEmptySQL(final String qualifiedTableName) {
+    public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
+    @Override
+    public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) {
+        
ShardingSpherePreconditions.checkState("t_order".equalsIgnoreCase(tableName), 
() -> new CreateTableSQLGenerateException(tableName));
+        return 
Collections.singleton(PipelineContextUtils.getCreateOrderTableSchema());
+    }
+    
     @Override
     public String getDatabaseType() {
         return "H2";
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index 33ec905f699..00000000000
--- 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sql.H2CreateTableSQLGenerator
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder

Reply via email to