This is an automated email from the ASF dual-hosted git repository.

sandynz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 212f98f2bd0 Pipeline: Support MySQL 8.4 binlog position initialization (#38679)
212f98f2bd0 is described below

commit 212f98f2bd06d86c63a919f8585bbcf173018892
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat May 9 13:41:48 2026 +0800

    Pipeline: Support MySQL 8.4 binlog position initialization (#38679)
    
    * pipeline: support MySQL 8.4 binlog status
    
    * pipeline: keep MariaDB binlog status compatible
---
 .../position/MySQLIncrementalPositionManager.java  | 30 ++++++++++++++-
 .../MySQLIncrementalPositionManagerTest.java       | 45 +++++++++++++++++++---
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
index d6f35c375e0..6482ea30f66 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManager.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncre
 
 import javax.sql.DataSource;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -31,6 +32,12 @@ import java.sql.SQLException;
  */
 public final class MySQLIncrementalPositionManager implements 
DialectIncrementalPositionManager {
     
+    private static final String MYSQL_DATABASE_PRODUCT_NAME = "MySQL";
+    
+    private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
+    
+    private static final String SHOW_BINARY_LOG_STATUS_SQL = "SHOW BINARY LOG 
STATUS";
+    
     @Override
     public MySQLBinlogPosition init(final String data) {
         String[] array = data.split("#");
@@ -40,15 +47,34 @@ public final class MySQLIncrementalPositionManager 
implements DialectIncremental
     
     @Override
     public MySQLBinlogPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            return getBinlogPosition(connection);
+        }
+    }
+    
+    private MySQLBinlogPosition getBinlogPosition(final Connection connection) 
throws SQLException {
+        String sql = getShowBinlogStatusSQL(connection);
         try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement("SHOW MASTER STATUS");
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
             return new MySQLBinlogPosition(resultSet.getString(1), 
resultSet.getLong(2));
         }
     }
     
+    private String getShowBinlogStatusSQL(final Connection connection) throws 
SQLException {
+        DatabaseMetaData databaseMetaData = connection.getMetaData();
+        return isShowBinaryLogStatusSupported(databaseMetaData) ? 
SHOW_BINARY_LOG_STATUS_SQL : SHOW_MASTER_STATUS_SQL;
+    }
+    
+    private boolean isShowBinaryLogStatusSupported(final DatabaseMetaData 
databaseMetaData) throws SQLException {
+        if 
(!MYSQL_DATABASE_PRODUCT_NAME.equals(databaseMetaData.getDatabaseProductName()))
 {
+            return false;
+        }
+        int majorVersion = databaseMetaData.getDatabaseMajorVersion();
+        return majorVersion > 8 || 8 == majorVersion && 
databaseMetaData.getDatabaseMinorVersion() >= 4;
+    }
+    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
index d80438c26a7..f1fb71b18ca 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/position/MySQLIncrementalPositionManagerTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -34,10 +35,19 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class MySQLIncrementalPositionManagerTest {
     
+    private static final String MYSQL_DATABASE_PRODUCT_NAME = "MySQL";
+    
+    private static final String MARIADB_DATABASE_PRODUCT_NAME = "MariaDB";
+    
+    private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
+    
+    private static final String SHOW_BINARY_LOG_STATUS_SQL = "SHOW BINARY LOG 
STATUS";
+    
     private static final String LOG_FILE_NAME = "binlog-000001";
     
     private static final long LOG_POSITION = 4L;
@@ -59,16 +69,41 @@ class MySQLIncrementalPositionManagerTest {
     }
     
     @Test
-    void assertInitWithDataSource() throws SQLException {
-        MySQLBinlogPosition actual = (MySQLBinlogPosition) 
incrementalPositionManager.init(createDataSource(), "");
+    void assertInitWithDataSourceByShowMasterStatus() throws SQLException {
+        assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 8, 3, 
SHOW_MASTER_STATUS_SQL);
+    }
+    
+    @Test
+    void assertInitWithDataSourceByShowBinaryLogStatus() throws SQLException {
+        assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 8, 4, 
SHOW_BINARY_LOG_STATUS_SQL);
+    }
+    
+    @Test
+    void assertInitWithDataSourceByShowBinaryLogStatusForHigherMajorVersion() 
throws SQLException {
+        assertInitWithDataSource(MYSQL_DATABASE_PRODUCT_NAME, 9, 0, 
SHOW_BINARY_LOG_STATUS_SQL);
+    }
+    
+    @Test
+    void assertInitWithDataSourceByShowMasterStatusForMariaDB() throws 
SQLException {
+        assertInitWithDataSource(MARIADB_DATABASE_PRODUCT_NAME, 11, 4, 
SHOW_MASTER_STATUS_SQL);
+    }
+    
+    private void assertInitWithDataSource(final String productName, final int 
majorVersion, final int minorVersion, final String expectedStatusSQL) throws 
SQLException {
+        Connection connection = mock(Connection.class);
+        MySQLBinlogPosition actual = (MySQLBinlogPosition) 
incrementalPositionManager.init(createDataSource(connection, productName, 
majorVersion, minorVersion, expectedStatusSQL), "");
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
+        verify(connection).prepareStatement(expectedStatusSQL);
     }
     
-    DataSource createDataSource() throws SQLException {
-        Connection connection = mock(Connection.class);
+    private DataSource createDataSource(final Connection connection, final 
String productName, final int majorVersion, final int minorVersion, final 
String expectedStatusSQL) throws SQLException {
+        DatabaseMetaData databaseMetaData = mock(DatabaseMetaData.class);
+        
when(databaseMetaData.getDatabaseProductName()).thenReturn(productName);
+        
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(majorVersion);
+        
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(minorVersion);
+        when(connection.getMetaData()).thenReturn(databaseMetaData);
         PreparedStatement positionStatement = mockPositionStatement();
-        when(connection.prepareStatement("SHOW MASTER 
STATUS")).thenReturn(positionStatement);
+        
when(connection.prepareStatement(expectedStatusSQL)).thenReturn(positionStatement);
         return new MockedDataSource(connection);
     }
     

Reply via email to