This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aac47159d92 Refactor sqlbuilder package (#29481)
aac47159d92 is described below
commit aac47159d9296cf9c251ada84d2881b4f6f2f817
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 21 12:00:52 2023 +0800
Refactor sqlbuilder package (#29481)
* Rename DialectPipelineSQLBuilder.buildCheckEmptyTableSQL
* Move DialectPipelineSQLBuilder
* Refactor sql package
* Merge DialectPipelineSQLBuilder and CreateTableSQLGenerator
* Rename PipelinePrepareSQLBuilder
* Refactor sqlbuilder package
* Fix test case
---
.../type/TestcontainersDatabaseTypeTest.java | 12 +--
.../core/checker/DataSourceCheckEngine.java | 8 +-
.../CRC32SingleTableInventoryCalculator.java | 2 +-
.../RecordSingleTableInventoryCalculator.java | 2 +-
.../core/importer/sink/PipelineDataSourceSink.java | 2 +-
.../core/ingest/dumper/InventoryDumper.java | 2 +-
.../metadata/generator/PipelineDDLGenerator.java | 4 +-
.../datasource/PipelineJobDataSourcePreparer.java | 4 +-
.../inventory/InventoryRecordsCountCalculator.java | 4 +-
.../preparer/inventory/InventoryTaskSplitter.java | 4 +-
.../core/spi/sql/CreateTableSQLGenerator.java | 43 -----------
.../dialect}/DialectPipelineSQLBuilder.java | 20 ++++-
.../{ => segment}/PipelineSQLSegmentBuilder.java | 2 +-
...PipelineDataConsistencyCalculateSQLBuilder.java | 5 +-
.../{ => sql}/PipelineImportSQLBuilder.java | 5 +-
.../{ => sql}/PipelineInventoryDumpSQLBuilder.java | 3 +-
.../PipelinePrepareSQLBuilder.java} | 17 ++--
...lineDataConsistencyCalculateSQLBuilderTest.java | 1 +
.../sqlbuilder/PipelineImportSQLBuilderTest.java | 1 +
.../PipelineInventoryDumpSQLBuilderTest.java | 1 +
.../sqlbuilder/PipelineSQLSegmentBuilderTest.java | 1 +
.../fixture/FixturePipelineSQLBuilder.java | 12 ++-
...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} | 0
.../ddlgenerator/MySQLCreateTableSQLGenerator.java | 57 --------------
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 27 ++++++-
...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} | 0
.../OpenGaussCreateTableSQLGenerator.java | 60 ---------------
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 28 ++++++-
...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} | 0
.../PostgreSQLCreateTableSQLGenerator.java | 90 ----------------------
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 66 +++++++++++++++-
...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} | 0
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 4 +-
.../scenario/migration/api/MigrationJobAPI.java | 4 +-
.../createtable/CreateTableSQLGeneratorIT.java | 6 +-
.../fixture/h2/sql/H2CreateTableSQLGenerator.java | 45 -----------
.../h2/sqlbuilder/H2PipelineSQLBuilder.java | 17 +++-
...a.pipeline.core.spi.sql.CreateTableSQLGenerator | 18 -----
...e.sqlbuilder.dialect.DialectPipelineSQLBuilder} | 0
42 files changed, 205 insertions(+), 426 deletions(-)
diff --git
a/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
b/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
index 91dcaac3307..6229c141e4b 100644
---
a/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
+++
b/infra/database/type/testcontainers/src/test/java/org/apache/shardingsphere/infra/database/testcontainers/type/TestcontainersDatabaseTypeTest.java
@@ -30,11 +30,11 @@ class TestcontainersDatabaseTypeTest {
@Test
void assertGetJdbcUrlPrefixes() {
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersClickHouse").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:clickhouse:")));
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersMariaDB").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:mariadb:")));
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersMySQL").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:mysql:")));
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersOracle").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:oracle:")));
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersPostgreSQL").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:postgresql:")));
- assertThat(TypedSPILoader.getService(DatabaseType.class,
"TestContainersSQLServer").getJdbcUrlPrefixes(),
is(Collections.singletonList("jdbc:tc:sqlserver:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-ClickHouse").getJdbcUrlPrefixes(),
is(Collections.singleton("jdbc:tc:clickhouse:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-MariaDB").getJdbcUrlPrefixes(),
is(Collections.singleton("jdbc:tc:mariadb:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-MySQL").getJdbcUrlPrefixes(), is(Collections.singleton("jdbc:tc:mysql:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-Oracle").getJdbcUrlPrefixes(),
is(Collections.singleton("jdbc:tc:oracle:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-PostgreSQL").getJdbcUrlPrefixes(),
is(Collections.singleton("jdbc:tc:postgresql:")));
+ assertThat(TypedSPILoader.getService(DatabaseType.class,
"TC-SQLServer").getJdbcUrlPrefixes(),
is(Collections.singleton("jdbc:tc:sqlserver:")));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 001a278fef2..6e5a82e8f0e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -40,11 +40,11 @@ public final class DataSourceCheckEngine {
private final DialectDataSourceChecker checker;
- private final PipelineCommonSQLBuilder sqlBuilder;
+ private final PipelinePrepareSQLBuilder sqlBuilder;
public DataSourceCheckEngine(final DatabaseType databaseType) {
checker =
DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class,
databaseType).orElse(null);
- sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
+ sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
}
/**
@@ -109,7 +109,7 @@ public final class DataSourceCheckEngine {
* @throws SQLException if there's database operation failure
*/
public boolean checkEmptyTable(final DataSource dataSource, final
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
- String sql =
sqlBuilder.buildCheckEmptySQL(qualifiedTable.getSchemaName().toString(),
qualifiedTable.getTableName().toString());
+ String sql =
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().toString(),
qualifiedTable.getTableName().toString());
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
index def1d4c6c94..14effc106d1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32SingleTableInventoryCalculatorException;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index c6c561ebec6..2b1c2f24f76 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderEngine;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 011187eab9d..a45828ba96e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineImportSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index ac0b876db8d..e6e0cc17c2d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -42,7 +42,7 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineInventoryDumpSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index b6a24d9e0fd..82656a3f608 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.metadata.generator;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
@@ -76,7 +76,7 @@ public final class PipelineDDLGenerator {
final String schemaName, final String
sourceTableName, final String targetTableName, final SQLParserEngine
parserEngine) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
- for (String each :
DatabaseTypedSPILoader.getService(CreateTableSQLGenerator.class,
databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
+ for (String each :
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName,
sourceTableName)) {
Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(optional ->
result.append(optional).append(DELIMITER).append(System.lineSeparator()));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index 3f59a1f3f7d..b62427ccfea 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.D
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -68,7 +68,7 @@ public final class PipelineJobDataSourcePreparer {
return;
}
String defaultSchema =
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(targetDatabaseType);
+ PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
String targetSchemaName =
each.getTargetName().getSchemaName().toString();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index 786a5f8ff62..db13ef1e697 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
@@ -53,7 +53,7 @@ public final class InventoryRecordsCountCalculator {
public static long getTableRecordsCount(final InventoryDumperContext
dumperContext, final PipelineDataSourceWrapper dataSource) {
String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(dataSource.getDatabaseType());
+ PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
try {
if (sql.isPresent()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index 95d9c9e4eaa..7e84b31e8dd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -36,7 +36,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.Stri
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.UnsupportedKeyPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
@@ -202,7 +202,7 @@ public final class InventoryTaskSplitter {
private Range<Long> getUniqueKeyValuesRange(final
TransmissionJobItemContext jobItemContext, final DataSource dataSource, final
InventoryDumperContext dumperContext) {
String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
+ PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName(), uniqueKey);
try (
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
deleted file mode 100644
index 1ab890861a3..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/CreateTableSQLGenerator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.spi.sql;
-
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.Collection;
-
-/**
- * Create table SQL generator.
- */
-@SingletonSPI
-public interface CreateTableSQLGenerator extends DatabaseTypedSPI {
-
- /**
- * Generate create table SQLs.
- *
- * @param dataSource dataSource
- * @param schemaName schema name
- * @param tableName table name
- * @return generated SQLs
- * @throws SQLException SQL exception
- */
- Collection<String> generate(DataSource dataSource, String schemaName,
String tableName) throws SQLException;
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
similarity index 78%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index b1f1afedec1..57897e8908d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/sql/DialectPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.sql;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
import java.util.Optional;
/**
@@ -48,12 +51,12 @@ public interface DialectPipelineSQLBuilder extends
DatabaseTypedSPI {
}
/**
- * Build check empty SQL.
+ * Build check empty table SQL.
*
* @param qualifiedTableName qualified table name
* @return built SQL
*/
- String buildCheckEmptySQL(String qualifiedTableName);
+ String buildCheckEmptyTableSQL(String qualifiedTableName);
/**
* Build estimated count SQL.
@@ -75,4 +78,15 @@ public interface DialectPipelineSQLBuilder extends
DatabaseTypedSPI {
default Optional<String> buildCRC32SQL(String qualifiedTableName, final
String columnName) {
return Optional.empty();
}
+
+ /**
+ * Build create table SQLs.
+ *
+ * @param dataSource dataSource
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return built SQLs
+ * @throws SQLException SQL exception
+ */
+ Collection<String> buildCreateTableSQLs(DataSource dataSource, String
schemaName, String tableName) throws SQLException;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
similarity index 97%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
index 82be6b81951..8668fc48b13 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment;
import com.google.common.base.Strings;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index 1b37ef04114..c3a6f313fca 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
similarity index 95%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
index 341cc6d7af5..735d6d266d3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
similarity index 96%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
index e3d9d6db4fb..c53793fd917 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Collection;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
similarity index 83%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
index 25de1b1ab0a..6218a482e51 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineCommonSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
@@ -15,24 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import java.util.Optional;
/**
- * Pipeline common SQL builder.
+ * Pipeline prepare SQL builder.
*/
-public final class PipelineCommonSQLBuilder {
+public final class PipelinePrepareSQLBuilder {
private final DialectPipelineSQLBuilder dialectSQLBuilder;
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
- public PipelineCommonSQLBuilder(final DatabaseType databaseType) {
+ public PipelinePrepareSQLBuilder(final DatabaseType databaseType) {
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
@@ -94,13 +95,13 @@ public final class PipelineCommonSQLBuilder {
}
/**
- * Build check empty SQL.
+ * Build check empty table SQL.
*
* @param schemaName schema name
* @param tableName table name
* @return check SQL
*/
- public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return
dialectSQLBuilder.buildCheckEmptySQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
+ public String buildCheckEmptyTableSQL(final String schemaName, final
String tableName) {
+ return
dialectSQLBuilder.buildCheckEmptyTableSQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 98addaafc98..6a28e997970 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
index ca7c8668403..66282a8dfa5 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineImportSQLBuilderTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderP
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
index d50563842b9..f765003f5fc 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
index 869244db9c7..0390dff9213 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLSegmentBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 8c749701502..66087b5f63d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -17,14 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.fixture;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
public final class FixturePipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
- public String buildCheckEmptySQL(final String qualifiedTableName) {
+ public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
@@ -33,6 +36,11 @@ public final class FixturePipelineSQLBuilder implements
DialectPipelineSQLBuilde
return Optional.of(String.format("SELECT CRC32(%s) FROM %s",
columnName, qualifiedTableName));
}
+ @Override
+ public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) {
+ return Collections.emptyList();
+ }
+
@Override
public String getDatabaseType() {
return "FIXTURE";
diff --git
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
deleted file mode 100644
index 15bd028e803..00000000000
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ddlgenerator/MySQLCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.ddlgenerator;
-
-import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
-* Create table SQL generator for MySQL.
- */
-public final class MySQLCreateTableSQLGenerator implements
CreateTableSQLGenerator {
-
- private static final String SHOW_CREATE_SQL = "SHOW CREATE TABLE %s";
-
- private static final String COLUMN_LABEL = "create table";
-
- @Override
- public Collection<String> generate(final DataSource dataSource, final
String schemaName, final String tableName) throws SQLException {
- try (
- Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet =
statement.executeQuery(String.format(SHOW_CREATE_SQL, tableName))) {
- if (resultSet.next()) {
- return
Collections.singletonList(resultSet.getString(COLUMN_LABEL));
- }
- }
- throw new CreateTableSQLGenerateException(tableName);
- }
-
- @Override
- public String getDatabaseType() {
- return "MySQL";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index a71dda64e09..cd4c69bdd6e 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,11 +17,19 @@
package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
+import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
/**
@@ -49,7 +57,7 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
}
@Override
- public String buildCheckEmptySQL(final String qualifiedTableName) {
+ public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
@@ -63,6 +71,19 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", columnName,
qualifiedTableName));
}
+ @Override
+ public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(String.format("SHOW CREATE TABLE %s", tableName))) {
+ if (resultSet.next()) {
+ return Collections.singleton(resultSet.getString("create
table"));
+ }
+ }
+ throw new CreateTableSQLGenerateException(tableName);
+ }
+
@Override
public String getDatabaseType() {
return "MySQL";
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index 4d8d8e80a06..00000000000
---
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.mysql.ddlgenerator.MySQLCreateTableSQLGenerator
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
deleted file mode 100644
index f98369ecf90..00000000000
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ddlgenerator/OpenGaussCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.opengauss.ddlgenerator;
-
-import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
-* Create table SQL generator for openGauss.
- */
-public final class OpenGaussCreateTableSQLGenerator implements
CreateTableSQLGenerator {
-
- private static final String SELECT_TABLE_DEF_SQL = "SELECT * FROM
pg_get_tabledef('%s.%s')";
-
- private static final String COLUMN_LABEL = "pg_get_tabledef";
-
- private static final String DELIMITER = ";";
-
- @Override
- public Collection<String> generate(final DataSource dataSource, final
String schemaName, final String tableName) throws SQLException {
- try (
- Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet =
statement.executeQuery(String.format(SELECT_TABLE_DEF_SQL, schemaName,
tableName))) {
- if (resultSet.next()) {
- // TODO use ";" to split is not always correct
- return
Arrays.asList(resultSet.getString(COLUMN_LABEL).split(DELIMITER));
- }
- }
- throw new CreateTableSQLGenerateException(tableName);
- }
-
- @Override
- public String getDatabaseType() {
- return "openGauss";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index a35310c2d49..9d191b093d9 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,10 +17,18 @@
package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
+import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -45,7 +53,7 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
}
@Override
- public String buildCheckEmptySQL(final String qualifiedTableName) {
+ public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
@@ -54,6 +62,20 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
}
+ @Override
+ public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')",
schemaName, tableName))) {
+ if (resultSet.next()) {
+ // TODO use ";" to split is not always correct
+ return
Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
+ }
+ }
+ throw new CreateTableSQLGenerateException(tableName);
+ }
+
@Override
public String getDatabaseType() {
return "openGauss";
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index a57974eebae..00000000000
---
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.opengauss.ddlgenerator.OpenGaussCreateTableSQLGenerator
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
deleted file mode 100644
index 79991a3a3ec..00000000000
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ddlgenerator/PostgreSQLCreateTableSQLGenerator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator;
-
-import
org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Create table SQL generator for PostgreSQL.
- */
-public final class PostgreSQLCreateTableSQLGenerator implements
CreateTableSQLGenerator {
-
- private static final String DELIMITER = ";";
-
- // TODO support partitions etc.
- @Override
- public Collection<String> generate(final DataSource dataSource, final
String schemaName, final String tableName) throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- int majorVersion =
connection.getMetaData().getDatabaseMajorVersion();
- int minorVersion =
connection.getMetaData().getDatabaseMinorVersion();
- Map<String, Object> materials = loadMaterials(tableName,
schemaName, connection, majorVersion, minorVersion);
- String tableSQL = generateCreateTableSQL(majorVersion,
minorVersion, materials);
- String indexSQL = generateCreateIndexSQL(connection, majorVersion,
minorVersion, materials);
- // TODO use ";" to split is not always correct
- return Arrays.asList((tableSQL + System.lineSeparator() +
indexSQL).trim().split(DELIMITER));
- }
- }
-
- private Map<String, Object> loadMaterials(final String tableName, final
String schemaName, final Connection connection, final int majorVersion, final
int minorVersion) throws SQLException {
- Map<String, Object> result = new
PostgreSQLTablePropertiesLoader(connection, tableName, schemaName,
majorVersion, minorVersion).load();
- new PostgreSQLColumnPropertiesAppender(connection, majorVersion,
minorVersion).append(result);
- new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion,
minorVersion).append(result);
- formatColumns(result);
- return result;
- }
-
- private String generateCreateTableSQL(final int majorVersion, final int
minorVersion, final Map<String, Object> materials) {
- return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials,
"component/table/%s/create.ftl", majorVersion, minorVersion).trim();
- }
-
- private String generateCreateIndexSQL(final Connection connection, final
int majorVersion, final int minorVersion, final Map<String, Object> materials)
throws SQLException {
- return new PostgreSQLIndexSQLGenerator(connection, majorVersion,
minorVersion).generate(materials);
- }
-
- @SuppressWarnings("unchecked")
- private void formatColumns(final Map<String, Object> context) {
- Collection<Map<String, Object>> columns = (Collection<Map<String,
Object>>) context.get("columns");
- for (Map<String, Object> each : columns) {
- if (each.containsKey("cltype")) {
- typeFormatter(each, (String) each.get("cltype"));
- }
- }
- }
-
- private void typeFormatter(final Map<String, Object> column, final String
columnType) {
- if (columnType.contains("[]")) {
- column.put("cltype", columnType.substring(0, columnType.length() -
2));
- column.put("hasSqrBracket", true);
- } else {
- column.put("hasSqrBracket", false);
- }
- }
-
- @Override
- public String getDatabaseType() {
- return "PostgreSQL";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index a38a0e050f0..3e15bfbba9e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -19,9 +19,20 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLSegmentBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLColumnPropertiesAppender;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLConstraintsPropertiesAppender;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLIndexSQLGenerator;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLTablePropertiesLoader;
+import
org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -52,7 +63,7 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
}
@Override
- public String buildCheckEmptySQL(final String qualifiedTableName) {
+ public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
@@ -61,6 +72,55 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
}
+ // TODO support partitions etc.
+ // Builds the DDL for one table: the CREATE TABLE SQL rendered from a template, followed by the index SQL, returned as separate statements.
+ @Override
+ public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
+ // The connection is used for all metadata loading below and is closed by try-with-resources.
+ try (Connection connection = dataSource.getConnection()) {
+ // The server version is handed to the loaders and to the template lookup.
+ int majorVersion =
connection.getMetaData().getDatabaseMajorVersion();
+ int minorVersion =
connection.getMetaData().getDatabaseMinorVersion();
+ Map<String, Object> materials = loadMaterials(tableName,
schemaName, connection, majorVersion, minorVersion);
+ String tableSQL = generateCreateTableSQL(majorVersion,
minorVersion, materials);
+ String indexSQL = generateCreateIndexSQL(connection, majorVersion,
minorVersion, materials);
+ // TODO splitting on ";" is not always correct
+ // Table SQL and index SQL are joined with a line separator, trimmed, and cut at every ";" (the separator itself is dropped from each piece).
+ return Arrays.asList((tableSQL + System.lineSeparator() +
indexSQL).trim().split(";"));
+ }
+ }
+
+ // Collects the data used to generate the DDL of one table.
+ // The map returned by the table properties loader is passed to the column and constraint appenders, then to formatColumns, and finally returned.
+ private Map<String, Object> loadMaterials(final String tableName, final
String schemaName, final Connection connection, final int majorVersion, final
int minorVersion) throws SQLException {
+ Map<String, Object> result = new
PostgreSQLTablePropertiesLoader(connection, tableName, schemaName,
majorVersion, minorVersion).load();
+ // NOTE(review): the appenders presumably add their entries (e.g. "columns") to the given map in place; confirm against their implementations.
+ new PostgreSQLColumnPropertiesAppender(connection, majorVersion,
minorVersion).append(result);
+ new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion,
minorVersion).append(result);
+ // Reads the "columns" entry, so it runs after the appenders have been applied.
+ formatColumns(result);
+ return result;
+ }
+
+    /**
+     * Render the create table SQL from the "component/table/%s/create.ftl" template for the given server version.
+     *
+     * @param majorVersion database major version
+     * @param minorVersion database minor version
+     * @param materials data model handed to the template
+     * @return rendered SQL without leading or trailing whitespace
+     */
+    private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
+        String result = PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion);
+        return result.trim();
+    }
+
+    /**
+     * Generate the index SQL for the table described by the materials.
+     *
+     * @param connection connection passed to the index SQL generator
+     * @param majorVersion database major version
+     * @param minorVersion database minor version
+     * @param materials data model of the table
+     * @return SQL returned by the index SQL generator
+     * @throws SQLException SQL exception
+     */
+    private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
+        PostgreSQLIndexSQLGenerator indexSQLGenerator = new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion);
+        return indexSQLGenerator.generate(materials);
+    }
+
+    /**
+     * Apply the type formatter to every column that carries a "cltype" entry.
+     *
+     * @param context data model whose "columns" entry holds one property map per column
+     */
+    @SuppressWarnings("unchecked")
+    private void formatColumns(final Map<String, Object> context) {
+        Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
+        // Columns without a "cltype" key are left untouched.
+        columns.stream().filter(each -> each.containsKey("cltype")).forEach(each -> typeFormatter(each, (String) each.get("cltype")));
+    }
+
+    /**
+     * Normalize an array column type: strip a trailing "[]" from the "cltype" entry and record in "hasSqrBracket" whether it was present.
+     *
+     * @param column column properties, updated in place
+     * @param columnType current value of the column's "cltype" entry
+     */
+    private void typeFormatter(final Map<String, Object> column, final String columnType) {
+        // Test the suffix rather than mere containment: the removal below always drops the last two characters,
+        // so a "[]" located anywhere else in the type name must not trigger it.
+        boolean hasSqrBracket = columnType.endsWith("[]");
+        if (hasSqrBracket) {
+            // NOTE(review): presumably the template re-appends the brackets based on "hasSqrBracket"; confirm against create.ftl.
+            column.put("cltype", columnType.substring(0, columnType.length() - 2));
+        }
+        column.put("hasSqrBracket", hasSqrBracket);
+    }
+
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index e34da9d903c..00000000000
---
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLCreateTableSQLGenerator
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c527f4f9312..bb19d367c2a 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -46,9 +46,9 @@ import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.Incrementa
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import java.sql.SQLException;
import java.util.Collection;
@@ -139,7 +139,7 @@ public final class CDCJobPreparer {
}
private boolean needSorting(final DatabaseType databaseType) {
- return
DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class,
databaseType).isSupportGlobalCSN();
+ return new
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().isSupportGlobalCSN();
}
private void initIncrementalTask(final CDCJobItemContext jobItemContext,
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair>
channelProgressPairs) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 76cfef64a17..bc451127cd8 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -32,7 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
@@ -319,7 +319,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType())).getJobConfiguration(jobId);
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
+ PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
index d7ccd4536a2..8c81a9bfa21 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.createtable;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -86,8 +86,8 @@ class CreateTableSQLGeneratorIT {
int majorVersion =
connection.getMetaData().getDatabaseMajorVersion();
for (CreateTableSQLGeneratorAssertionEntity each :
rootEntity.getAssertions()) {
statement.execute(each.getInput().getSql());
- Collection<String> actualDDLs =
DatabaseTypedSPILoader.getService(
- CreateTableSQLGenerator.class,
testParam.getDatabaseType()).generate(dataSource, DEFAULT_SCHEMA,
each.getInput().getTable());
+ DialectPipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
testParam.getDatabaseType());
+ Collection<String> actualDDLs =
sqlBuilder.buildCreateTableSQLs(dataSource, DEFAULT_SCHEMA,
each.getInput().getTable());
assertSQL(actualDDLs, getVersionOutput(each.getOutputs(),
majorVersion));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
deleted file mode 100644
index 29a771630d8..00000000000
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sql/H2CreateTableSQLGenerator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sql;
-
-import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator;
-import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
-
-import javax.sql.DataSource;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
-* Create table SQL generator for H2.
- */
-public final class H2CreateTableSQLGenerator implements
CreateTableSQLGenerator {
-
- @Override
- public Collection<String> generate(final DataSource dataSource, final
String schemaName, final String tableName) {
- if ("t_order".equalsIgnoreCase(tableName)) {
- return
Collections.singletonList(PipelineContextUtils.getCreateOrderTableSchema());
- }
- throw new CreateTableSQLGenerateException(tableName);
- }
-
- @Override
- public String getDatabaseType() {
- return "H2";
- }
-}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e848883d8cd..e0a8b73e81d 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -17,7 +17,14 @@
package
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sqlbuilder;
-import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
+
+import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
/**
* Pipeline SQL builder for H2.
@@ -25,10 +32,16 @@ import
org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBu
public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
@Override
- public String buildCheckEmptySQL(final String qualifiedTableName) {
+ public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
+ @Override
+ public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) {
+ // Fixture behaviour: only "t_order" (compared case-insensitively) is accepted; the data source and schema name are not used.
+ // The precondition is given a CreateTableSQLGenerateException supplier for any other table name.
+
ShardingSpherePreconditions.checkState("t_order".equalsIgnoreCase(tableName),
() -> new CreateTableSQLGenerateException(tableName));
+ // A single statement is returned: the order table schema provided by PipelineContextUtils.
+ return
Collections.singleton(PipelineContextUtils.getCreateOrderTableSchema());
+ }
+
@Override
public String getDatabaseType() {
return "H2";
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
deleted file mode 100644
index 33ec905f699..00000000000
---
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.CreateTableSQLGenerator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sql.H2CreateTableSQLGenerator
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder
similarity index 100%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.sql.DialectPipelineSQLBuilder
rename to
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder