This is an automated email from the ASF dual-hosted git repository.
sandynz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 212f98f2bd0 Pipeline: Support MySQL 8.4 binlog position initialization (#38679)
212f98f2bd0 is described below
commit 212f98f2bd06d86c63a919f8585bbcf173018892
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat May 9 13:41:48 2026 +0800
Pipeline: Support MySQL 8.4 binlog position initialization (#38679)
* pipeline: support MySQL 8.4 binlog status
* pipeline: keep MariaDB binlog status compatible
---
.../position/MySQLIncrementalPositionManager.java | 30 ++++++++++++++-
.../MySQLIncrementalPositionManagerTest.java | 45 +++++++++++++++++++---
2 files changed, 68 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
index d6f35c375e0..6482ea30f66 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncre
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -31,6 +32,12 @@ import java.sql.SQLException;
*/
public final class MySQLIncrementalPositionManager implements
DialectIncrementalPositionManager {
+ private static final String MYSQL_DATABASE_PRODUCT_NAME = "MySQL";
+
+ private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
+
+ private static final String SHOW_BINARY_LOG_STATUS_SQL = "SHOW BINARY LOG STATUS";
+
@Override
public MySQLBinlogPosition init(final String data) {
String[] array = data.split("#");
@@ -40,15 +47,34 @@ public final class MySQLIncrementalPositionManager
implements DialectIncremental
@Override
public MySQLBinlogPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ return getBinlogPosition(connection);
+ }
+ }
+
+ private MySQLBinlogPosition getBinlogPosition(final Connection connection)
throws SQLException {
+ String sql = getShowBinlogStatusSQL(connection);
try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement("SHOW MASTER STATUS");
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return new MySQLBinlogPosition(resultSet.getString(1),
resultSet.getLong(2));
}
}
+ private String getShowBinlogStatusSQL(final Connection connection) throws
SQLException {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ return isShowBinaryLogStatusSupported(databaseMetaData) ?
SHOW_BINARY_LOG_STATUS_SQL : SHOW_MASTER_STATUS_SQL;
+ }
+
+ private boolean isShowBinaryLogStatusSupported(final DatabaseMetaData
databaseMetaData) throws SQLException {
+ if (!MYSQL_DATABASE_PRODUCT_NAME.equals(databaseMetaData.getDatabaseProductName())) {
+ return false;
+ }
+ int majorVersion = databaseMetaData.getDatabaseMajorVersion();
+ return majorVersion > 8 || 8 == majorVersion &&
databaseMetaData.getDatabaseMinorVersion() >= 4;
+ }
+
@Override
public String getDatabaseType() {
return "MySQL";
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
index d80438c26a7..f1fb71b18ca 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -34,10 +35,19 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class MySQLIncrementalPositionManagerTest {
+ private static final String MYSQL_DATABASE_PRODUCT_NAME = "MySQL";
+
+ private static final String MARIADB_DATABASE_PRODUCT_NAME = "MariaDB";
+
+ private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
+
+ private static final String SHOW_BINARY_LOG_STATUS_SQL = "SHOW BINARY LOG STATUS";
+
private static final String LOG_FILE_NAME = "binlog-000001";
private static final long LOG_POSITION = 4L;
@@ -59,16 +69,41 @@ class MySQLIncrementalPositionManagerTest {
}
@Test
- void assertInitWithDataSource() throws SQLException {
- MySQLBinlogPosition actual = (MySQLBinlogPosition)
incrementalPositionManager.init(createDataSource(), "");
+ void assertInitWithDataSourceByShowMasterStatus() throws SQLException {
+ assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 8, 3,
SHOW_MASTER_STATUS_SQL);
+ }
+
+ @Test
+ void assertInitWithDataSourceByShowBinaryLogStatus() throws SQLException {
+ assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 8, 4,
SHOW_BINARY_LOG_STATUS_SQL);
+ }
+
+ @Test
+ void assertInitWithDataSourceByShowBinaryLogStatusForHigherMajorVersion()
throws SQLException {
+ assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 9, 0,
SHOW_BINARY_LOG_STATUS_SQL);
+ }
+
+ @Test
+ void assertInitWithDataSourceByShowMasterStatusForMariaDB() throws
SQLException {
+ assertInitWithDataSource(MARIADB_DATABASE_PRODUCT_NAME, 11, 4,
SHOW_MASTER_STATUS_SQL);
+ }
+
+ private void assertInitWithDataSource(final String productName, final int
majorVersion, final int minorVersion, final String expectedStatusSQL) throws
SQLException {
+ Connection connection = mock(Connection.class);
+ MySQLBinlogPosition actual = (MySQLBinlogPosition)
incrementalPositionManager.init(createDataSource(connection, productName,
majorVersion, minorVersion, expectedStatusSQL), "");
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
+ verify(connection).prepareStatement(expectedStatusSQL);
}
- DataSource createDataSource() throws SQLException {
- Connection connection = mock(Connection.class);
+ private DataSource createDataSource(final Connection connection, final
String productName, final int majorVersion, final int minorVersion, final
String expectedStatusSQL) throws SQLException {
+ DatabaseMetaData databaseMetaData = mock(DatabaseMetaData.class);
+ when(databaseMetaData.getDatabaseProductName()).thenReturn(productName);
+ when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(majorVersion);
+ when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(minorVersion);
+ when(connection.getMetaData()).thenReturn(databaseMetaData);
PreparedStatement positionStatement = mockPositionStatement();
- when(connection.prepareStatement("SHOW MASTER STATUS")).thenReturn(positionStatement);
+ when(connection.prepareStatement(expectedStatusSQL)).thenReturn(positionStatement);
return new MockedDataSource(connection);
}