This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7962d23a69a Simplify PipelineDataSourceCheckEngine methods param 
(#38038)
7962d23a69a is described below

commit 7962d23a69a1ea5b3caf94f875a23f9d238f9783
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Feb 14 11:27:51 2026 +0800

    Simplify PipelineDataSourceCheckEngine methods param (#38038)
    
    * Simplify PipelineDataSourceCheckEngine dataSources param to dataSource
    
    * Simplify PipelineDataSourceCheckEngine.checkTargetDataSource 
importerConfig param to qualifiedTables
    
    * Refactor job item
---
 .../checker/PipelineDataSourceCheckEngine.java     | 39 ++++++++++------------
 ...YamlJobItemIncrementalTasksProgressSwapper.java | 13 ++++++--
 .../core/job/service/PipelineJobItemManager.java   |  3 ++
 .../checker/PipelineDataSourceCheckEngineTest.java | 34 +++++++------------
 .../migration/preparer/MigrationJobPreparer.java   |  6 ++--
 5 files changed, 45 insertions(+), 50 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 58f86c758d9..635853b25ee 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.database.connector.core.checker.DialectDatabasePrivilegeChecker;
 import 
org.apache.shardingsphere.database.connector.core.checker.PrivilegeCheckType;
@@ -52,14 +51,12 @@ public final class PipelineDataSourceCheckEngine {
     /**
      * Check data source connections.
      *
-     * @param dataSources data sources
+     * @param dataSource data source
      * @throws SQLWrapperException SQL wrapper exception
      */
-    public void checkConnection(final Collection<DataSource> dataSources) {
+    public void checkConnection(final DataSource dataSource) {
         try {
-            for (DataSource each : dataSources) {
-                each.getConnection().close();
-            }
+            dataSource.getConnection().close();
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         }
@@ -68,31 +65,29 @@ public final class PipelineDataSourceCheckEngine {
     /**
      * Check source data source.
      *
-     * @param dataSources to be checked source data source
+     * @param dataSource to be checked source data source
      */
-    public void checkSourceDataSources(final Collection<DataSource> 
dataSources) {
-        checkConnection(dataSources);
-        
DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class, 
databaseType).ifPresent(optional -> dataSources.forEach(each -> 
optional.check(each, PrivilegeCheckType.PIPELINE)));
-        
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
 databaseType).ifPresent(optional -> dataSources.forEach(optional::check));
+    public void checkSourceDataSource(final DataSource dataSource) {
+        checkConnection(dataSource);
+        
DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class, 
databaseType).ifPresent(optional -> optional.check(dataSource, 
PrivilegeCheckType.PIPELINE));
+        
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
 databaseType).ifPresent(optional -> optional.check(dataSource));
     }
     
     /**
-     * Check target data sources.
+     * Check target data source.
      *
-     * @param dataSources to be checked target data sources
-     * @param importerConfig importer configuration
+     * @param dataSource to be checked target data source
+     * @param qualifiedTables qualified tables
      */
-    public void checkTargetDataSources(final Collection<DataSource> 
dataSources, final ImporterConfiguration importerConfig) {
-        checkConnection(dataSources);
-        checkEmptyTable(dataSources, importerConfig);
+    public void checkTargetDataSource(final DataSource dataSource, final 
Collection<QualifiedTable> qualifiedTables) {
+        checkConnection(dataSource);
+        checkEmptyTable(dataSource, qualifiedTables);
     }
     
-    private void checkEmptyTable(final Collection<DataSource> dataSources, 
final ImporterConfiguration importerConfig) {
+    private void checkEmptyTable(final DataSource dataSource, final 
Collection<QualifiedTable> qualifiedTables) {
         try {
-            for (DataSource each : dataSources) {
-                for (QualifiedTable qualifiedTable : 
importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()) {
-                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
-                }
+            for (QualifiedTable qualifiedTable : qualifiedTables) {
+                
ShardingSpherePreconditions.checkState(checkEmptyTable(dataSource, 
qualifiedTable), () -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
             }
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
index fc5b9fbf158..fb5f29b3fbb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlJobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
@@ -45,7 +46,10 @@ public final class 
YamlJobItemIncrementalTasksProgressSwapper {
             return new YamlJobItemIncrementalTasksProgress();
         }
         YamlJobItemIncrementalTasksProgress result = new 
YamlJobItemIncrementalTasksProgress();
-        
result.setPosition(progress.getIncrementalTaskProgress().getPosition().toString());
+        IngestPosition position = 
progress.getIncrementalTaskProgress().getPosition();
+        if (null != position) {
+            result.setPosition(position.toString());
+        }
         
result.setDelay(progress.getIncrementalTaskProgress().getIncrementalTaskDelay());
         return result;
     }
@@ -62,8 +66,11 @@ public final class 
YamlJobItemIncrementalTasksProgressSwapper {
             return new JobItemIncrementalTasksProgress(null);
         }
         DialectIncrementalPositionManager positionInitializer = 
DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, 
TypedSPILoader.getService(DatabaseType.class, databaseType));
-        IncrementalTaskProgress taskProgress = new 
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
-        taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
+        IncrementalTaskProgress taskProgress = null;
+        if (null != yamlProgress.getPosition()) {
+            taskProgress = new 
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
+            taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
+        }
         return new JobItemIncrementalTasksProgress(taskProgress);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 696afff8333..5224c310550 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
@@ -33,6 +34,7 @@ import java.util.Optional;
  * 
  * @param <T> type of pipeline job item progress
  */
+@Slf4j
 public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
     
     private final 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
swapper;
@@ -52,6 +54,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
     public void updateStatus(final String jobId, final int shardingItem, final 
JobStatus status) {
         Optional<T> jobItemProgress = getProgress(jobId, shardingItem);
         if (!jobItemProgress.isPresent()) {
+            log.info("Skip update status for non-existent job item progress, 
jobId={}, shardingItem={}, status={}", jobId, shardingItem, status);
             return;
         }
         jobItemProgress.get().setStatus(status);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
index 0725692e3e8..9b327e69066 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -36,25 +35,22 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 class PipelineDataSourceCheckEngineTest {
     
+    private final Collection<QualifiedTable> qualifiedTables = 
Collections.singleton(new QualifiedTable(null, "t_order"));
+    
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource dataSource;
     
     private PipelineDataSourceCheckEngine pipelineDataSourceCheckEngine;
     
-    private Collection<DataSource> dataSources;
-    
     @Mock
     private Connection connection;
     
@@ -67,54 +63,48 @@ class PipelineDataSourceCheckEngineTest {
     @BeforeEach
     void setUp() {
         pipelineDataSourceCheckEngine = new 
PipelineDataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE"));
-        dataSources = new LinkedList<>();
-        dataSources.add(dataSource);
     }
     
     @Test
     void assertCheckConnection() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
-        pipelineDataSourceCheckEngine.checkConnection(dataSources);
+        pipelineDataSourceCheckEngine.checkConnection(dataSource);
         verify(dataSource).getConnection();
     }
     
     @Test
     void assertCheckConnectionFailed() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException("error"));
-        assertThrows(SQLWrapperException.class, () -> 
pipelineDataSourceCheckEngine.checkConnection(dataSources));
+        assertThrows(SQLWrapperException.class, () -> 
pipelineDataSourceCheckEngine.checkConnection(dataSource));
     }
     
     @Test
-    void assertCheckSourceDataSources() throws SQLException {
+    void assertCheckSourceDataSource() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
-        pipelineDataSourceCheckEngine.checkSourceDataSources(dataSources);
+        pipelineDataSourceCheckEngine.checkSourceDataSource(dataSource);
         verify(dataSource).getConnection();
     }
     
     @Test
-    void assertCheckTargetDataSources() throws SQLException {
+    void assertCheckTargetDataSource() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class, RETURNS_DEEP_STUBS);
-        
when(importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()).thenReturn(Collections.singleton(new
 QualifiedTable(null, "t_order")));
-        assertDoesNotThrow(() -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig));
+        assertDoesNotThrow(() -> 
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource, 
qualifiedTables));
     }
     
     @Test
-    void assertCheckTargetDataSourcesFailed() throws SQLException {
+    void assertCheckTargetDataSourceFailed() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
-        ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class, RETURNS_DEEP_STUBS);
-        
when(importerConfig.getTableAndSchemaNameMapper().getQualifiedTables()).thenReturn(Collections.singleton(new
 QualifiedTable(null, "t_order")));
-        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig));
+        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource, 
qualifiedTables));
     }
     
     @Test
-    void assertCheckTargetDataSourcesWhenSQLExceptionThrown() throws 
SQLException {
+    void assertCheckTargetDataSourceWhenSQLExceptionThrown() throws 
SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException(""));
-        assertThrows(SQLWrapperException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
mock(ImporterConfiguration.class)));
+        assertThrows(SQLWrapperException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSource(dataSource, 
qualifiedTables));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index fe3bcf8e4a7..a171568f809 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -88,7 +88,7 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration."));
         DatabaseType sourceDatabaseType = 
jobItemContext.getJobConfig().getSourceDatabaseType();
-        new 
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
+        new 
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
         ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), 
PipelineJobCancelingException::new);
         prepareAndCheckTargetWithLock(jobItemContext);
         ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), 
PipelineJobCancelingException::new);
@@ -131,8 +131,8 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSource targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new 
PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
-                    jobItemContext.getTaskConfig().getImporterConfig());
+            new 
PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType())
+                    .checkTargetDataSource(targetDataSource, 
jobItemContext.getTaskConfig().getImporterConfig().getTableAndSchemaNameMapper().getQualifiedTables());
         }
     }
     

Reply via email to