This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 56a258187c0 Refactor PipelineJobPreparer (#29427)
56a258187c0 is described below
commit 56a258187c004b1932f2115e1d8e04eae6cd2da3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 17 22:35:44 2023 +0800
Refactor PipelineJobPreparer (#29427)
* Refactor DefaultDataSourcePrepareOption
* Rename DialectDataSourcePrepareOption
* Move DialectDataSourcePrepareOption
* Add checker package
* Refactor DataSourceCheckEngine
* Refactor DataSourceCheckEngine
* Refactor InventoryTaskSplitter
* Move InventoryTaskSplitter
* Refactor CreateTableConfiguration
* Rename PipelineJobDataSourcePreparer
* Rename PipelineJobPreparer
* Refactor PipelineJobPreparer
* Refactor PipelineJobPreparer
* Refactor PipelineJobPreparer
* Refactor PipelineJobPreparer
---
.../DataSourceCheckEngine.java | 22 ++++----
.../DialectDataSourceChecker.java | 2 +-
...PreparerUtils.java => PipelineJobPreparer.java} | 66 +++++++++-------------
...rer.java => PipelineJobDataSourcePreparer.java} | 9 +--
...DefaultPipelineJobDataSourcePrepareOption.java} | 19 ++++++-
...DialectPipelineJobDataSourcePrepareOption.java} | 15 ++---
.../param}/CreateTableConfiguration.java | 4 +-
.../param/PrepareTargetSchemasParameter.java | 1 -
.../param/PrepareTargetTablesParameter.java | 1 -
.../InventoryRecordsCountCalculator.java | 2 +-
.../{ => inventory}/InventoryTaskSplitter.java | 4 +-
...tion.DialectPipelineJobDataSourcePrepareOption} | 2 +-
.../datasource/DataSourceCheckEngineTest.java | 1 +
.../check/datasource/MySQLDataSourceChecker.java | 2 +-
...pipeline.core.checker.DialectDataSourceChecker} | 0
.../datasource/OpenGaussDataSourceChecker.java | 2 +-
...enGaussPipelineJobDataSourcePrepareOption.java} | 6 +-
...pipeline.core.checker.DialectDataSourceChecker} | 0
...tion.DialectPipelineJobDataSourcePrepareOption} | 2 +-
.../datasource/PostgreSQLDataSourceChecker.java | 2 +-
...pipeline.core.checker.DialectDataSourceChecker} | 0
.../data/pipeline/cdc/api/CDCJobAPI.java | 8 ++-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 8 +--
.../pipeline/scenario/migration/MigrationJob.java | 9 +--
.../config/MigrationTaskConfiguration.java | 2 +-
.../migration/preparer/MigrationJobPreparer.java | 30 +++++-----
.../core/fixture/FixtureDataSourceChecker.java | 2 +-
.../core/prepare/InventoryTaskSplitterTest.java | 2 +-
.../pipeline/core/util/PipelineContextUtils.java | 2 +-
...pipeline.core.checker.DialectDataSourceChecker} | 0
30 files changed, 110 insertions(+), 115 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
similarity index 83%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 6b8565fb5dd..89dbd1d2b75 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package org.apache.shardingsphere.data.pipeline.core.checker;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
@@ -37,13 +36,13 @@ import java.util.Collection;
*/
public final class DataSourceCheckEngine {
- private final DatabaseType databaseType;
+ private final DialectDataSourceChecker checker;
- private final DialectDataSourceChecker dialectDataSourceChecker;
+ private final PipelineCommonSQLBuilder sqlBuilder;
public DataSourceCheckEngine(final DatabaseType databaseType) {
- this.databaseType = databaseType;
- dialectDataSourceChecker =
DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class,
databaseType).orElse(null);
+ checker =
DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class,
databaseType).orElse(null);
+ sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
}
/**
@@ -87,8 +86,7 @@ public final class DataSourceCheckEngine {
}
private boolean checkEmpty(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(databaseType);
- String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName,
tableName);
+ String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
@@ -103,11 +101,11 @@ public final class DataSourceCheckEngine {
* @param dataSources data sources
*/
public void checkPrivilege(final Collection<? extends DataSource>
dataSources) {
- if (null == dialectDataSourceChecker) {
+ if (null == checker) {
return;
}
for (DataSource each : dataSources) {
- dialectDataSourceChecker.checkPrivilege(each);
+ checker.checkPrivilege(each);
}
}
@@ -117,11 +115,11 @@ public final class DataSourceCheckEngine {
* @param dataSources data sources
*/
public void checkVariable(final Collection<? extends DataSource>
dataSources) {
- if (null == dialectDataSourceChecker) {
+ if (null == checker) {
return;
}
for (DataSource each : dataSources) {
- dialectDataSourceChecker.checkVariable(each);
+ checker.checkVariable(each);
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
similarity index 95%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
index 0f9bca5cea9..946f6b53e71 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.datasource;
+package org.apache.shardingsphere.data.pipeline.core.checker;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
similarity index 71%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 8be1d3e1e3d..51d3a5e17d1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -17,21 +17,20 @@
package org.apache.shardingsphere.data.pipeline.core.preparer;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
@@ -41,7 +40,6 @@ import
org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -52,60 +50,56 @@ import java.util.Collection;
import java.util.Optional;
/**
- * Pipeline job preparer utility class.
+ * Pipeline job preparer.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
@Slf4j
-public final class PipelineJobPreparerUtils {
+public final class PipelineJobPreparer {
+
+ private final DatabaseType databaseType;
/**
* Is incremental supported.
*
- * @param databaseType database type
- * @return true if supported, otherwise false
+ * @return support incremental or not
*/
- public static boolean isIncrementalSupported(final DatabaseType
databaseType) {
+ public boolean isIncrementalSupported() {
return
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class,
databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false);
}
/**
* Prepare target schema.
*
- * @param databaseType database type
- * @param prepareTargetSchemasParam prepare target schemas parameter
+ * @param param prepare target schemas parameter
* @throws SQLException if prepare target schema fail
*/
- public static void prepareTargetSchema(final DatabaseType databaseType,
final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws
SQLException {
- DataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
- .orElseGet(() ->
DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
- new
DataSourcePreparer(option).prepareTargetSchemas(prepareTargetSchemasParam);
+ public void prepareTargetSchema(final PrepareTargetSchemasParameter param)
throws SQLException {
+ DialectPipelineJobDataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
+ .orElseGet(() ->
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
null));
+ new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param);
}
/**
* Get SQL parser engine.
*
* @param metaData meta data
- * @param targetDatabaseName target database name
* @return SQL parser engine
*/
- public static SQLParserEngine getSQLParserEngine(final
ShardingSphereMetaData metaData, final String targetDatabaseName) {
- ShardingSphereDatabase database =
metaData.getDatabase(targetDatabaseName);
- DatabaseType databaseType =
database.getProtocolType().getTrunkDatabaseType().orElse(database.getProtocolType());
+ public SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData
metaData) {
return
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType);
}
/**
* Prepare target tables.
*
- * @param databaseType database type
* @param prepareTargetTablesParam prepare target tables parameter
* @throws SQLException SQL exception
*/
- public static void prepareTargetTables(final DatabaseType databaseType,
final PrepareTargetTablesParameter prepareTargetTablesParam) throws
SQLException {
- DataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
- .orElseGet(() ->
DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
+ public void prepareTargetTables(final PrepareTargetTablesParameter
prepareTargetTablesParam) throws SQLException {
+ DialectPipelineJobDataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
+ .orElseGet(() ->
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
null));
long startTimeMillis = System.currentTimeMillis();
- new
DataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
+ new
PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
@@ -118,15 +112,14 @@ public final class PipelineJobPreparerUtils {
* @return ingest position
* @throws SQLException sql exception
*/
- public static IngestPosition getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext
dumperContext,
- final
PipelineDataSourceManager dataSourceManager) throws SQLException {
+ public IngestPosition getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext
dumperContext,
+ final
PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
Optional<IngestPosition> position =
initIncremental.getIncrementalPosition();
if (position.isPresent()) {
return position.get();
}
}
- DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
DataSource dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
return DatabaseTypedSPILoader.getService(PositionInitializer.class,
databaseType).init(dataSource, dumperContext.getJobId());
}
@@ -134,10 +127,9 @@ public final class PipelineJobPreparerUtils {
/**
* Check data source.
*
- * @param databaseType database type
* @param dataSources data source
*/
- public static void checkSourceDataSource(final DatabaseType databaseType,
final Collection<? extends DataSource> dataSources) {
+ public void checkSourceDataSource(final Collection<? extends DataSource>
dataSources) {
if (dataSources.isEmpty()) {
return;
}
@@ -150,11 +142,10 @@ public final class PipelineJobPreparerUtils {
/**
* Check target data source.
*
- * @param databaseType database type
* @param importerConfig importer config
* @param targetDataSources target data sources
*/
- public static void checkTargetDataSource(final DatabaseType databaseType,
final ImporterConfiguration importerConfig, final Collection<? extends
DataSource> targetDataSources) {
+ public void checkTargetDataSource(final ImporterConfiguration
importerConfig, final Collection<? extends DataSource> targetDataSources) {
if (null == targetDataSources || targetDataSources.isEmpty()) {
log.info("target data source is empty, skip check");
return;
@@ -167,12 +158,11 @@ public final class PipelineJobPreparerUtils {
/**
* Cleanup job preparer.
*
- * @param jobId job id
+ * @param jobId pipeline job id
* @param pipelineDataSourceConfig pipeline data source config
- * @throws SQLException sql exception
+ * @throws SQLException SQL exception
*/
- public static void destroyPosition(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
- DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
+ public void destroyPosition(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
PositionInitializer positionInitializer =
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}",
databaseType.getType(), pipelineDataSourceConfig.getType());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
similarity index 95%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index d738edeba8e..3f9ae7a8bc0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -23,7 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
@@ -42,17 +43,17 @@ import java.util.Optional;
import java.util.regex.Pattern;
/**
- * Data source preparer.
+ * Pipeline job data source preparer.
*/
@RequiredArgsConstructor
@Slf4j
-public final class DataSourcePreparer {
+public final class PipelineJobDataSourcePreparer {
private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS =
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+",
Pattern.CASE_INSENSITIVE);
private static final Pattern PATTERN_CREATE_TABLE =
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
- private final DataSourcePrepareOption option;
+ private final DialectPipelineJobDataSourcePrepareOption option;
/**
* Prepare target schemas.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
similarity index 66%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
index fd22b23e244..34a42ed258e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
@@ -15,12 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;
+
+import java.util.Collection;
+import java.util.Collections;
/**
- * Default data source prepare option.
+ * Default pipeline job data source prepare option.
*/
-public final class DefaultDataSourcePrepareOption implements
DataSourcePrepareOption {
+public final class DefaultPipelineJobDataSourcePrepareOption implements
DialectPipelineJobDataSourcePrepareOption {
+
+ @Override
+ public boolean isSupportIfNotExistsOnCreateSchema() {
+ return true;
+ }
+
+ @Override
+ public Collection<String> getIgnoredExceptionMessages() {
+ return Collections.emptyList();
+ }
@Override
public boolean isDefault() {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
similarity index 79%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
index 52bc1962bcd..70a0d7944ac 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
@@ -15,35 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import java.util.Collection;
-import java.util.Collections;
/**
- * Data source prepare option.
+ * Dialect pipeline job data source prepare option.
*/
@SingletonSPI
-public interface DataSourcePrepareOption extends DatabaseTypedSPI {
+public interface DialectPipelineJobDataSourcePrepareOption extends
DatabaseTypedSPI {
/**
* Is support if not exists on create schema SQL.
*
* @return supported or not
*/
- default boolean isSupportIfNotExistsOnCreateSchema() {
- return true;
- }
+ boolean isSupportIfNotExistsOnCreateSchema();
/**
* Get ignored exception messages.
*
* @return ignored exception messages
*/
- default Collection<String> getIgnoredExceptionMessages() {
- return Collections.emptyList();
- }
+ Collection<String> getIgnoredExceptionMessages();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
similarity index 90%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
index 2f7d284e103..737e16159bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
@@ -28,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQual
*/
@RequiredArgsConstructor
@Getter
-@ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
public final class CreateTableConfiguration {
private final PipelineDataSourceConfiguration sourceDataSourceConfig;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
index 7b4cf8e8d36..a4ae2cc3942 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index 348c931eb0a..26487299886 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index f910490fe56..894d5d55f91 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
similarity index 99%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index 74d1bf53d04..95d9c9e4eaa 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Inventory data task splitter.
+ * Inventory task splitter.
*/
@RequiredArgsConstructor
@Slf4j
diff --git
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
similarity index 94%
rename from
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
rename to
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
index 6acc3318877..9a154a9272c 100644
---
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
+++
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DefaultDataSourcePrepareOption
+org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DefaultPipelineJobDataSourcePrepareOption
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 3f8f36c97a8..f117cf933dd 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index 2bc39c2ecd4..36132ba2769 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import javax.sql.DataSource;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
index eae8d6e6672..f9819c42f46 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.check.datasource;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import javax.sql.DataSource;
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
similarity index 85%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
rename to
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
index 846a041b4d2..2771c5d9353 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
@@ -18,16 +18,16 @@
package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import java.util.Arrays;
import java.util.Collection;
/**
- * Data source prepare option for openGauss.
+ * Pipeline job data source prepare option for openGauss.
*/
@Slf4j
-public final class OpenGaussDataSourcePrepareOption implements
DataSourcePrepareOption {
+public final class OpenGaussPipelineJobDataSourcePrepareOption implements
DialectPipelineJobDataSourcePrepareOption {
@Override
public boolean isSupportIfNotExistsOnCreateSchema() {
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
similarity index 94%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
index 72923bbe665..e1fd88a4da6 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePrepareOption
+org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussPipelineJobDataSourcePrepareOption
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index c8e00146663..693afc16076 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import javax.sql.DataSource;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 8474fcfe2e9..04ef66dcac7 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -52,7 +52,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -202,7 +202,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
incrementalDumperContext, dataSourceManager));
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(new PipelineJobPreparer(
+
incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null,
incrementalDumperContext, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}
@@ -258,7 +259,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private void cleanup(final CDCJobConfiguration jobConfig) {
for (Entry<String, Map<String, Object>> entry :
jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
try {
- PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(),
new StandardPipelineDataSourceConfiguration(entry.getValue()));
+ StandardPipelineDataSourceConfiguration
pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(entry.getValue());
+ new
PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(),
pipelineDataSourceConfig);
} catch (final SQLException ex) {
log.warn("job destroying failed, jobId={}, dataSourceName={}",
jobConfig.getJobId(), entry.getKey(), ex);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 82e1330c395..9b991c3e8e1 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,8 +41,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -106,8 +106,8 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
- taskConfig.getDumperContext().getCommonContext().setPosition(
-
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ taskConfig.getDumperContext().getCommonContext().setPosition(new
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
+ .getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index af98652a279..7b276da8749 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -30,7 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
@@ -45,7 +45,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -76,11 +75,7 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
}
private Collection<CreateTableConfiguration>
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final
TableAndSchemaNameMapper mapper) {
- Collection<CreateTableConfiguration> result = new LinkedList<>();
- for (JobDataNodeEntry each :
jobConfig.getTablesFirstDataNodes().getEntries()) {
- result.add(getCreateTableConfiguration(jobConfig, mapper, each));
- }
- return result;
+ return
jobConfig.getTablesFirstDataNodes().getEntries().stream().map(each ->
getCreateTableConfiguration(jobConfig, mapper,
each)).collect(Collectors.toList());
}
private CreateTableConfiguration getCreateTableConfiguration(final
MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper,
final JobDataNodeEntry jobDataNodeEntry) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
index 07aba7d6dd9..d36dd904cde 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 69c4780ab64..cf01823a977 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -47,8 +47,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -95,7 +95,8 @@ public final class MigrationJobPreparer {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
-
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
Collections.singleton(jobItemContext.getSourceDataSource()));
+ PipelineJobPreparer preparer = new
PipelineJobPreparer(jobItemContext.getJobConfig().getSourceDatabaseType());
+
preparer.checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
@@ -105,7 +106,7 @@ public final class MigrationJobPreparer {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
}
- boolean isIncrementalSupported =
PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType());
+ boolean isIncrementalSupported = preparer.isIncrementalSupported();
if (isIncrementalSupported) {
prepareIncremental(jobItemContext);
}
@@ -157,8 +158,8 @@ public final class MigrationJobPreparer {
TransmissionJobItemProgress initProgress =
jobItemContext.getInitProgress();
if (null == initProgress) {
PipelineDataSourceWrapper targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- PipelineJobPreparerUtils.checkTargetDataSource(
- jobItemContext.getJobConfig().getTargetDatabaseType(),
jobItemContext.getTaskConfig().getImporterConfig(),
Collections.singleton(targetDataSource));
+ new
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource(
+ jobItemContext.getTaskConfig().getImporterConfig(),
Collections.singleton(targetDataSource));
}
}
@@ -167,18 +168,21 @@ public final class MigrationJobPreparer {
Collection<CreateTableConfiguration> createTableConfigs =
jobItemContext.getTaskConfig().getCreateTableConfigurations();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
createTableConfigs, dataSourceManager);
-
PipelineJobPreparerUtils.prepareTargetSchema(jobItemContext.getJobConfig().getTargetDatabaseType(),
prepareTargetSchemasParam);
+ PipelineJobPreparer targetDataSourcePreparer = new
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType());
+
targetDataSourcePreparer.prepareTargetSchema(prepareTargetSchemasParam);
ShardingSphereMetaData metaData =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
- SQLParserEngine sqlParserEngine =
PipelineJobPreparerUtils.getSQLParserEngine(metaData,
jobConfig.getTargetDatabaseName());
-
PipelineJobPreparerUtils.prepareTargetTables(jobItemContext.getJobConfig().getTargetDatabaseType(),
new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine));
+ SQLParserEngine sqlParserEngine = new PipelineJobPreparer(
+
metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType().getTrunkDatabaseType().orElse(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()))
+ .getSQLParserEngine(metaData);
+ targetDataSourcePreparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine));
}
private void prepareIncremental(final MigrationJobItemContext
jobItemContext) {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
- taskConfig.getDumperContext().getCommonContext().setPosition(
-
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ taskConfig.getDumperContext().getCommonContext().setPosition(new
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
+ .getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
@@ -224,7 +228,7 @@ public final class MigrationJobPreparer {
public void cleanup(final MigrationJobConfiguration jobConfig) {
for (Entry<String, PipelineDataSourceConfiguration> entry :
jobConfig.getSources().entrySet()) {
try {
- PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(),
entry.getValue());
+ new
PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(),
entry.getValue());
} catch (final SQLException ex) {
log.warn("job destroying failed, jobId={}, dataSourceName={}",
jobConfig.getJobId(), entry.getKey(), ex);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
index e80538695ab..23018527cf3 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
import javax.sql.DataSource;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 473baa149d5..9581f5452bb 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 939ff8881a7..970e4ad2625 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swa
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker