This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aa50565799f Remove useless BinlogPosition.serverId (#32492)
aa50565799f is described below
commit aa50565799fa08b0fdfce21447a3d59a8e2e3347
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 17:15:04 2024 +0800
Remove useless BinlogPosition.serverId (#32492)
* Remove useless BinlogPosition.serverId
* Remove useless BinlogPosition.serverId
* Remove useless BinlogPosition.serverId
---
.../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java | 4 ++--
.../pipeline/mysql/ingest/MySQLIngestPositionManager.java | 11 ++---------
.../data/pipeline/mysql/ingest/binlog/BinlogPosition.java | 2 --
.../mysql/ingest/binlog/event/AbstractBinlogEvent.java | 2 --
.../ingest/client/netty/MySQLBinlogEventPacketDecoder.java | 3 ---
.../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 2 +-
.../mysql/ingest/MySQLIngestPositionManagerTest.java | 14 --------------
.../pipeline/mysql/ingest/binlog/BinlogPositionTest.java | 2 +-
8 files changed, 6 insertions(+), 34 deletions(-)
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1290516f358..43074284e25 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -149,7 +149,7 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
}
private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent event) {
- PlaceholderRecord result = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+ PlaceholderRecord result = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition()));
result.setCommitTime(event.getTimestamp() * 1000L);
return result;
}
@@ -218,7 +218,7 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
private DataRecord createDataRecord(final PipelineSQLOperationType type, final AbstractRowsEvent rowsEvent, final int columnCount) {
String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
- IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
+ IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition());
DataRecord result = new DataRecord(type, tableName, position, columnCount);
result.setActualTableName(rowsEvent.getTableName());
result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
index 6f345f81dfe..cbc3170f584 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
@@ -43,13 +43,12 @@ public final class MySQLIngestPositionManager implements DialectIngestPositionMa
public BinlogPosition init(final String data) {
String[] array = data.split("#");
Preconditions.checkArgument(2 == array.length, "Unknown binlog position: %s", data);
- return new BinlogPosition(array[0], Long.parseLong(array[1]), 0L);
+ return new BinlogPosition(array[0], Long.parseLong(array[1]));
}
private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
String filename;
long position;
- long serverId;
try (
PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -57,13 +56,7 @@ public final class MySQLIngestPositionManager implements DialectIngestPositionMa
filename = resultSet.getString(1);
position = resultSet.getLong(2);
}
- try (
- PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
- ResultSet resultSet = preparedStatement.executeQuery()) {
- resultSet.next();
- serverId = resultSet.getLong(2);
- }
- return new BinlogPosition(filename, position, serverId);
+ return new BinlogPosition(filename, position);
}
@Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
index 79a27e7ac62..e70d4427dc6 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
@@ -32,8 +32,6 @@ public final class BinlogPosition implements IngestPosition {
private final long position;
- private final long serverId;
-
@Override
public String toString() {
return String.format("%s#%d", filename, position);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
index a38ea752512..37d91c04853 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/AbstractBinlogEvent.java
@@ -27,8 +27,6 @@ import lombok.Setter;
@Setter
public abstract class AbstractBinlogEvent {
- private long serverId;
-
private String fileName;
private long position;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 140660a8057..1d2984a3d77 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -231,7 +231,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
rowsEvent.setFileName(binlogContext.getFileName());
rowsEvent.setPosition(binlogEventHeader.getLogPos());
rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
- rowsEvent.setServerId(binlogEventHeader.getServerId());
}
private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
@@ -255,7 +254,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
- result.setServerId(binlogEventHeader.getServerId());
return result;
}
@@ -264,7 +262,6 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
- result.setServerId(binlogEventHeader.getServerId());
return result;
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 79ce7c57607..3e5180b1b56 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -91,7 +91,7 @@ class MySQLIncrementalDumperTest {
MemoryPipelineChannel channel = new MemoryPipelineChannel(10000, records -> {
});
- incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
+ incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
index a6107d23c0b..c72750448e0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManagerTest.java
@@ -42,8 +42,6 @@ class MySQLIngestPositionManagerTest {
private static final long LOG_POSITION = 4L;
- private static final long SERVER_ID = 555555L;
-
@Mock(extraInterfaces = AutoCloseable.class)
private DataSource dataSource;
@@ -55,15 +53,12 @@ class MySQLIngestPositionManagerTest {
when(dataSource.getConnection()).thenReturn(connection);
PreparedStatement positionStatement = mockPositionStatement();
when(connection.prepareStatement("SHOW MASTER STATUS")).thenReturn(positionStatement);
- PreparedStatement serverIdStatement = mockServerIdStatement();
- when(connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'")).thenReturn(serverIdStatement);
}
@Test
void assertGetCurrentPosition() throws SQLException {
MySQLIngestPositionManager positionInitializer = new MySQLIngestPositionManager();
BinlogPosition actual = positionInitializer.init(dataSource, "");
- assertThat(actual.getServerId(), is(SERVER_ID));
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
}
@@ -77,13 +72,4 @@ class MySQLIngestPositionManagerTest {
when(resultSet.getLong(2)).thenReturn(LOG_POSITION);
return result;
}
-
- private PreparedStatement mockServerIdStatement() throws SQLException {
- PreparedStatement result = mock(PreparedStatement.class);
- ResultSet resultSet = mock(ResultSet.class);
- when(result.executeQuery()).thenReturn(resultSet);
- when(resultSet.next()).thenReturn(true, false);
- when(resultSet.getLong(2)).thenReturn(SERVER_ID);
- return result;
- }
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
index 31ae65d93bc..da2df4e23e7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPositionTest.java
@@ -26,6 +26,6 @@ class BinlogPositionTest {
@Test
void assertToString() {
- assertThat(new BinlogPosition("mysql-bin.000001", 4L, 0L).toString(), is("mysql-bin.000001#4"));
+ assertThat(new BinlogPosition("mysql-bin.000001", 4L).toString(), is("mysql-bin.000001#4"));
}
}