This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aa50565799f Remove useless BinlogPosition.serverId (#32492)
aa50565799f is described below

commit aa50565799fa08b0fdfce21447a3d59a8e2e3347
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 17:15:04 2024 +0800

    Remove useless BinlogPosition.serverId (#32492)
    
    * Remove useless BinlogPosition.serverId
    
    * Remove useless BinlogPosition.serverId
    
    * Remove useless BinlogPosition.serverId
---
 .../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java |  4 ++--
 .../pipeline/mysql/ingest/MySQLIngestPositionManager.java  | 11 ++---------
 .../data/pipeline/mysql/ingest/binlog/BinlogPosition.java  |  2 --
 .../mysql/ingest/binlog/event/AbstractBinlogEvent.java     |  2 --
 .../ingest/client/netty/MySQLBinlogEventPacketDecoder.java |  3 ---
 .../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java  |  2 +-
 .../mysql/ingest/MySQLIngestPositionManagerTest.java       | 14 --------------
 .../pipeline/mysql/ingest/binlog/BinlogPositionTest.java   |  2 +-
 8 files changed, 6 insertions(+), 34 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1290516f358..43074284e25 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -149,7 +149,7 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
     }
     
     private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent event) {
-        PlaceholderRecord result = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+        PlaceholderRecord result = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition()));
         result.setCommitTime(event.getTimestamp() * 1000L);
         return result;
     }
@@ -218,7 +218,7 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
     
     private DataRecord createDataRecord(final PipelineSQLOperationType type, final AbstractRowsEvent rowsEvent, final int columnCount) {
         String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
-        IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
+        IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setActualTableName(rowsEvent.getTableName());
         result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
index 6f345f81dfe..cbc3170f584 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
@@ -43,13 +43,12 @@ public final class MySQLIngestPositionManager implements DialectIngestPositionMa
     public BinlogPosition init(final String data) {
         String[] array = data.split("#");
         Preconditions.checkArgument(2 == array.length, "Unknown binlog position: %s", data);
-        return new BinlogPosition(array[0], Long.parseLong(array[1]), 0L);
+        return new BinlogPosition(array[0], Long.parseLong(array[1]));
     }
     
     private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
         String filename;
         long position;
-        long serverId;
         try (
                 PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
                 ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -57,13 +56,7 @@ public final class MySQLIngestPositionManager implements DialectIngestPositionMa
             filename = resultSet.getString(1);
             position = resultSet.getLong(2);
         }
-        try (
-                PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
-                ResultSet resultSet = preparedStatement.executeQuery()) {
-            resultSet.next();
-            serverId = resultSet.getLong(2);
-        }
-        return new BinlogPosition(filename, position, serverId);
+        return new BinlogPosition(filename, position);
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
index 79a27e7ac62..e70d4427dc6 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
@@ -32,8 +32,6 @@ public final class BinlogPosition implements IngestPosition {
     
     private final long position;
     
-    private final long serverId;
-    
     @Override
     public String toString() {
         return String.format("%s#%d", filename, position);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
index a38ea752512..37d91c04853 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
@@ -27,8 +27,6 @@ import lombok.Setter;
 @Setter
 public abstract class AbstractBinlogEvent {
     
-    private long serverId;
-    
     private String fileName;
     
     private long position;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 140660a8057..1d2984a3d77 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -231,7 +231,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         rowsEvent.setFileName(binlogContext.getFileName());
         rowsEvent.setPosition(binlogEventHeader.getLogPos());
         rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
-        rowsEvent.setServerId(binlogEventHeader.getServerId());
     }
     
     private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
@@ -255,7 +254,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         result.setFileName(binlogContext.getFileName());
         result.setPosition(binlogEventHeader.getLogPos());
         result.setTimestamp(binlogEventHeader.getTimestamp());
-        result.setServerId(binlogEventHeader.getServerId());
         return result;
     }
     
@@ -264,7 +262,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         result.setFileName(binlogContext.getFileName());
         result.setPosition(binlogEventHeader.getLogPos());
         result.setTimestamp(binlogEventHeader.getTimestamp());
-        result.setServerId(binlogEventHeader.getServerId());
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 79ce7c57607..3e5180b1b56 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -91,7 +91,7 @@ class MySQLIncrementalDumperTest {
         MemoryPipelineChannel channel = new MemoryPipelineChannel(10000, records -> {
             
         });
-        incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
+        incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
         pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
         when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
index a6107d23c0b..c72750448e0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
@@ -42,8 +42,6 @@ class MySQLIngestPositionManagerTest {
     
     private static final long LOG_POSITION = 4L;
     
-    private static final long SERVER_ID = 555555L;
-    
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource dataSource;
     
@@ -55,15 +53,12 @@ class MySQLIngestPositionManagerTest {
         when(dataSource.getConnection()).thenReturn(connection);
         PreparedStatement positionStatement = mockPositionStatement();
         when(connection.prepareStatement("SHOW MASTER STATUS")).thenReturn(positionStatement);
-        PreparedStatement serverIdStatement = mockServerIdStatement();
-        when(connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'")).thenReturn(serverIdStatement);
     }
     
     @Test
     void assertGetCurrentPosition() throws SQLException {
         MySQLIngestPositionManager positionInitializer = new MySQLIngestPositionManager();
         BinlogPosition actual = positionInitializer.init(dataSource, "");
-        assertThat(actual.getServerId(), is(SERVER_ID));
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
     }
@@ -77,13 +72,4 @@ class MySQLIngestPositionManagerTest {
         when(resultSet.getLong(2)).thenReturn(LOG_POSITION);
         return result;
     }
-    
-    private PreparedStatement mockServerIdStatement() throws SQLException {
-        PreparedStatement result = mock(PreparedStatement.class);
-        ResultSet resultSet = mock(ResultSet.class);
-        when(result.executeQuery()).thenReturn(resultSet);
-        when(resultSet.next()).thenReturn(true, false);
-        when(resultSet.getLong(2)).thenReturn(SERVER_ID);
-        return result;
-    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
index 31ae65d93bc..da2df4e23e7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
@@ -26,6 +26,6 @@ class BinlogPositionTest {
     
     @Test
     void assertToString() {
-        assertThat(new BinlogPosition("mysql-bin.000001", 4L, 0L).toString(), is("mysql-bin.000001#4"));
+        assertThat(new BinlogPosition("mysql-bin.000001", 4L).toString(), is("mysql-bin.000001#4"));
     }
 }

Reply via email to