This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bfe759f3b7 Refactor DataSourceChecker (#27252)
1bfe759f3b7 is described below

commit 1bfe759f3b70675085be07276c41bcc3b3cb89f0
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Jul 17 19:41:16 2023 +0800

    Refactor DataSourceChecker (#27252)
---
 .../spi/check/datasource/DataSourceChecker.java    | 30 ++--------
 .../core/preparer/PipelineJobPreparerUtils.java    | 17 +++---
 .../datasource/checker/BasicDataSourceChecker.java | 45 --------------
 ...urceChecker.java => DataSourceCheckEngine.java} | 68 ++++++++++++++++++----
 .../DataSourceCheckEngineTest.java}                | 33 ++++-------
 .../check/datasource/MySQLDataSourceChecker.java   | 21 ++-----
 .../datasource/MySQLDataSourceCheckerTest.java     | 24 ++++----
 .../datasource/OpenGaussDataSourceChecker.java     | 15 ++---
 .../datasource/PostgreSQLDataSourceChecker.java    | 15 ++---
 .../PostgreSQLDataSourceCheckerTest.java           |  7 +--
 .../core/fixture/FixtureDataSourceChecker.java     | 14 +----
 11 files changed, 108 insertions(+), 181 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/datasource/DataSourceChecker.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/datasource/DataSourceChecker.java
index 1052c4e98c9..6bcfd3c4de9 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/datasource/DataSourceChecker.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/datasource/DataSourceChecker.java
@@ -17,12 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.check.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 
 import javax.sql.DataSource;
-import java.util.Collection;
 
 /**
  * Data source checker.
@@ -30,35 +28,17 @@ import java.util.Collection;
 @SingletonSPI
 public interface DataSourceChecker extends DatabaseTypedSPI {
     
-    /**
-     * Check data source connections.
-     *
-     * @param dataSources data sources
-     */
-    void checkConnection(Collection<? extends DataSource> dataSources);
-    
     /**
      * Check user privileges.
      *
-     * @param dataSources data sources
-     */
-    void checkPrivilege(Collection<? extends DataSource> dataSources);
-    
-    /**
-     * Check data source variables.
-     *
-     * @param dataSources data sources
+     * @param dataSource data source to be checked
      */
-    void checkVariable(Collection<? extends DataSource> dataSources);
+    void checkPrivilege(DataSource dataSource);
     
     /**
-     * Check table is empty.
+     * Check variables.
      *
-     * @param dataSources data sources
-     * @param tableNameSchemaNameMapping mapping
-     * @param logicTableNames logic table names
+     * @param dataSource data source to be checked
      */
-    // TODO rename to common usage name
-    // TODO Merge schemaName and tableNames
-    void checkTargetTable(Collection<? extends DataSource> dataSources, 
TableNameSchemaNameMapping tableNameSchemaNameMapping, Collection<String> 
logicTableNames);
+    void checkVariable(DataSource dataSource);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 5968e2f450f..cf16ecf36cf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -32,8 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrem
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.BasicDataSourceChecker;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
@@ -151,10 +150,10 @@ public final class PipelineJobPreparerUtils {
         if (dataSources.isEmpty()) {
             return;
         }
-        DataSourceChecker dataSourceChecker = 
DatabaseTypedSPILoader.findService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new 
BasicDataSourceChecker(databaseType.getType()));
-        dataSourceChecker.checkConnection(dataSources);
-        dataSourceChecker.checkPrivilege(dataSources);
-        dataSourceChecker.checkVariable(dataSources);
+        DataSourceCheckEngine dataSourceCheckEngine = new 
DataSourceCheckEngine(databaseType);
+        dataSourceCheckEngine.checkConnection(dataSources);
+        dataSourceCheckEngine.checkPrivilege(dataSources);
+        dataSourceCheckEngine.checkVariable(dataSources);
     }
     
     /**
@@ -169,9 +168,9 @@ public final class PipelineJobPreparerUtils {
             log.info("target data source is empty, skip check");
             return;
         }
-        DataSourceChecker dataSourceChecker = 
DatabaseTypedSPILoader.findService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new 
BasicDataSourceChecker(databaseType.getType()));
-        dataSourceChecker.checkConnection(targetDataSources);
-        dataSourceChecker.checkTargetTable(targetDataSources, 
importerConfig.getTableNameSchemaNameMapping(), 
importerConfig.getLogicTableNames());
+        DataSourceCheckEngine dataSourceCheckEngine = new 
DataSourceCheckEngine(databaseType);
+        dataSourceCheckEngine.checkConnection(targetDataSources);
+        dataSourceCheckEngine.checkTargetTable(targetDataSources, 
importerConfig.getTableNameSchemaNameMapping(), 
importerConfig.getLogicTableNames());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/BasicDataSourceChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/BasicDataSourceChecker.java
deleted file mode 100644
index f00e9ea561d..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/BasicDataSourceChecker.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker;
-
-import lombok.RequiredArgsConstructor;
-
-import javax.sql.DataSource;
-import java.util.Collection;
-
-/**
- * Basic data source checker.
- */
-@RequiredArgsConstructor
-public final class BasicDataSourceChecker extends AbstractDataSourceChecker {
-    
-    private final String databaseType;
-    
-    @Override
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-    }
-    
-    @Override
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return databaseType;
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngine.java
similarity index 59%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngine.java
index 2244aa8b4a2..bd388e13688 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngine.java
@@ -17,14 +17,13 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -34,13 +33,26 @@ import java.sql.SQLException;
 import java.util.Collection;
 
 /**
- * Abstract data source checker.
+ * Data source check engine.
  */
-@Slf4j
-public abstract class AbstractDataSourceChecker implements DataSourceChecker {
+public final class DataSourceCheckEngine {
     
-    @Override
-    public final void checkConnection(final Collection<? extends DataSource> 
dataSources) {
+    private final DatabaseType databaseType;
+    
+    private final DataSourceChecker dataSourceChecker;
+    
+    public DataSourceCheckEngine(final DatabaseType databaseType) {
+        this.databaseType = databaseType;
+        dataSourceChecker = 
DatabaseTypedSPILoader.findService(DataSourceChecker.class, 
databaseType).orElse(null);
+    }
+    
+    /**
+     * Check data source connections.
+     *
+     * @param dataSources data sources
+     * @throws PrepareJobWithInvalidConnectionException prepare job with 
invalid connection exception
+     */
+    public void checkConnection(final Collection<? extends DataSource> 
dataSources) {
         try {
             for (DataSource each : dataSources) {
                 each.getConnection().close();
@@ -50,8 +62,17 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
         }
     }
     
-    @Override
-    public final void checkTargetTable(final Collection<? extends DataSource> 
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final 
Collection<String> logicTableNames) {
+    /**
+     * Check table is empty.
+     *
+     * @param dataSources data sources
+     * @param tableNameSchemaNameMapping mapping
+     * @param logicTableNames logic table names
+     * @throws PrepareJobWithInvalidConnectionException prepare job with 
invalid connection exception
+     */
+    // TODO rename to common usage name
+    // TODO Merge schemaName and tableNames
+    public void checkTargetTable(final Collection<? extends DataSource> 
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final 
Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
@@ -66,7 +87,6 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
     }
     
     private boolean checkEmpty(final DataSource dataSource, final String 
schemaName, final String tableName) throws SQLException {
-        DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, getDatabaseType());
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(databaseType);
         String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName, 
tableName);
         try (
@@ -76,4 +96,32 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
             return !resultSet.next();
         }
     }
+    
+    /**
+     * Check user privileges.
+     *
+     * @param dataSources data sources
+     */
+    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
+        if (null == dataSourceChecker) {
+            return;
+        }
+        for (DataSource each : dataSources) {
+            dataSourceChecker.checkPrivilege(each);
+        }
+    }
+    
+    /**
+     * Check data source variables.
+     *
+     * @param dataSources data sources
+     */
+    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
+        if (null == dataSourceChecker) {
+            return;
+        }
+        for (DataSource each : dataSources) {
+            dataSourceChecker.checkVariable(each);
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngineTest.java
similarity index 75%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngineTest.java
index 0a912bb59a4..dbebfa6f1b4 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/DataSourceCheckEngineTest.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.AbstractDataSourceChecker;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -41,12 +42,12 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class AbstractDataSourceCheckerTest {
+class DataSourceCheckEngineTest {
     
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource dataSource;
     
-    private AbstractDataSourceChecker dataSourceChecker;
+    private DataSourceCheckEngine dataSourceCheckEngine;
     
     private Collection<DataSource> dataSources;
     
@@ -61,21 +62,7 @@ class AbstractDataSourceCheckerTest {
     
     @BeforeEach
     void setUp() {
-        dataSourceChecker = new AbstractDataSourceChecker() {
-            
-            @Override
-            public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-            }
-            
-            @Override
-            public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-            }
-            
-            @Override
-            public String getDatabaseType() {
-                return "FIXTURE";
-            }
-        };
+        dataSourceCheckEngine = new 
DataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
         dataSources = new LinkedList<>();
         dataSources.add(dataSource);
     }
@@ -83,14 +70,14 @@ class AbstractDataSourceCheckerTest {
     @Test
     void assertCheckConnection() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
-        dataSourceChecker.checkConnection(dataSources);
+        dataSourceCheckEngine.checkConnection(dataSources);
         verify(dataSource).getConnection();
     }
     
     @Test
     void assertCheckConnectionFailed() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException("error"));
-        assertThrows(PrepareJobWithInvalidConnectionException.class, () -> 
dataSourceChecker.checkConnection(dataSources));
+        assertThrows(PrepareJobWithInvalidConnectionException.class, () -> 
dataSourceCheckEngine.checkConnection(dataSources));
     }
     
     @Test
@@ -98,7 +85,7 @@ class AbstractDataSourceCheckerTest {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceChecker.checkTargetTable(dataSources, new 
TableNameSchemaNameMapping(Collections.emptyMap()), 
Collections.singletonList("t_order"));
+        dataSourceCheckEngine.checkTargetTable(dataSources, new 
TableNameSchemaNameMapping(Collections.emptyMap()), 
Collections.singletonList("t_order"));
     }
     
     @Test
@@ -108,6 +95,6 @@ class AbstractDataSourceCheckerTest {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
         assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceChecker.checkTargetTable(dataSources, new 
TableNameSchemaNameMapping(Collections.emptyMap()), 
Collections.singletonList("t_order")));
+                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new 
TableNameSchemaNameMapping(Collections.emptyMap()), 
Collections.singletonList("t_order")));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index b03acf9fb4d..673818b1141 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.AbstractDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -29,7 +29,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,7 +37,7 @@ import java.util.stream.Collectors;
 /**
  * Data source checker for MySQL.
  */
-public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
+public final class MySQLDataSourceChecker implements DataSourceChecker {
     
     private static final String SHOW_GRANTS_SQL = "SHOW GRANTS";
     
@@ -57,13 +56,7 @@ public final class MySQLDataSourceChecker extends 
AbstractDataSourceChecker {
     }
     
     @Override
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-        for (DataSource each : dataSources) {
-            checkPrivilege(each);
-        }
-    }
-    
-    private void checkPrivilege(final DataSource dataSource) {
+    public void checkPrivilege(final DataSource dataSource) {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_GRANTS_SQL);
@@ -85,13 +78,7 @@ public final class MySQLDataSourceChecker extends 
AbstractDataSourceChecker {
     }
     
     @Override
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-        for (DataSource each : dataSources) {
-            checkVariable(each);
-        }
-    }
-    
-    private void checkVariable(final DataSource dataSource) {
+    public void checkVariable(final DataSource dataSource) {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_VARIABLES_SQL)) {
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
index 0c85a27ceb4..513e1462bff 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -30,14 +31,10 @@ import javax.sql.DataSource;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,13 +48,12 @@ class MySQLDataSourceCheckerTest {
     @Mock
     private ResultSet resultSet;
     
-    private Collection<DataSource> dataSources;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private DataSource dataSource;
     
     @BeforeEach
     void setUp() throws SQLException {
-        DataSource dataSource = mock(DataSource.class, RETURNS_DEEP_STUBS);
         
when(dataSource.getConnection().prepareStatement(anyString())).thenReturn(preparedStatement);
-        dataSources = Collections.singleton(dataSource);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
     }
     
@@ -65,7 +61,7 @@ class MySQLDataSourceCheckerTest {
     void assertCheckPrivilegeWithParticularSuccess() throws SQLException {
         when(resultSet.next()).thenReturn(true);
         when(resultSet.getString(1)).thenReturn("GRANT REPLICATION SLAVE, 
REPLICATION CLIENT ON *.* TO '%'@'%'");
-        new MySQLDataSourceChecker().checkPrivilege(dataSources);
+        new MySQLDataSourceChecker().checkPrivilege(dataSource);
         verify(preparedStatement).executeQuery();
     }
     
@@ -73,19 +69,19 @@ class MySQLDataSourceCheckerTest {
     void assertCheckPrivilegeWithAllSuccess() throws SQLException {
         when(resultSet.next()).thenReturn(true);
         when(resultSet.getString(1)).thenReturn("GRANT ALL PRIVILEGES CLIENT 
ON *.* TO '%'@'%'");
-        new MySQLDataSourceChecker().checkPrivilege(dataSources);
+        new MySQLDataSourceChecker().checkPrivilege(dataSource);
         verify(preparedStatement).executeQuery();
     }
     
     @Test
     void assertCheckPrivilegeLackPrivileges() {
-        assertThrows(PrepareJobWithoutEnoughPrivilegeException.class, () -> 
new MySQLDataSourceChecker().checkPrivilege(dataSources));
+        assertThrows(PrepareJobWithoutEnoughPrivilegeException.class, () -> 
new MySQLDataSourceChecker().checkPrivilege(dataSource));
     }
     
     @Test
     void assertCheckPrivilegeFailure() throws SQLException {
         when(resultSet.next()).thenThrow(new SQLException(""));
-        assertThrows(PrepareJobWithCheckPrivilegeFailedException.class, () -> 
new MySQLDataSourceChecker().checkPrivilege(dataSources));
+        assertThrows(PrepareJobWithCheckPrivilegeFailedException.class, () -> 
new MySQLDataSourceChecker().checkPrivilege(dataSource));
     }
     
     @Test
@@ -93,7 +89,7 @@ class MySQLDataSourceCheckerTest {
         when(resultSet.next()).thenReturn(true, true, true, false);
         when(resultSet.getString(1)).thenReturn("LOG_BIN", "BINLOG_FORMAT", 
"BINLOG_ROW_IMAGE");
         when(resultSet.getString(2)).thenReturn("ON", "ROW", "FULL");
-        assertDoesNotThrow(() -> new 
MySQLDataSourceChecker().checkVariable(dataSources));
+        assertDoesNotThrow(() -> new 
MySQLDataSourceChecker().checkVariable(dataSource));
         verify(preparedStatement, times(1)).executeQuery();
     }
     
@@ -102,12 +98,12 @@ class MySQLDataSourceCheckerTest {
         when(resultSet.next()).thenReturn(true, true, false);
         when(resultSet.getString(1)).thenReturn("BINLOG_FORMAT", "LOG_BIN");
         when(resultSet.getString(2)).thenReturn("ROW", "OFF");
-        assertThrows(PrepareJobWithInvalidSourceDataSourceException.class, () 
-> new MySQLDataSourceChecker().checkVariable(dataSources));
+        assertThrows(PrepareJobWithInvalidSourceDataSourceException.class, () 
-> new MySQLDataSourceChecker().checkVariable(dataSource));
     }
     
     @Test
     void assertCheckVariableFailure() throws SQLException {
         when(resultSet.next()).thenThrow(new SQLException(""));
-        assertThrows(PrepareJobWithCheckPrivilegeFailedException.class, () -> 
new MySQLDataSourceChecker().checkVariable(dataSources));
+        assertThrows(PrepareJobWithCheckPrivilegeFailedException.class, () -> 
new MySQLDataSourceChecker().checkVariable(dataSource));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
index e80cf8c5b3f..83e2fc1749e 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.check.datasource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.AbstractDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -29,24 +29,17 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 
 /**
  * Data source checker of openGauss.
  */
-public final class OpenGaussDataSourceChecker extends 
AbstractDataSourceChecker {
+public final class OpenGaussDataSourceChecker implements DataSourceChecker {
     
     private static final String SHOW_GRANTS_SQL = "SELECT * FROM pg_roles 
WHERE rolname = ?";
     
     @Override
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-        for (DataSource each : dataSources) {
-            checkPrivilege(each);
-        }
-    }
-    
-    private void checkPrivilege(final DataSource dataSource) {
+    public void checkPrivilege(final DataSource dataSource) {
         try (Connection connection = dataSource.getConnection(); 
PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_GRANTS_SQL)) {
             DatabaseMetaData metaData = connection.getMetaData();
             preparedStatement.setString(1, metaData.getUserName());
@@ -65,7 +58,7 @@ public final class OpenGaussDataSourceChecker extends 
AbstractDataSourceChecker
     }
     
     @Override
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
+    public void checkVariable(final DataSource dataSource) {
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index 2a397371035..5bc0e06ea1b 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.AbstractDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -30,25 +30,18 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 
 /**
  * PostgreSQL Data source checker.
  */
 @Slf4j
-public final class PostgreSQLDataSourceChecker extends 
AbstractDataSourceChecker {
+public final class PostgreSQLDataSourceChecker implements DataSourceChecker {
     
     private static final String SHOW_GRANTS_SQL = "SELECT * FROM pg_roles 
WHERE rolname = ?";
     
     @Override
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-        for (DataSource each : dataSources) {
-            checkPrivilege(each);
-        }
-    }
-    
-    private void checkPrivilege(final DataSource dataSource) {
+    public void checkPrivilege(final DataSource dataSource) {
         try (Connection connection = dataSource.getConnection(); 
PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_GRANTS_SQL)) {
             DatabaseMetaData metaData = connection.getMetaData();
             preparedStatement.setString(1, metaData.getUserName());
@@ -67,7 +60,7 @@ public final class PostgreSQLDataSourceChecker extends 
AbstractDataSourceChecker
     }
     
     @Override
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
+    public void checkVariable(final DataSource dataSource) {
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceCheckerTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceCheckerTest.java
index c532fe824ee..17471ac5643 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceCheckerTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceCheckerTest.java
@@ -30,7 +30,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -71,7 +70,7 @@ class PostgreSQLDataSourceCheckerTest {
         PostgreSQLDataSourceChecker dataSourceChecker = new 
PostgreSQLDataSourceChecker();
         when(resultSet.getString("rolreplication")).thenReturn("t");
         when(resultSet.getString("rolsuper")).thenReturn("f");
-        
dataSourceChecker.checkPrivilege(Collections.singletonList(dataSource));
+        dataSourceChecker.checkPrivilege(dataSource);
         verify(resultSet, atLeastOnce()).getString("rolsuper");
     }
     
@@ -80,7 +79,7 @@ class PostgreSQLDataSourceCheckerTest {
         PostgreSQLDataSourceChecker dataSourceChecker = new 
PostgreSQLDataSourceChecker();
         when(resultSet.getString("rolsuper")).thenReturn("t");
         when(resultSet.getString("rolreplication")).thenReturn("f");
-        
dataSourceChecker.checkPrivilege(Collections.singletonList(dataSource));
+        dataSourceChecker.checkPrivilege(dataSource);
         verify(resultSet, atLeastOnce()).getString("rolreplication");
     }
     
@@ -89,7 +88,7 @@ class PostgreSQLDataSourceCheckerTest {
         PostgreSQLDataSourceChecker dataSourceChecker = new 
PostgreSQLDataSourceChecker();
         when(resultSet.getString("rolsuper")).thenReturn("f");
         when(resultSet.getString("rolreplication")).thenReturn("f");
-        assertThrows(PrepareJobWithoutEnoughPrivilegeException.class, () -> 
dataSourceChecker.checkPrivilege(Collections.singletonList(dataSource)));
+        assertThrows(PrepareJobWithoutEnoughPrivilegeException.class, () -> 
dataSourceChecker.checkPrivilege(dataSource));
         verify(resultSet, atLeastOnce()).getString("rolreplication");
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
index c912f36169f..db12856d6e7 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
@@ -17,28 +17,18 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 
 import javax.sql.DataSource;
-import java.util.Collection;
 
 public final class FixtureDataSourceChecker implements DataSourceChecker {
     
     @Override
-    public void checkConnection(final Collection<? extends DataSource> 
dataSources) {
+    public void checkPrivilege(final DataSource dataSource) {
     }
     
     @Override
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-    }
-    
-    @Override
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-    }
-    
-    @Override
-    public void checkTargetTable(final Collection<? extends DataSource> 
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final 
Collection<String> tableNames) {
+    public void checkVariable(final DataSource dataSource) {
     }
     
     @Override


Reply via email to