This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7962d23a69a Simplify PipelineDataSourceCheckEngine methods param (#38038)
7962d23a69a is described below
commit 7962d23a69a1ea5b3caf94f875a23f9d238f9783
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Feb 14 11:27:51 2026 +0800
Simplify PipelineDataSourceCheckEngine methods param (#38038)
* Simplify PipelineDataSourceCheckEngine dataSources param to dataSource
* Simplify PipelineDataSourceCheckEngine.checkTargetDataSource importerConfig param to qualifiedTables
* Refactor job item
---
.../checker/PipelineDataSourceCheckEngine.java | 39 ++++++++++------------
...YamlJobItemIncrementalTasksProgressSwapper.java | 13 ++++++--
.../core/job/service/PipelineJobItemManager.java | 3 ++
.../checker/PipelineDataSourceCheckEngineTest.java | 34 +++++++------------
.../migration/preparer/MigrationJobPreparer.java | 6 ++--
5 files changed, 45 insertions(+), 50 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 58f86c758d9..635853b25ee 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.checker;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.database.connector.core.checker.DialectDatabasePrivilegeChecker;
import
org.apache.shardingsphere.database.connector.core.checker.PrivilegeCheckType;
@@ -52,14 +51,12 @@ public final class PipelineDataSourceCheckEngine {
/**
* Check data source connections.
*
- * @param dataSources data sources
+ * @param dataSource data source
* @throws SQLWrapperException SQL wrapper exception
*/
- public void checkConnection(final Collection<DataSource> dataSources) {
+ public void checkConnection(final DataSource dataSource) {
try {
- for (DataSource each : dataSources) {
- each.getConnection().close();
- }
+ dataSource.getConnection().close();
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
}
@@ -68,31 +65,29 @@ public final class PipelineDataSourceCheckEngine {
/**
* Check source data source.
*
- * @param dataSources to be checked source data source
+ * @param dataSource to be checked source data source
*/
- public void checkSourceDataSources(final Collection<DataSource>
dataSources) {
- checkConnection(dataSources);
-
DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class,
databaseType).ifPresent(optional -> dataSources.forEach(each ->
optional.check(each, PrivilegeCheckType.PIPELINE)));
-
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
databaseType).ifPresent(optional -> dataSources.forEach(optional::check));
+ public void checkSourceDataSource(final DataSource dataSource) {
+ checkConnection(dataSource);
+
DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class,
databaseType).ifPresent(optional -> optional.check(dataSource,
PrivilegeCheckType.PIPELINE));
+
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
databaseType).ifPresent(optional -> optional.check(dataSource));
}
/**
- * Check target data sources.
+ * Check target data source.
*
- * @param dataSources to be checked target data sources
- * @param importerConfig importer configuration
+ * @param dataSource to be checked target data sources
+ * @param qualifiedTables qualified tables
*/
- public void checkTargetDataSources(final Collection<DataSource>
dataSources, final ImporterConfiguration importerConfig) {
- checkConnection(dataSources);
- checkEmptyTable(dataSources, importerConfig);
+ public void checkTargetDataSource(final DataSource dataSource, final
Collection<QualifiedTable> qualifiedTables) {
+ checkConnection(dataSource);
+ checkEmptyTable(dataSource, qualifiedTables);
}
- private void checkEmptyTable(final Collection<DataSource> dataSources,
final ImporterConfiguration importerConfig) {
+ private void checkEmptyTable(final DataSource dataSource, final
Collection<QualifiedTable> qualifiedTables) {
try {
- for (DataSource each : dataSources) {
- for (QualifiedTable qualifiedTable :
importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()) {
-
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable),
() -> new
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
- }
+ for (QualifiedTable qualifiedTable : qualifiedTables) {
+
ShardingSpherePreconditions.checkState(checkEmptyTable(dataSource,
qualifiedTable), () -> new
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
}
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
index fc5b9fbf158..fb5f29b3fbb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlJobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
@@ -45,7 +46,10 @@ public final class
YamlJobItemIncrementalTasksProgressSwapper {
return new YamlJobItemIncrementalTasksProgress();
}
YamlJobItemIncrementalTasksProgress result = new
YamlJobItemIncrementalTasksProgress();
-
result.setPosition(progress.getIncrementalTaskProgress().getPosition().toString());
+ IngestPosition position =
progress.getIncrementalTaskProgress().getPosition();
+ if (null != position) {
+ result.setPosition(position.toString());
+ }
result.setDelay(progress.getIncrementalTaskProgress().getIncrementalTaskDelay());
return result;
}
@@ -62,8 +66,11 @@ public final class
YamlJobItemIncrementalTasksProgressSwapper {
return new JobItemIncrementalTasksProgress(null);
}
DialectIncrementalPositionManager positionInitializer =
DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class,
TypedSPILoader.getService(DatabaseType.class, databaseType));
- IncrementalTaskProgress taskProgress = new
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
- taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
+ IncrementalTaskProgress taskProgress = null;
+ if (null != yamlProgress.getPosition()) {
+ taskProgress = new
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
+ taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
+ }
return new JobItemIncrementalTasksProgress(taskProgress);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 696afff8333..5224c310550 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
@@ -33,6 +34,7 @@ import java.util.Optional;
*
* @param <T> type of pipeline job item progress
*/
+@Slf4j
public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
private final
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
swapper;
@@ -52,6 +54,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
public void updateStatus(final String jobId, final int shardingItem, final
JobStatus status) {
Optional<T> jobItemProgress = getProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
+ log.info("Skip update status for non-existent job item progress,
jobId={}, shardingItem={}, status={}", jobId, shardingItem, status);
return;
}
jobItemProgress.get().setStatus(status);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
index 0725692e3e8..9b327e69066 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.checker;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -36,25 +35,22 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class PipelineDataSourceCheckEngineTest {
+ private final Collection<QualifiedTable> qualifiedTables =
Collections.singleton(new QualifiedTable(null, "t_order"));
+
@Mock(extraInterfaces = AutoCloseable.class)
private DataSource dataSource;
private PipelineDataSourceCheckEngine pipelineDataSourceCheckEngine;
- private Collection<DataSource> dataSources;
-
@Mock
private Connection connection;
@@ -67,54 +63,48 @@ class PipelineDataSourceCheckEngineTest {
@BeforeEach
void setUp() {
pipelineDataSourceCheckEngine = new
PipelineDataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
- dataSources = new LinkedList<>();
- dataSources.add(dataSource);
}
@Test
void assertCheckConnection() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
- pipelineDataSourceCheckEngine.checkConnection(dataSources);
+ pipelineDataSourceCheckEngine.checkConnection(dataSource);
verify(dataSource).getConnection();
}
@Test
void assertCheckConnectionFailed() throws SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException("error"));
- assertThrows(SQLWrapperException.class, () ->
pipelineDataSourceCheckEngine.checkConnection(dataSources));
+ assertThrows(SQLWrapperException.class, () ->
pipelineDataSourceCheckEngine.checkConnection(dataSource));
}
@Test
- void assertCheckSourceDataSources() throws SQLException {
+ void assertCheckSourceDataSource() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
- pipelineDataSourceCheckEngine.checkSourceDataSources(dataSources);
+ pipelineDataSourceCheckEngine.checkSourceDataSource(dataSource);
verify(dataSource).getConnection();
}
@Test
- void assertCheckTargetDataSources() throws SQLException {
+ void assertCheckTargetDataSource() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("SELECT * FROM t_order LIMIT
1")).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
- ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class, RETURNS_DEEP_STUBS);
-
when(importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()).thenReturn(Collections.singleton(new
QualifiedTable(null, "t_order")));
- assertDoesNotThrow(() ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig));
+ assertDoesNotThrow(() ->
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource,
qualifiedTables));
}
@Test
- void assertCheckTargetDataSourcesFailed() throws SQLException {
+ void assertCheckTargetDataSourceFailed() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("SELECT * FROM t_order LIMIT
1")).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true);
- ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class, RETURNS_DEEP_STUBS);
-
when(importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()).thenReturn(Collections.singleton(new
QualifiedTable(null, "t_order")));
- assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig));
+ assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource,
qualifiedTables));
}
@Test
- void assertCheckTargetDataSourcesWhenSQLExceptionThrown() throws
SQLException {
+ void assertCheckTargetDataSourceWhenSQLExceptionThrown() throws
SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException(""));
- assertThrows(SQLWrapperException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
mock(ImporterConfiguration.class)));
+ assertThrows(SQLWrapperException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource,
qualifiedTables));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index fe3bcf8e4a7..a171568f809 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -88,7 +88,7 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration."));
DatabaseType sourceDatabaseType =
jobItemContext.getJobConfig().getSourceDatabaseType();
- new
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
+ new
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(),
PipelineJobCancelingException::new);
prepareAndCheckTargetWithLock(jobItemContext);
ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(),
PipelineJobCancelingException::new);
@@ -131,8 +131,8 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
}
if (null == jobItemContext.getInitProgress()) {
PipelineDataSource targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- new
PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
- jobItemContext.getTaskConfig().getImporterConfig());
+ new
PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType())
+ .checkTargetDataSource(targetDataSource,
jobItemContext.getTaskConfig().getImporterConfig().getTableAndSchemaNameMapper().getQualifiedTables());
}
}