This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb3d8ebbb1 Add DialectPipelineDatabaseVariableChecker (#32586)
0bb3d8ebbb1 is described below
commit 0bb3d8ebbb188bb7cec6b4fb27e8e8feba42a296
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Aug 18 18:13:34 2024 +0800
Add DialectPipelineDatabaseVariableChecker (#32586)
* Refactor PipelineChannelAckCallback
* Add DialectPipelineDatabaseVariableChecker
* Add DialectPipelineDatabaseVariableChecker
* Add DialectPipelineDatabaseVariableChecker
---
.../checker/DialectDatabaseEnvironmentChecker.java | 7 --
.../h2/checker/H2DatabaseEnvironmentChecker.java | 4 -
.../checker/MySQLDatabaseEnvironmentChecker.java | 37 ---------
.../MySQLDatabaseEnvironmentCheckerTest.java | 28 -------
.../OpenGaussDatabaseEnvironmentChecker.java | 4 -
.../PostgreSQLDatabaseEnvironmentChecker.java | 4 -
.../core/channel/PipelineChannelAckCallback.java | 1 +
.../DialectPipelineDatabaseVariableChecker.java | 16 +---
...ine.java => PipelineDataSourceCheckEngine.java} | 18 +++--
...java => PipelineDataSourceCheckEngineTest.java} | 16 ++--
.../MySQLPipelineDatabaseVariableChecker.java | 78 +++++++++++++++++++
....checker.DialectPipelineDatabaseVariableChecker | 18 +++++
.../MySQLPipelineDatabaseVariableCheckerTest.java | 89 ++++++++++++++++++++++
.../migration/preparer/MigrationJobPreparer.java | 6 +-
14 files changed, 212 insertions(+), 114 deletions(-)
diff --git
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
index c928b4d796c..84cc9724e19 100644
---
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
+++
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
@@ -35,11 +35,4 @@ public interface DialectDatabaseEnvironmentChecker extends
DatabaseTypedSPI {
* @param privilegeCheckType privilege check type
*/
void checkPrivilege(DataSource dataSource, PrivilegeCheckType
privilegeCheckType);
-
- /**
- * Check variables.
- *
- * @param dataSource data source to be checked
- */
- void checkVariable(DataSource dataSource);
}
diff --git
a/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
b/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
index 2b05baa77d7..7094b726745 100644
---
a/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
+++
b/infra/database/type/h2/src/main/java/org/apache/shardingsphere/infra/database/h2/checker/H2DatabaseEnvironmentChecker.java
@@ -31,10 +31,6 @@ public final class H2DatabaseEnvironmentChecker implements
DialectDatabaseEnviro
public void checkPrivilege(final DataSource dataSource, final
PrivilegeCheckType privilegeCheckType) {
}
- @Override
- public void checkVariable(final DataSource dataSource) {
- }
-
@Override
public String getDatabaseType() {
return "H2";
diff --git
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
index b35a39d401d..358071b6a3d 100644
---
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
+++
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentChecker.java
@@ -21,8 +21,6 @@ import
org.apache.shardingsphere.infra.database.core.checker.DialectDatabaseEnvi
import
org.apache.shardingsphere.infra.database.core.checker.PrivilegeCheckType;
import
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
import
org.apache.shardingsphere.infra.database.core.exception.MissingRequiredPrivilegeException;
-import
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -33,10 +31,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
/**
* Database environment checker for MySQL.
@@ -55,19 +50,10 @@ public final class MySQLDatabaseEnvironmentChecker
implements DialectDatabaseEnv
private static final Map<PrivilegeCheckType, Collection<String>>
REQUIRED_PRIVILEGES_FOR_MESSAGE = new EnumMap<>(PrivilegeCheckType.class);
- private static final Map<String, String> REQUIRED_VARIABLES = new
HashMap<>(3, 1F);
-
- private static final String SHOW_VARIABLES_SQL;
-
static {
REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.PIPELINE,
Arrays.asList("REPLICATION SLAVE", "REPLICATION CLIENT"));
REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.SELECT,
Collections.singleton("SELECT ON DATABASE"));
REQUIRED_PRIVILEGES_FOR_MESSAGE.put(PrivilegeCheckType.XA,
Collections.singleton("XA_RECOVER_ADMIN"));
- REQUIRED_VARIABLES.put("LOG_BIN", "ON");
- REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
- // It does not exist in all versions of MySQL
- REQUIRED_VARIABLES.put("BINLOG_ROW_IMAGE", "FULL");
- SHOW_VARIABLES_SQL = String.format("SHOW VARIABLES WHERE Variable_name
IN (%s)", REQUIRED_VARIABLES.keySet().stream().map(each ->
"?").collect(Collectors.joining(",")));
}
@Override
@@ -119,29 +105,6 @@ public final class MySQLDatabaseEnvironmentChecker
implements DialectDatabaseEnv
return Arrays.stream(requiredPrivileges).anyMatch(each ->
Arrays.stream(each).allMatch(grantedPrivileges::contains));
}
- @Override
- public void checkVariable(final DataSource dataSource) {
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(SHOW_VARIABLES_SQL)) {
- int parameterIndex = 1;
- for (Entry<String, String> entry : REQUIRED_VARIABLES.entrySet()) {
- preparedStatement.setString(parameterIndex++, entry.getKey());
- }
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- while (resultSet.next()) {
- String variableName = resultSet.getString(1).toUpperCase();
- String expectedValue =
REQUIRED_VARIABLES.get(variableName);
- String actualValue = resultSet.getString(2);
-
ShardingSpherePreconditions.checkState(expectedValue.equalsIgnoreCase(actualValue),
- () -> new
UnexpectedVariableValueException(variableName, expectedValue, actualValue));
- }
- }
- } catch (final SQLException ex) {
- throw new CheckDatabaseEnvironmentFailedException(ex);
- }
- }
-
@Override
public String getDatabaseType() {
return "MySQL";
diff --git
a/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
b/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
index d07ce71701a..2d96978e2e5 100644
---
a/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
+++
b/infra/database/type/mysql/src/test/java/org/apache/shardingsphere/infra/database/mysql/checker/MySQLDatabaseEnvironmentCheckerTest.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.infra.database.mysql.checker;
import
org.apache.shardingsphere.infra.database.core.checker.PrivilegeCheckType;
import
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
import
org.apache.shardingsphere.infra.database.core.exception.MissingRequiredPrivilegeException;
-import
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -33,7 +32,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@@ -88,32 +86,6 @@ class MySQLDatabaseEnvironmentCheckerTest {
assertThrows(CheckDatabaseEnvironmentFailedException.class, () -> new
MySQLDatabaseEnvironmentChecker().checkPrivilege(dataSource,
PrivilegeCheckType.PIPELINE));
}
- @Test
- void assertCheckVariableSuccess() throws SQLException {
- when(preparedStatement.executeQuery()).thenReturn(resultSet);
- when(resultSet.next()).thenReturn(true, true, true, false);
- when(resultSet.getString(1)).thenReturn("LOG_BIN", "BINLOG_FORMAT",
"BINLOG_ROW_IMAGE");
- when(resultSet.getString(2)).thenReturn("ON", "ROW", "FULL");
- assertDoesNotThrow(() -> new
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
- verify(preparedStatement, times(1)).executeQuery();
- }
-
- @Test
- void assertCheckVariableWithWrongVariable() throws SQLException {
- when(preparedStatement.executeQuery()).thenReturn(resultSet);
- when(resultSet.next()).thenReturn(true, true, false);
- when(resultSet.getString(1)).thenReturn("BINLOG_FORMAT", "LOG_BIN");
- when(resultSet.getString(2)).thenReturn("ROW", "OFF");
- assertThrows(UnexpectedVariableValueException.class, () -> new
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
- }
-
- @Test
- void assertCheckVariableFailure() throws SQLException {
- when(preparedStatement.executeQuery()).thenReturn(resultSet);
- when(resultSet.next()).thenThrow(new SQLException(""));
- assertThrows(CheckDatabaseEnvironmentFailedException.class, () -> new
MySQLDatabaseEnvironmentChecker().checkVariable(dataSource));
- }
-
@Test
void assertCheckXAPrivilegeWithParticularSuccessInMySQL8() throws
SQLException {
when(preparedStatement.executeQuery()).thenReturn(resultSet);
diff --git
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
index c50cb1a6ee4..a83505e20ba 100644
---
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
+++
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/checker/OpenGaussDatabaseEnvironmentChecker.java
@@ -65,10 +65,6 @@ public final class OpenGaussDatabaseEnvironmentChecker
implements DialectDatabas
}
}
- @Override
- public void checkVariable(final DataSource dataSource) {
- }
-
@Override
public String getDatabaseType() {
return "openGauss";
diff --git
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
index 324fbd7e0bf..9f72b08ed61 100644
---
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
+++
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/checker/PostgreSQLDatabaseEnvironmentChecker.java
@@ -65,10 +65,6 @@ public final class PostgreSQLDatabaseEnvironmentChecker
implements DialectDataba
}
}
- @Override
- public void checkVariable(final DataSource dataSource) {
- }
-
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
index 88f7b9d7bb0..768561ece94 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java
@@ -24,6 +24,7 @@ import java.util.List;
/**
* Pipeline channel acknowledged callback.
*/
+@FunctionalInterface
public interface PipelineChannelAckCallback {
/**
diff --git
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
similarity index 69%
copy from
infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
index c928b4d796c..6b42321f952 100644
---
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/checker/DialectDatabaseEnvironmentChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectPipelineDatabaseVariableChecker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.database.core.checker;
+package org.apache.shardingsphere.data.pipeline.core.checker;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
@@ -23,23 +23,15 @@ import
org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import javax.sql.DataSource;
/**
- * Dialect database environment checker.
+ * Dialect pipeline database variable checker.
*/
@SingletonSPI
-public interface DialectDatabaseEnvironmentChecker extends DatabaseTypedSPI {
-
- /**
- * Check user privileges.
- *
- * @param dataSource data source to be checked
- * @param privilegeCheckType privilege check type
- */
- void checkPrivilege(DataSource dataSource, PrivilegeCheckType
privilegeCheckType);
+public interface DialectPipelineDatabaseVariableChecker extends
DatabaseTypedSPI {
/**
* Check variables.
*
* @param dataSource data source to be checked
*/
- void checkVariable(DataSource dataSource);
+ void check(DataSource dataSource);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
similarity index 88%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 89062655669..08cd90cb343 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -36,16 +36,19 @@ import java.sql.SQLException;
import java.util.Collection;
/**
- * Data source check engine.
+ * Pipeline data source check engine.
*/
-public final class DataSourceCheckEngine {
+public final class PipelineDataSourceCheckEngine {
private final DialectDatabaseEnvironmentChecker checker;
+ private final DialectPipelineDatabaseVariableChecker variableChecker;
+
private final PipelinePrepareSQLBuilder sqlBuilder;
- public DataSourceCheckEngine(final DatabaseType databaseType) {
+ public PipelineDataSourceCheckEngine(final DatabaseType databaseType) {
checker =
DatabaseTypedSPILoader.findService(DialectDatabaseEnvironmentChecker.class,
databaseType).orElse(null);
+ variableChecker =
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class,
databaseType).orElse(null);
sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
}
@@ -72,11 +75,12 @@ public final class DataSourceCheckEngine {
*/
public void checkSourceDataSources(final Collection<DataSource>
dataSources) {
checkConnection(dataSources);
- if (null == checker) {
- return;
+ if (null != checker) {
+ dataSources.forEach(each -> checker.checkPrivilege(each,
PrivilegeCheckType.PIPELINE));
+ }
+ if (null != variableChecker) {
+ dataSources.forEach(variableChecker::check);
}
- dataSources.forEach(each -> checker.checkPrivilege(each,
PrivilegeCheckType.PIPELINE));
- dataSources.forEach(checker::checkVariable);
}
/**
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
similarity index 85%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
index ed7b92aeeab..024fa7ae7b7 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineDataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -45,12 +45,12 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class DataSourceCheckEngineTest {
+class PipelineDataSourceCheckEngineTest {
@Mock(extraInterfaces = AutoCloseable.class)
private DataSource dataSource;
- private DataSourceCheckEngine dataSourceCheckEngine;
+ private PipelineDataSourceCheckEngine pipelineDataSourceCheckEngine;
private Collection<DataSource> dataSources;
@@ -65,7 +65,7 @@ class DataSourceCheckEngineTest {
@BeforeEach
void setUp() {
- dataSourceCheckEngine = new
DataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
+ pipelineDataSourceCheckEngine = new
PipelineDataSourceCheckEngine(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
dataSources = new LinkedList<>();
dataSources.add(dataSource);
}
@@ -73,14 +73,14 @@ class DataSourceCheckEngineTest {
@Test
void assertCheckConnection() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
- dataSourceCheckEngine.checkConnection(dataSources);
+ pipelineDataSourceCheckEngine.checkConnection(dataSources);
verify(dataSource).getConnection();
}
@Test
void assertCheckConnectionFailed() throws SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException("error"));
- assertThrows(SQLWrapperException.class, () ->
dataSourceCheckEngine.checkConnection(dataSources));
+ assertThrows(SQLWrapperException.class, () ->
pipelineDataSourceCheckEngine.checkConnection(dataSources));
}
@Test
@@ -90,7 +90,7 @@ class DataSourceCheckEngineTest {
when(preparedStatement.executeQuery()).thenReturn(resultSet);
ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class);
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
CaseInsensitiveQualifiedTable(null, "t_order")));
- dataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig);
+ pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig);
}
@Test
@@ -101,6 +101,6 @@ class DataSourceCheckEngineTest {
when(resultSet.next()).thenReturn(true);
ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class);
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
CaseInsensitiveQualifiedTable(null, "t_order")));
- assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () ->
dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
+ assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig));
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
new file mode 100644
index 00000000000..13e4e9b28bb
--- /dev/null
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableChecker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker;
+import
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
+import
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline database variable checker for MySQL.
+ *
+ * <p>Runs {@code SHOW VARIABLES} for the required variables ({@code LOG_BIN}, {@code BINLOG_FORMAT},
+ * {@code BINLOG_ROW_IMAGE}) and verifies that every row the server returns carries the expected value.
+ * A required variable that the server does not report at all is not treated as an error.</p>
+ */
+public final class MySQLPipelineDatabaseVariableChecker implements DialectPipelineDatabaseVariableChecker {
+
+    private static final Map<String, String> REQUIRED_VARIABLES = new HashMap<>(3, 1F);
+
+    private static final String SHOW_VARIABLES_SQL;
+
+    static {
+        REQUIRED_VARIABLES.put("LOG_BIN", "ON");
+        REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
+        // It does not exist in all versions of MySQL
+        REQUIRED_VARIABLES.put("BINLOG_ROW_IMAGE", "FULL");
+        // One "?" placeholder per required variable; the names are bound in check().
+        SHOW_VARIABLES_SQL = String.format("SHOW VARIABLES WHERE Variable_name IN (%s)", REQUIRED_VARIABLES.keySet().stream().map(each -> "?").collect(Collectors.joining(",")));
+    }
+
+    /**
+     * Check that the required variables reported by the data source have the expected values.
+     *
+     * @param dataSource data source to be checked
+     * @throws UnexpectedVariableValueException if a reported variable differs (ignoring case) from its expected value
+     * @throws CheckDatabaseEnvironmentFailedException if querying the variables fails with an SQL exception
+     */
+    @Override
+    public void check(final DataSource dataSource) {
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(SHOW_VARIABLES_SQL)) {
+            int parameterIndex = 1;
+            for (Entry<String, String> entry : REQUIRED_VARIABLES.entrySet()) {
+                preparedStatement.setString(parameterIndex++, entry.getKey());
+            }
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    // Upper-case with Locale.ROOT: the default-locale variant turns "i" into a dotted capital I
+                    // under e.g. a Turkish locale, so "log_bin" would not match the "LOG_BIN" key.
+                    String variableName = resultSet.getString(1).toUpperCase(Locale.ROOT);
+                    String expectedValue = REQUIRED_VARIABLES.get(variableName);
+                    if (null == expectedValue) {
+                        // Not one of the required variables, so there is nothing to compare against.
+                        continue;
+                    }
+                    String actualValue = resultSet.getString(2);
+                    ShardingSpherePreconditions.checkState(expectedValue.equalsIgnoreCase(actualValue), () -> new UnexpectedVariableValueException(variableName, expectedValue, actualValue));
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new CheckDatabaseEnvironmentFailedException(ex);
+        }
+    }
+
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
new file mode 100644
index 00000000000..ea7d0e24436
--- /dev/null
+++
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.datasource.MySQLPipelineDatabaseVariableChecker
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
new file mode 100644
index 00000000000..f093d2cfc56
--- /dev/null
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLPipelineDatabaseVariableCheckerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.datasource;
+
+import
org.apache.shardingsphere.data.pipeline.core.checker.DialectPipelineDatabaseVariableChecker;
+import
org.apache.shardingsphere.infra.database.core.exception.CheckDatabaseEnvironmentFailedException;
+import
org.apache.shardingsphere.infra.database.core.exception.UnexpectedVariableValueException;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.sql.DataSource;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class MySQLPipelineDatabaseVariableCheckerTest {
+
+ private DialectPipelineDatabaseVariableChecker variableChecker;
+
+ @Mock
+ private PreparedStatement preparedStatement;
+
+ @Mock
+ private ResultSet resultSet;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private DataSource dataSource;
+
+ @BeforeEach
+ void setUp() throws SQLException {
+ variableChecker =
TypedSPILoader.getService(DialectPipelineDatabaseVariableChecker.class,
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
+
when(dataSource.getConnection().prepareStatement(anyString())).thenReturn(preparedStatement);
+ }
+
+ @Test
+ void assertCheckSuccess() throws SQLException {
+ when(preparedStatement.executeQuery()).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true, true, true, false);
+ when(resultSet.getString(1)).thenReturn("LOG_BIN", "BINLOG_FORMAT",
"BINLOG_ROW_IMAGE");
+ when(resultSet.getString(2)).thenReturn("ON", "ROW", "FULL");
+ assertDoesNotThrow(() -> variableChecker.check(dataSource));
+ verify(preparedStatement, times(1)).executeQuery();
+ }
+
+ @Test
+ void assertCheckVariableWithWrong() throws SQLException {
+ when(preparedStatement.executeQuery()).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true, true, false);
+ when(resultSet.getString(1)).thenReturn("BINLOG_FORMAT", "LOG_BIN");
+ when(resultSet.getString(2)).thenReturn("ROW", "OFF");
+ assertThrows(UnexpectedVariableValueException.class, () ->
variableChecker.check(dataSource));
+ }
+
+ @Test
+ void assertCheckFailure() throws SQLException {
+ when(preparedStatement.executeQuery()).thenReturn(resultSet);
+ when(resultSet.next()).thenThrow(new SQLException(""));
+ assertThrows(CheckDatabaseEnvironmentFailedException.class, () ->
variableChecker.check(dataSource));
+ }
+}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 106d6daf76f..29f21a14173 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -97,7 +97,7 @@ public final class MigrationJobPreparer {
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType =
jobItemContext.getJobConfig().getSourceDatabaseType();
- new
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
+ new
PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
throw new PipelineJobCancelingException();
}
@@ -156,7 +156,7 @@ public final class MigrationJobPreparer {
}
if (null == jobItemContext.getInitProgress()) {
PipelineDataSourceWrapper targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- new
DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
jobItemContext.getTaskConfig().getImporterConfig());
+ new
PipelineDataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
jobItemContext.getTaskConfig().getImporterConfig());
}
}