This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2838da8ff74 Remove PipelineJobPreparer.checkSourceDataSource() and checkTargetDataSource() (#29460)
2838da8ff74 is described below
commit 2838da8ff74bfd0127647f4b9880a206a7b0bc73
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 19 23:25:44 2023 +0800
Remove PipelineJobPreparer.checkSourceDataSource() and checkTargetDataSource() (#29460)
* Remove PipelineJobPreparer.checkSourceDataSource()
* Remove PipelineJobPreparer.checkTargetDataSource()
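In short: the single-DataSource checks move from PipelineJobPreparer onto DataSourceCheckEngine, and call sites construct the engine directly. A minimal usage sketch of the relocated API (illustrative only, not part of this commit; the helper class and its parameters are hypothetical stand-ins for the surrounding job context):

    import javax.sql.DataSource;

    import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
    import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
    import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

    // Hypothetical helper showing the relocated checks; not part of this commit.
    public final class DataSourceCheckUsageSketch {

        public static void check(final DatabaseType databaseType, final DataSource sourceDataSource,
                                 final DataSource targetDataSource, final ImporterConfiguration importerConfig) {
            DataSourceCheckEngine checkEngine = new DataSourceCheckEngine(databaseType);
            // Source side: connection, privilege and variable checks.
            checkEngine.checkSourceDataSource(sourceDataSource);
            // Target side: connection check plus empty-target-table check.
            checkEngine.checkTargetDataSource(targetDataSource, importerConfig);
        }
    }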
---
.../core/checker/DataSourceCheckEngine.java | 32 +++++++++++++++++--
.../core/preparer/PipelineJobPreparer.java | 36 +---------------------
.../data/pipeline/cdc/api/CDCJobAPI.java | 5 ++-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 5 +--
.../migration/preparer/MigrationJobPreparer.java | 22 ++++++-------
5 files changed, 46 insertions(+), 54 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 89dbd1d2b75..f9e48d9a224 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -17,12 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.checker;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -30,6 +31,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
/**
* Data source check engine.
@@ -45,6 +47,30 @@ public final class DataSourceCheckEngine {
sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
}
+ /**
+ * Check source data source.
+ *
+ * @param dataSource to be checked source data source
+ */
+ public void checkSourceDataSource(final DataSource dataSource) {
+ Collection<DataSource> dataSources = Collections.singleton(dataSource);
+ checkConnection(dataSources);
+ checkPrivilege(dataSources);
+ checkVariable(dataSources);
+ }
+
+ /**
+ * Check target data source.
+ *
+ * @param dataSource to be checked target data source
+ * @param importerConfig importer configuration
+ */
+ public void checkTargetDataSource(final DataSource dataSource, final ImporterConfiguration importerConfig) {
+ Collection<DataSource> dataSources = Collections.singleton(dataSource);
+ checkConnection(dataSources);
+ checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
+ }
+
/**
* Check data source connections.
*
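Both new methods wrap the single DataSource in Collections.singleton(...) and delegate to the existing collection-based checks, so the collection-based API remains the implementation. The effect at a call site, condensed from the MigrationJobPreparer diff below (variable names as in that file):

    // Before: checks routed through PipelineJobPreparer, collection-based.
    new PipelineJobPreparer(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
    // After: checks on DataSourceCheckEngine, single data source.
    new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());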
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 7b6494a7686..792f581f092 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -22,10 +22,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -38,7 +36,6 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Optional;
/**
@@ -54,7 +51,7 @@ public final class PipelineJobPreparer {
* Get incremental position.
*
* @param initIncremental init incremental
- * @param dumperContext dumper config
+ * @param dumperContext incremental dumper context
* @param dataSourceManager data source manager
* @return ingest position
* @throws SQLException SQL exception
@@ -71,37 +68,6 @@ public final class PipelineJobPreparer {
return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
}
- /**
- * Check data source.
- *
- * @param dataSources data source
- */
- public void checkSourceDataSource(final Collection<? extends DataSource> dataSources) {
- if (dataSources.isEmpty()) {
- return;
- }
- DataSourceCheckEngine checkEngine = new DataSourceCheckEngine(databaseType);
- checkEngine.checkConnection(dataSources);
- checkEngine.checkPrivilege(dataSources);
- checkEngine.checkVariable(dataSources);
- }
-
- /**
- * Check target data source.
- *
- * @param importerConfig importer config
- * @param targetDataSources target data sources
- */
- public void checkTargetDataSource(final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
- if (null == targetDataSources || targetDataSources.isEmpty()) {
- log.info("target data source is empty, skip check");
- return;
- }
- DataSourceCheckEngine dataSourceCheckEngine = new DataSourceCheckEngine(databaseType);
- dataSourceCheckEngine.checkConnection(targetDataSources);
- dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
- }
-
/**
* Cleanup job preparer.
*
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 04ef66dcac7..a177080d867 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -196,9 +196,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
- private static TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
- final PipelineDataSourceManager dataSourceManager,
- final IncrementalDumperContext incrementalDumperContext) throws SQLException {
+ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
+ final IncrementalDumperContext incrementalDumperContext) throws SQLException {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f22418276c4..c9da76d64ba 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -105,8 +105,9 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
try {
- taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
- .getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
+ IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+ taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index ddbb5a3a416..db5e6407798 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChann
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
@@ -74,7 +76,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
@@ -100,7 +101,7 @@ public final class MigrationJobPreparer {
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
- new PipelineJobPreparer(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
+ new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
@@ -156,21 +157,19 @@ public final class MigrationJobPreparer {
}
private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
+ DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
if (jobItemContext.isSourceTargetDatabaseTheSame()) {
- prepareTarget(jobItemContext);
+ prepareTarget(jobItemContext, targetDatabaseType);
}
- TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
- if (null == initProgress) {
+ if (null == jobItemContext.getInitProgress()) {
PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- new PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource(
- jobItemContext.getTaskConfig().getImporterConfig(), Collections.singleton(targetDataSource));
+ new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource, jobItemContext.getTaskConfig().getImporterConfig());
}
}
- private void prepareTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
+ private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
- DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, targetDatabaseType));
preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
@@ -184,8 +183,9 @@ public final class MigrationJobPreparer {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
try {
- taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
- .getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
+ IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+ taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
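Beyond the removals, both CDCJobPreparer and MigrationJobPreparer unpack the former single-expression setPosition(...) call into named locals. A sketch of the shared pattern after this commit (taskConfig, initIncremental and jobItemContext come from the enclosing prepare methods):

    // Resolve the database type once, compute the incremental position, then set it.
    DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
    IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
    taskConfig.getDumperContext().getCommonContext().setPosition(position);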