This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb3d8ebbb1 Add DialectPipelineDatabaseVariableChecker (#32586)
0bb3d8ebbb1 is described below

commit 0bb3d8ebbb188bb7cec6b4fb27e8e8feba42a296
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Aug 18 18:13:34 2024 +0800

    Add DialectPipelineDatabaseVariableChecker (#32586)
    
    * Refactor PipelineChannelAckCallback
    
    * Add DialectPipelineDatabaseVariableChecker
    
    * Add DialectPipelineDatabaseVariableChecker
    
    * Add DialectPipelineDatabaseVariableChecker
---
 .../checker/DialectDatabaseEnvironmentChecker.java |  7 --
 .../h2/checker/H2DatabaseEnvironmentChecker.java   |  4 -
 .../checker/MySQLDatabaseEnvironmentChecker.java   | 37 ---------
 .../MySQLDatabaseEnvironmentCheckerTest.java       | 28 -------
 .../OpenGaussDatabaseEnvironmentChecker.java       |  4 -
 .../PostgreSQLDatabaseEnvironmentChecker.java      |  4 -
 .../core/channel/PipelineChannelAckCallback.java   |  1 +
 .../DialectPipelineDatabaseVariableChecker.java    | 16 +---
 ...ine.java => PipelineDataSourceCheckEngine.java} | 18 +++--
 ...java => PipelineDataSourceCheckEngineTest.java} | 16 ++--
 .../MySQLPipelineDatabaseVariableChecker.java      | 78 +++++++++++++++++++
 ....checker.DialectPipelineDatabaseVariableChecker | 18 +++++
 .../MySQLPipelineDatabaseVariableCheckerTest.java  | 89 ++++++++++++++++++++++
 .../migration/preparer/MigrationJobPreparer.java   |  6 +-
 14 files changed, 212 insertions(+), 114 deletions(-)

diff --git 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
index c928b4d796c..84cc9724e19 100644
--- 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
+++ 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
@@ -35,11 +35,4 @@ public interface DialectDatabaseEnvironmentChecker extends 
DatabaseTypedSPI {
      * @param privilegeCheckType privilege check type
      */
     void checkPrivilege(DataSource dataSource, PrivilegeCheckType 
privilegeCheckType);
-    
-    /**
-     * Check variables.
-     *
-     * @param dataSource data source to be checked
-     */
-    void checkVariable(DataSource dataSource);
 }
diff --git 
a/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
 
b/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
index 2b05baa77d7..7094b726745 100644
--- 
a/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
+++ 
b/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
@@ -31,10 +31,6 @@ public final class H2DatabaseEnvironmentChecker implements 
DialectDatabaseEnviro
     public void checkPrivilege(final DataSource dataSource, final 
PrivilegeCheckType privilegeCheckType) {
     }
     
-    @Override
-    public void checkVariable(final DataSource dataSource) {
-    }
-    
     @Override
     public String getDatabaseType() {
         return "H2";
diff --git 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
index b35a39d401d..358071b6a3d 100644
--- 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
+++ 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
@@ -21,8 +21,6 @@ import 
org.apache.shardingsphere.infra.database.core.checker.DialectDatabaseEnvi
 import 
org.apache.shardingsphere.infra.database.core.checker.PrivilegeCheckType;
 import 
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
 import 
org.apache.shardingsphere.infra.database.core.exception.MissingRequiredPrivilegeException;
-import 
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -33,10 +31,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 /**
  * Database environment checker for MySQL.
@@ -55,19 +50,10 @@ public final class MySQLDatabaseEnvironmentChecker 
implements DialectDatabaseEnv
     
     private static final Map<PrivilegeCheckType, Collection<String>> 
REQUIRED_PRIVILEGES_FOR_MESSAGE = new EnumMap<>(PrivilegeCheckType.class);
     
-    private static final Map<String, String> REQUIRED_VARIABLES = new 
HashMap<>(3, 1F);
-    
-    private static final String SHOW_VARIABLES_SQL;
-    
     static {
         REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.PIPELINE, 
Arrays.asList("REPLICATION SLAVE", "REPLICATION CLIENT"));
         REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.SELECT, 
Collections.singleton("SELECT ON DATABASE"));
         REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.XA, 
Collections.singleton("XA_RECOVER_ADMIN"));
-        REQUIRED_VARIABLES.put("LOG_BIN", "ON");
-        REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
-        // It does not exist in all versions of MySQL
-        REQUIRED_VARIABLES.put("BINLOG_ROW_IMAGE", "FULL");
-        SHOW_VARIABLES_SQL = String.format("SHOW VARIABLES WHERE Variable_name 
IN (%s)", REQUIRED_VARIABLES.keySet().stream().map(each -> 
"?").collect(Collectors.joining(",")));
     }
     
     @Override
@@ -119,29 +105,6 @@ public final class MySQLDatabaseEnvironmentChecker 
implements DialectDatabaseEnv
         return Arrays.stream(requiredPrivileges).anyMatch(each -> 
Arrays.stream(each).allMatch(grantedPrivileges::contains));
     }
     
-    @Override
-    public void checkVariable(final DataSource dataSource) {
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_VARIABLES_SQL)) {
-            int parameterIndex = 1;
-            for (Entry<String, String> entry : REQUIRED_VARIABLES.entrySet()) {
-                preparedStatement.setString(parameterIndex++, entry.getKey());
-            }
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                while (resultSet.next()) {
-                    String variableName = resultSet.getString(1).toUpperCase();
-                    String expectedValue = 
REQUIRED_VARIABLES.get(variableName);
-                    String actualValue = resultSet.getString(2);
-                    
ShardingSpherePreconditions.checkState(expectedValue.equalsIgnoreCase(actualValue),
-                            () -> new 
UnexpectedVariableValueException(variableName, expectedValue, actualValue));
-                }
-            }
-        } catch (final SQLException ex) {
-            throw new CheckDatabaseEnvironmentFailedException(ex);
-        }
-    }
-    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git 
a/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
 
b/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
index d07ce71701a..2d96978e2e5 100644
--- 
a/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
+++ 
b/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.infra.database.mysql.checker;
 import 
org.apache.shardingsphere.infra.database.core.checker.PrivilegeCheckType;
 import 
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
 import 
org.apache.shardingsphere.infra.database.core.exception.MissingRequiredPrivilegeException;
-import 
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -33,7 +32,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
@@ -88,32 +86,6 @@ class MySQLDatabaseEnvironmentCheckerTest {
         assertThrows(CheckDatabaseEnvironmentFailedException.class, () -> new 
MySQLDatabaseEnvironmentChecker().checkPrivilege(dataSource, 
PrivilegeCheckType.PIPELINE));
     }
     
-    @Test
-    void assertCheckVariableSuccess() throws SQLException {
-        when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        when(resultSet.next()).thenReturn(true, true, true, false);
-        when(resultSet.getString(1)).thenReturn("LOG_BIN", "BINLOG_FORMAT", 
"BINLOG_ROW_IMAGE");
-        when(resultSet.getString(2)).thenReturn("ON", "ROW", "FULL");
-        assertDoesNotThrow(() -> new 
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
-        verify(preparedStatement, times(1)).executeQuery();
-    }
-    
-    @Test
-    void assertCheckVariableWithWrongVariable() throws SQLException {
-        when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        when(resultSet.next()).thenReturn(true, true, false);
-        when(resultSet.getString(1)).thenReturn("BINLOG_FORMAT", "LOG_BIN");
-        when(resultSet.getString(2)).thenReturn("ROW", "OFF");
-        assertThrows(UnexpectedVariableValueException.class, () -> new 
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
-    }
-    
-    @Test
-    void assertCheckVariableFailure() throws SQLException {
-        when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        when(resultSet.next()).thenThrow(new SQLException(""));
-        assertThrows(CheckDatabaseEnvironmentFailedException.class, () -> new 
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
-    }
-    
     @Test
     void assertCheckXAPrivilegeWithParticularSuccessInMySQL8() throws 
SQLException {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
diff --git 
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
 
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
index c50cb1a6ee4..a83505e20ba 100644
--- 
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
+++ 
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
@@ -65,10 +65,6 @@ public final class OpenGaussDatabaseEnvironmentChecker 
implements DialectDatabas
         }
     }
     
-    @Override
-    public void checkVariable(final DataSource dataSource) {
-    }
-    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git 
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
 
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
index 324fbd7e0bf..9f72b08ed61 100644
--- 
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
+++ 
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
@@ -65,10 +65,6 @@ public final class PostgreSQLDatabaseEnvironmentChecker 
implements DialectDataba
         }
     }
     
-    @Override
-    public void checkVariable(final DataSource dataSource) {
-    }
-    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
index 88f7b9d7bb0..768561ece94 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
@@ -24,6 +24,7 @@ import java.util.List;
 /**
  * Pipeline channel acknowledged callback.
  */
+@FunctionalInterface
 public interface PipelineChannelAckCallback {
     
     /**
diff --git 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
similarity index 69%
copy from 
infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
index c928b4d796c..6b42321f952 100644
--- 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.database.core.checker;
+package org.apache.shardingsphere.data.pipeline.core.checker;
 
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
@@ -23,23 +23,15 @@ import 
org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import javax.sql.DataSource;
 
 /**
- * Dialect database environment checker.
+ * Dialect pipeline database variable checker.
  */
 @SingletonSPI
-public interface DialectDatabaseEnvironmentChecker extends DatabaseTypedSPI {
-    
-    /**
-     * Check user privileges.
-     *
-     * @param dataSource data source to be checked
-     * @param privilegeCheckType privilege check type
-     */
-    void checkPrivilege(DataSource dataSource, PrivilegeCheckType 
privilegeCheckType);
+public interface DialectPipelineDatabaseVariableChecker extends 
DatabaseTypedSPI {
     
     /**
      * Check variables.
      *
      * @param dataSource data source to be checked
      */
-    void checkVariable(DataSource dataSource);
+    void check(DataSource dataSource);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
similarity index 88%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 89062655669..08cd90cb343 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -36,16 +36,19 @@ import java.sql.SQLException;
 import java.util.Collection;
 
 /**
- * Data source check engine.
+ * Pipeline data source check engine.
  */
-public final class DataSourceCheckEngine {
+public final class PipelineDataSourceCheckEngine {
     
     private final DialectDatabaseEnvironmentChecker checker;
     
+    private final DialectPipelineDatabaseVariableChecker variableChecker;
+    
     private final PipelinePrepareSQLBuilder sqlBuilder;
     
-    public DataSourceCheckEngine(final DatabaseType databaseType) {
+    public PipelineDataSourceCheckEngine(final DatabaseType databaseType) {
         checker = 
DatabaseTypedSPILoader.findService(DialectDatabaseEnvironmentChecker.class, 
databaseType).orElse(null);
+        variableChecker = 
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
 databaseType).orElse(null);
         sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
     }
     
@@ -72,11 +75,12 @@ public final class DataSourceCheckEngine {
      */
     public void checkSourceDataSources(final Collection<DataSource> 
dataSources) {
         checkConnection(dataSources);
-        if (null == checker) {
-            return;
+        if (null != checker) {
+            dataSources.forEach(each -> checker.checkPrivilege(each, 
PrivilegeCheckType.PIPELINE));
+        }
+        if (null != variableChecker) {
+            dataSources.forEach(variableChecker::check);
         }
-        dataSources.forEach(each -> checker.checkPrivilege(each, 
PrivilegeCheckType.PIPELINE));
-        dataSources.forEach(checker::checkVariable);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
similarity index 85%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
index ed7b92aeeab..024fa7ae7b7 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -45,12 +45,12 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class DataSourceCheckEngineTest {
+class PipelineDataSourceCheckEngineTest {
     
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource dataSource;
     
-    private DataSourceCheckEngine dataSourceCheckEngine;
+    private PipelineDataSourceCheckEngine pipelineDataSourceCheckEngine;
     
     private Collection<DataSource> dataSources;
     
@@ -65,7 +65,7 @@ class DataSourceCheckEngineTest {
     
     @BeforeEach
     void setUp() {
-        dataSourceCheckEngine = new 
DataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
+        pipelineDataSourceCheckEngine = new 
PipelineDataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE"));
         dataSources = new LinkedList<>();
         dataSources.add(dataSource);
     }
@@ -73,14 +73,14 @@ class DataSourceCheckEngineTest {
     @Test
     void assertCheckConnection() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
-        dataSourceCheckEngine.checkConnection(dataSources);
+        pipelineDataSourceCheckEngine.checkConnection(dataSources);
         verify(dataSource).getConnection();
     }
     
     @Test
     void assertCheckConnectionFailed() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException("error"));
-        assertThrows(SQLWrapperException.class, () -> 
dataSourceCheckEngine.checkConnection(dataSources));
+        assertThrows(SQLWrapperException.class, () -> 
pipelineDataSourceCheckEngine.checkConnection(dataSources));
     }
     
     @Test
@@ -90,7 +90,7 @@ class DataSourceCheckEngineTest {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
         
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
CaseInsensitiveQualifiedTable(null, "t_order")));
-        dataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig);
+        pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig);
     }
     
     @Test
@@ -101,6 +101,6 @@ class DataSourceCheckEngineTest {
         when(resultSet.next()).thenReturn(true);
         ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
         
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
CaseInsensitiveQualifiedTable(null, "t_order")));
-        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
+        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
new file mode 100644
index 00000000000..13e4e9b28bb
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker;
+import 
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
+import 
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline database variable checker for MySQL.
+ */
+public final class MySQLPipelineDatabaseVariableChecker implements 
DialectPipelineDatabaseVariableChecker {
+    
+    private static final Map<String, String> REQUIRED_VARIABLES = new 
HashMap<>(3, 1F);
+    
+    private static final String SHOW_VARIABLES_SQL;
+    
+    static {
+        REQUIRED_VARIABLES.put("LOG_BIN", "ON");
+        REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
+        // It does not exist in all versions of MySQL
+        REQUIRED_VARIABLES.put("BINLOG_ROW_IMAGE", "FULL");
+        SHOW_VARIABLES_SQL = String.format("SHOW VARIABLES WHERE Variable_name 
IN (%s)", REQUIRED_VARIABLES.keySet().stream().map(each -> 
"?").collect(Collectors.joining(",")));
+    }
+    
+    @Override
+    public void check(final DataSource dataSource) {
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(SHOW_VARIABLES_SQL)) {
+            int parameterIndex = 1;
+            for (Entry<String, String> entry : REQUIRED_VARIABLES.entrySet()) {
+                preparedStatement.setString(parameterIndex++, entry.getKey());
+            }
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    String variableName = resultSet.getString(1).toUpperCase();
+                    String expectedValue = 
REQUIRED_VARIABLES.get(variableName);
+                    String actualValue = resultSet.getString(2);
+                    
ShardingSpherePreconditions.checkState(expectedValue.equalsIgnoreCase(actualValue),
 () -> new UnexpectedVariableValueException(variableName, expectedValue, 
actualValue));
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new CheckDatabaseEnvironmentFailedException(ex);
+        }
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
new file mode 100644
index 00000000000..ea7d0e24436
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.datasource.MySQLPipelineDatabaseVariableChecker
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
new file mode 100644
index 00000000000..f093d2cfc56
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
+
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker;
+import 
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
+import 
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.sql.DataSource;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class MySQLPipelineDatabaseVariableCheckerTest {
+    
+    private DialectPipelineDatabaseVariableChecker variableChecker;
+    
+    @Mock
+    private PreparedStatement preparedStatement;
+    
+    @Mock
+    private ResultSet resultSet;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private DataSource dataSource;
+    
+    @BeforeEach
+    void setUp() throws SQLException {
+        variableChecker = 
TypedSPILoader.getService(DialectPipelineDatabaseVariableChecker.class, 
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
+        
when(dataSource.getConnection().prepareStatement(anyString())).thenReturn(preparedStatement);
+    }
+    
+    @Test
+    void assertCheckSuccess() throws SQLException {
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        when(resultSet.next()).thenReturn(true, true, true, false);
+        when(resultSet.getString(1)).thenReturn("LOG_BIN", "BINLOG_FORMAT", 
"BINLOG_ROW_IMAGE");
+        when(resultSet.getString(2)).thenReturn("ON", "ROW", "FULL");
+        assertDoesNotThrow(() -> variableChecker.check(dataSource));
+        verify(preparedStatement, times(1)).executeQuery();
+    }
+    
+    @Test
+    void assertCheckVariableWithWrong() throws SQLException {
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        when(resultSet.next()).thenReturn(true, true, false);
+        when(resultSet.getString(1)).thenReturn("BINLOG_FORMAT", "LOG_BIN");
+        when(resultSet.getString(2)).thenReturn("ROW", "OFF");
+        assertThrows(UnexpectedVariableValueException.class, () -> 
variableChecker.check(dataSource));
+    }
+    
+    @Test
+    void assertCheckFailure() throws SQLException {
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        when(resultSet.next()).thenThrow(new SQLException(""));
+        assertThrows(CheckDatabaseEnvironmentFailedException.class, () -> 
variableChecker.check(dataSource));
+    }
+}
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 106d6daf76f..29f21a14173 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -97,7 +97,7 @@ public final class MigrationJobPreparer {
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = 
jobItemContext.getJobConfig().getSourceDatabaseType();
-        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
+        new 
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             throw new PipelineJobCancelingException();
         }
@@ -156,7 +156,7 @@ public final class MigrationJobPreparer {
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new 
DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
 jobItemContext.getTaskConfig().getImporterConfig());
+            new 
PipelineDataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
 jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
     


Reply via email to