This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cf9e6a9a5bb Refactor MySQLBaseBinlogEvent (#32567)
cf9e6a9a5bb is described below

commit cf9e6a9a5bb5cdb0dfa998d3e0cbde5f0a5d1c68
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 17 21:39:24 2024 +0800

    Refactor MySQLBaseBinlogEvent (#32567)
    
    * Refactor MySQLBaseBinlogEvent
    
    * Refactor MySQLBaseBinlogEvent
---
 .../binlog/event/MySQLBaseBinlogEvent.java         | 10 ++---
 .../binlog/event/PlaceholderBinlogEvent.java       |  4 ++
 .../binlog/event/query/MySQLQueryBinlogEvent.java  | 12 +++++-
 .../event/rows/MySQLBaseRowsBinlogEvent.java       |  8 +++-
 .../event/rows/MySQLDeleteRowsBinlogEvent.java     |  4 +-
 .../event/rows/MySQLUpdateRowsBinlogEvent.java     |  5 ++-
 .../event/rows/MySQLWriteRowsBinlogEvent.java      |  4 +-
 .../event/transaction/MySQLXidBinlogEvent.java     |  7 +++-
 .../incremental/client/MySQLBinlogClient.java      |  9 +----
 .../netty/MySQLBinlogEventPacketDecoder.java       | 47 +++++-----------------
 .../dumper/MySQLIncrementalDumperTest.java         | 12 +++---
 11 files changed, 54 insertions(+), 68 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
index 63873bec2b7..3d3b1d16880 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
@@ -18,18 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 
 /**
  * MySQL base binlog event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
 public abstract class MySQLBaseBinlogEvent {
     
-    private String fileName;
+    private final String fileName;
     
-    private long position;
+    private final long position;
     
-    private long timestamp;
+    private final long timestamp;
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
index ecc57d276ca..db007c6eb3f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
@@ -21,4 +21,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
  * Placeholder binlog event, unsupported binlog event will replace it into this class.
  */
 public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
+    
+    public PlaceholderBinlogEvent(final String fileName, final long position, final long timestamp) {
+        super(fileName, position, timestamp);
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
index a521588fc6b..8b0332a13c0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
@@ -29,7 +28,6 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.e
  *
  * @see <a href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_query">QUERY_EVENT</a>
  */
-@RequiredArgsConstructor
 @Getter
 public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
     
@@ -42,4 +40,14 @@ public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
     private final String databaseName;
     
     private final String sql;
+    
+    public MySQLQueryBinlogEvent(final String fileName, final long position, final long timestamp,
+                                 final long threadId, final long executionTime, final int errorCode, final String databaseName, final String sql) {
+        super(fileName, position, timestamp);
+        this.threadId = threadId;
+        this.executionTime = executionTime;
+        this.errorCode = errorCode;
+        this.databaseName = databaseName;
+        this.sql = sql;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 0c9d39784be..dfa8fda0168 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -18,17 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
  * MySQL rows base event.
  */
-@RequiredArgsConstructor
 @Getter
 public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
     
     private final String databaseName;
     
     private final String tableName;
+    
+    public MySQLBaseRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName) {
+        super(fileName, position, timestamp);
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index 50e3d14d6b4..e08ce8589e8 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -30,8 +30,8 @@ public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
     private final List<Serializable[]> beforeRows;
     
-    public MySQLDeleteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows) {
-        super(databaseName, tableName);
+    public MySQLDeleteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List<Serializable[]> beforeRows) {
+        super(fileName, position, timestamp, databaseName, tableName);
         this.beforeRows = beforeRows;
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index 8c3379ea786..e3eef2f0326 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -32,8 +32,9 @@ public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
     private final List<Serializable[]> afterRows;
     
-    public MySQLUpdateRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> afterRows) {
-        super(databaseName, tableName);
+    public MySQLUpdateRowsBinlogEvent(final String fileName, final long position, final long timestamp,
+                                      final String databaseName, final String tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> afterRows) {
+        super(fileName, position, timestamp, databaseName, tableName);
         this.beforeRows = beforeRows;
         this.afterRows = afterRows;
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index db1029bfac0..25e73d8e8fc 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -30,8 +30,8 @@ public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
     private final List<Serializable[]> afterRows;
     
-    public MySQLWriteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> afterRows) {
-        super(databaseName, tableName);
+    public MySQLWriteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List<Serializable[]> afterRows) {
+        super(fileName, position, timestamp, databaseName, tableName);
         this.afterRows = afterRows;
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
index b3c2e2af885..6601f3c2ec8 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
@@ -26,9 +25,13 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.e
  *
  * @see <a href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_xid">XID_EVENT</a>
  */
-@RequiredArgsConstructor
 @Getter
 public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {
     
     private final long xid;
+    
+    public MySQLXidBinlogEvent(final String fileName, final long position, final long timestamp, final long xid) {
+        super(fileName, position, timestamp);
+        this.xid = xid;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 7ba01844b64..4436f14e6e2 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -221,18 +221,11 @@ public final class MySQLBinlogClient {
         channel.pipeline().remove(MySQLCommandResponseHandler.class);
         String tableKey = String.join(":", connectInfo.getHost(), String.valueOf(connectInfo.getPort()));
         channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength, GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX));
-        channel.pipeline().addLast(new MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
+        channel.pipeline().addLast(new MySQLBinlogEventHandler(new PlaceholderBinlogEvent(binlogFileName, binlogPosition, 0L)));
         resetSequenceID();
         channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
     }
     
-    private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
-        PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
-        result.setFileName(binlogFileName);
-        result.setPosition(binlogPosition);
-        return result;
-    }
-    
     private void resetSequenceID() {
         channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(0);
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index 25f7919687c..edc6021a28c 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -25,10 +25,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
@@ -201,37 +200,28 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
         packet.readRows(tableMapEventPacket, payload);
-        MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
-        initRowsEvent(result, binlogEventHeader);
-        return result;
+        return new MySQLWriteRowsBinlogEvent(binlogContext.getFileName(),
+                binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
     }
     
     private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
         packet.readRows(tableMapEventPacket, payload);
-        MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
-        initRowsEvent(result, binlogEventHeader);
-        return result;
+        return new MySQLUpdateRowsBinlogEvent(binlogContext.getFileName(),
+                binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
     }
     
     private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
         packet.readRows(tableMapEventPacket, payload);
-        MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
-        initRowsEvent(result, binlogEventHeader);
-        return result;
-    }
-    
-    private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader) {
-        rowsEvent.setFileName(binlogContext.getFileName());
-        rowsEvent.setPosition(binlogEventHeader.getLogPos());
-        rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
+        return new MySQLDeleteRowsBinlogEvent(binlogContext.getFileName(),
+                binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
     }
     
     private PlaceholderBinlogEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
-        PlaceholderBinlogEvent result = createPlaceholderEvent(binlogEventHeader);
+        PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp());
         int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
         if (remainDataLength > 0) {
             payload.skipReserved(remainDataLength);
@@ -247,28 +237,11 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         payload.skipReserved(payload.readInt2());
         String databaseName = payload.readStringNul();
         String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength());
-        MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId, executionTime, errorCode, databaseName, sql);
-        result.setFileName(binlogContext.getFileName());
-        result.setPosition(binlogEventHeader.getLogPos());
-        result.setTimestamp(binlogEventHeader.getTimestamp());
-        return result;
+        return new MySQLQueryBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), threadId, executionTime, errorCode, databaseName, sql);
     }
     
     private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
-        MySQLXidBinlogEvent result = new MySQLXidBinlogEvent(payload.readInt8());
-        result.setFileName(binlogContext.getFileName());
-        result.setPosition(binlogEventHeader.getLogPos());
-        result.setTimestamp(binlogEventHeader.getTimestamp());
-        return result;
-    }
-    
-    // TODO May be used again later, keep this method first.
-    private PlaceholderBinlogEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) {
-        PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
-        result.setFileName(binlogContext.getFileName());
-        result.setPosition(binlogEventHeader.getLogPos());
-        result.setTimestamp(binlogEventHeader.getTimestamp());
-        return result;
+        return new MySQLXidBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), payload.readInt8());
     }
     
     private void skipChecksum(final int eventType, final ByteBuf in) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index e7926fe5499..151add22cf7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -134,7 +134,7 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertWriteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
+        List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT));
@@ -148,8 +148,8 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertUpdateRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByUpdateRowsEvent(
-                new MySQLUpdateRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
+        List<Record> actual = getRecordsByUpdateRowsEvent(new MySQLUpdateRowsBinlogEvent(
+                "", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE));
@@ -163,7 +163,7 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertDeleteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
+        List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE));
@@ -178,14 +178,14 @@ class MySQLIncrementalDumperTest {
     @Test
     void assertPlaceholderEvent() throws ReflectiveOperationException {
         List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
-                incrementalDumper, new PlaceholderBinlogEvent());
+                incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L));
         assertThat(actual.size(), is(1));
     }
     
     @Test
     void assertRowsEventFiltered() throws ReflectiveOperationException {
         List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
-                incrementalDumper, new MySQLWriteRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{1})));
+                incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{1})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
     }

Reply via email to