This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f1c2a8d11 Rename MySQLBaseBinlogEvent (#32563)
f0f1c2a8d11 is described below

commit f0f1c2a8d119acd6cc60f55eb84039725ff7317f
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 16 23:01:54 2024 +0800

    Rename MySQLBaseBinlogEvent (#32563)
    
    * Rename MySQLBaseBinlogEvent
    
    * Refactor MySQLWriteRowsBinlogEvent
    
    * Refactor MySQLUpdateRowsBinlogEvent
    
    * Refactor MySQLDeleteRowsBinlogEvent
---
 ...tBinlogEvent.java => MySQLBaseBinlogEvent.java} |  4 +-
 ...olderEvent.java => PlaceholderBinlogEvent.java} |  2 +-
 .../MySQLQueryBinlogEvent.java}                    |  5 +-
 .../MySQLBaseRowsBinlogEvent.java}                 |  7 ++-
 .../MySQLDeleteRowsBinlogEvent.java}               | 12 ++--
 .../MySQLUpdateRowsBinlogEvent.java}               | 14 ++---
 .../MySQLWriteRowsBinlogEvent.java}                | 12 ++--
 .../MySQLXidBinlogEvent.java}                      |  5 +-
 .../incremental/client/MySQLBinlogClient.java      | 24 ++++----
 .../netty/MySQLBinlogEventPacketDecoder.java       | 72 ++++++++++------------
 .../incremental/dumper/MySQLIncrementalDumper.java | 42 ++++++-------
 .../netty/MySQLBinlogEventPacketDecoderTest.java   | 30 ++++-----
 .../dumper/MySQLIncrementalDumperTest.java         | 49 +++++++--------
 13 files changed, 136 insertions(+), 142 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
similarity index 93%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
index 350f9bb56c2..63873bec2b7 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
@@ -21,11 +21,11 @@ import lombok.Getter;
 import lombok.Setter;
 
 /**
- * Abstract binlog event.
+ * MySQL base binlog event.
  */
 @Getter
 @Setter
-public abstract class AbstractBinlogEvent {
+public abstract class MySQLBaseBinlogEvent {
     
     private String fileName;
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
similarity index 93%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
index f37272f4625..ecc57d276ca 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
@@ -20,5 +20,5 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
 /**
  * Placeholder binlog event, unsupported binlog event will replace it into this class.
  */
-public final class PlaceholderEvent extends AbstractBinlogEvent {
+public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
similarity index 88%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
index 5b3e591dd73..a521588fc6b 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
  * Query event.This event is written into the binary log file for:
@@ -30,7 +31,7 @@ import lombok.RequiredArgsConstructor;
  */
 @RequiredArgsConstructor
 @Getter
-public final class QueryEvent extends AbstractBinlogEvent {
+public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
     
     private final long threadId;
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
similarity index 80%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 56ecaeeb963..8bb91ffd833 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
- * Abstract rows event.
+ * MySQL rows base event.
  */
 @Getter
 @Setter
-public abstract class AbstractRowsEvent extends AbstractBinlogEvent {
+public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
     
     private String databaseName;
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
similarity index 78%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index 1d27a8dbd7a..e403d6d643c 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 /**
- * Delete rows event.
+ * MySQL delete rows binlog event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-public final class DeleteRowsEvent extends AbstractRowsEvent {
+public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
-    private List<Serializable[]> beforeRows;
+    private final List<Serializable[]> beforeRows;
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
similarity index 75%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index 0f89a70f396..e8b99362368 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 /**
- * Update rows event.
+ * MySQL update rows binlog event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-public final class UpdateRowsEvent extends AbstractRowsEvent {
+public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
-    private List<Serializable[]> beforeRows;
+    private final List<Serializable[]> beforeRows;
     
-    private List<Serializable[]> afterRows;
+    private final List<Serializable[]> afterRows;
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
similarity index 79%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index 35f76361006..912d54e14d1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 /**
- * Write rows event.
+ * MySQL write rows binlog event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-public final class WriteRowsEvent extends AbstractRowsEvent {
+public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
-    private List<Serializable[]> afterRows;
+    private final List<Serializable[]> afterRows;
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
similarity index 85%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
index 9b7589513e1..b3c2e2af885 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
  * XID event is generated for a COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine.
@@ -27,7 +28,7 @@ import lombok.RequiredArgsConstructor;
  */
 @RequiredArgsConstructor
 @Getter
-public final class XidEvent extends AbstractBinlogEvent {
+public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {
     
     private final long xid;
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 3ec1a166257..7ba01844b64 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -34,8 +34,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLCommandPacketDecoder;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLNegotiateHandler;
@@ -77,7 +77,7 @@ public final class MySQLBinlogClient {
     
     private final boolean decodeWithTX;
     
-    private final ArrayBlockingQueue<List<AbstractBinlogEvent>> 
blockingEventQueue = new ArrayBlockingQueue<>(2500);
+    private final ArrayBlockingQueue<List<MySQLBaseBinlogEvent>> 
blockingEventQueue = new ArrayBlockingQueue<>(2500);
     
     private EventLoopGroup eventLoopGroup;
     
@@ -226,8 +226,8 @@ public final class MySQLBinlogClient {
         channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) 
binlogPosition, connectInfo.getServerId(), binlogFileName));
     }
     
-    private AbstractBinlogEvent getLastBinlogEvent(final String 
binlogFileName, final long binlogPosition) {
-        PlaceholderEvent result = new PlaceholderEvent();
+    private MySQLBaseBinlogEvent getLastBinlogEvent(final String 
binlogFileName, final long binlogPosition) {
+        PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
         result.setFileName(binlogFileName);
         result.setPosition(binlogPosition);
         return result;
@@ -242,12 +242,12 @@ public final class MySQLBinlogClient {
      *
      * @return binlog event
      */
-    public synchronized List<AbstractBinlogEvent> poll() {
+    public synchronized List<MySQLBaseBinlogEvent> poll() {
         if (!running) {
             return Collections.emptyList();
         }
         try {
-            List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L, 
TimeUnit.MILLISECONDS);
+            List<MySQLBaseBinlogEvent> result = blockingEventQueue.poll(100L, 
TimeUnit.MILLISECONDS);
             return null == result ? Collections.emptyList() : result;
         } catch (final InterruptedException ignored) {
             Thread.currentThread().interrupt();
@@ -314,11 +314,11 @@ public final class MySQLBinlogClient {
     
     private final class MySQLBinlogEventHandler extends 
ChannelInboundHandlerAdapter {
         
-        private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
+        private final AtomicReference<MySQLBaseBinlogEvent> lastBinlogEvent;
         
         private final AtomicBoolean reconnectRequested = new 
AtomicBoolean(false);
         
-        MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
+        MySQLBinlogEventHandler(final MySQLBaseBinlogEvent lastBinlogEvent) {
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
@@ -329,7 +329,7 @@ public final class MySQLBinlogClient {
                 return;
             }
             if (msg instanceof List) {
-                List<AbstractBinlogEvent> records = 
(List<AbstractBinlogEvent>) msg;
+                List<MySQLBaseBinlogEvent> records = 
(List<MySQLBaseBinlogEvent>) msg;
                 if (records.isEmpty()) {
                     log.warn("The records is empty");
                     return;
@@ -338,8 +338,8 @@ public final class MySQLBinlogClient {
                 blockingEventQueue.put(records);
                 return;
             }
-            if (msg instanceof AbstractBinlogEvent) {
-                lastBinlogEvent.set((AbstractBinlogEvent) msg);
+            if (msg instanceof MySQLBaseBinlogEvent) {
+                lastBinlogEvent.set((MySQLBaseBinlogEvent) msg);
                 
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
             }
         }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index d53c1291421..223c7ed08ea 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -24,14 +24,14 @@ import io.netty.handler.codec.ByteToMessageDecoder;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
@@ -58,7 +58,7 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
     
     private final boolean decodeWithTX;
     
-    private List<AbstractBinlogEvent> records = new LinkedList<>();
+    private List<MySQLBaseBinlogEvent> records = new LinkedList<>();
     
     public MySQLBinlogEventPacketDecoder(final int checksumLength, final Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX) {
         this.decodeWithTX = decodeWithTX;
@@ -75,12 +75,12 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
             if (!checkEventIntegrity(in, binlogEventHeader)) {
                 return;
             }
-            Optional<AbstractBinlogEvent> binlogEvent = 
decodeEvent(binlogEventHeader, payload);
+            Optional<MySQLBaseBinlogEvent> binlogEvent = 
decodeEvent(binlogEventHeader, payload);
             if (!binlogEvent.isPresent()) {
                 skipChecksum(binlogEventHeader.getEventType(), in);
                 return;
             }
-            if (binlogEvent.get() instanceof PlaceholderEvent) {
+            if (binlogEvent.get() instanceof PlaceholderBinlogEvent) {
                 out.add(binlogEvent.get());
                 skipChecksum(binlogEventHeader.getEventType(), in);
                 return;
@@ -120,15 +120,15 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         return true;
     }
     
-    private void processEventWithTX(final AbstractBinlogEvent binlogEvent, 
final List<Object> out) {
-        if (binlogEvent instanceof QueryEvent) {
-            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+    private void processEventWithTX(final MySQLBaseBinlogEvent binlogEvent, 
final List<Object> out) {
+        if (binlogEvent instanceof MySQLQueryBinlogEvent) {
+            MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent) 
binlogEvent;
             if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
                 records = new LinkedList<>();
             } else {
                 out.add(binlogEvent);
             }
-        } else if (binlogEvent instanceof XidEvent) {
+        } else if (binlogEvent instanceof MySQLXidBinlogEvent) {
             records.add(binlogEvent);
             out.add(records);
         } else {
@@ -136,9 +136,9 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         }
     }
     
-    private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, 
final List<Object> out) {
-        if (binlogEvent instanceof QueryEvent) {
-            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+    private void processEventIgnoreTX(final MySQLBaseBinlogEvent binlogEvent, 
final List<Object> out) {
+        if (binlogEvent instanceof MySQLQueryBinlogEvent) {
+            MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent) 
binlogEvent;
             if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
                 return;
             }
@@ -146,7 +146,7 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         out.add(binlogEvent);
     }
     
-    private Optional<AbstractBinlogEvent> decodeEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+    private Optional<MySQLBaseBinlogEvent> decodeEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT)) {
             case ROTATE_EVENT:
                 decodeRotateEvent(binlogEventHeader, payload);
@@ -197,35 +197,31 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         binlogContext.putTableMapEvent(packet.getTableId(), packet);
     }
     
-    private WriteRowsEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
+    private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        WriteRowsEvent result = new WriteRowsEvent();
+        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(packet.getRows());
         initRowsEvent(result, binlogEventHeader, packet.getTableId());
-        result.setAfterRows(packet.getRows());
         return result;
     }
     
-    private UpdateRowsEvent decodeUpdateRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+    private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        UpdateRowsEvent result = new UpdateRowsEvent();
+        MySQLUpdateRowsBinlogEvent result = new 
MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2());
         initRowsEvent(result, binlogEventHeader, packet.getTableId());
-        result.setBeforeRows(packet.getRows());
-        result.setAfterRows(packet.getRows2());
         return result;
     }
     
-    private DeleteRowsEvent decodeDeleteRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+    private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        DeleteRowsEvent result = new DeleteRowsEvent();
+        MySQLDeleteRowsBinlogEvent result = new 
MySQLDeleteRowsBinlogEvent(packet.getRows());
         initRowsEvent(result, binlogEventHeader, packet.getTableId());
-        result.setBeforeRows(packet.getRows());
         return result;
     }
     
-    private void initRowsEvent(final AbstractRowsEvent rowsEvent, final 
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
+    private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final 
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
         
rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName());
         
rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName());
         rowsEvent.setFileName(binlogContext.getFileName());
@@ -233,8 +229,8 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
     }
     
-    private PlaceholderEvent decodePlaceholderEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
-        PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
+    private PlaceholderBinlogEvent decodePlaceholderEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+        PlaceholderBinlogEvent result = 
createPlaceholderEvent(binlogEventHeader);
         int remainDataLength = binlogEventHeader.getEventSize() + 1 - 
binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
         if (remainDataLength > 0) {
             payload.skipReserved(remainDataLength);
@@ -242,7 +238,7 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         return result;
     }
     
-    private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
+    private MySQLQueryBinlogEvent decodeQueryEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         int threadId = payload.readInt4();
         int executionTime = payload.readInt4();
         payload.skipReserved(1);
@@ -250,15 +246,15 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         payload.skipReserved(payload.readInt2());
         String databaseName = payload.readStringNul();
         String sql = 
payload.readStringFix(payload.getByteBuf().readableBytes() - 
binlogEventHeader.getChecksumLength());
-        QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, 
databaseName, sql);
+        MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId, 
executionTime, errorCode, databaseName, sql);
         result.setFileName(binlogContext.getFileName());
         result.setPosition(binlogEventHeader.getLogPos());
         result.setTimestamp(binlogEventHeader.getTimestamp());
         return result;
     }
     
-    private XidEvent decodeXidEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
-        XidEvent result = new XidEvent(payload.readInt8());
+    private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
+        MySQLXidBinlogEvent result = new 
MySQLXidBinlogEvent(payload.readInt8());
         result.setFileName(binlogContext.getFileName());
         result.setPosition(binlogEventHeader.getLogPos());
         result.setTimestamp(binlogEventHeader.getTimestamp());
@@ -266,8 +262,8 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
     }
     
     // TODO May be used again later, keep this method first.
-    private PlaceholderEvent createPlaceholderEvent(final 
MySQLBinlogEventHeader binlogEventHeader) {
-        PlaceholderEvent result = new PlaceholderEvent();
+    private PlaceholderBinlogEvent createPlaceholderEvent(final 
MySQLBinlogEventHeader binlogEventHeader) {
+        PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
         result.setFileName(binlogContext.getFileName());
         result.setPosition(binlogEventHeader.getLogPos());
         result.setTimestamp(binlogEventHeader.getTimestamp());
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index 8c15f860ade..3f6f2d2720e 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -34,11 +34,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.data.MySQLBinlogDataHandler;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.ConnectInfo;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -103,9 +103,9 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         }
     }
     
-    private void handleEvents(final List<AbstractBinlogEvent> events) {
+    private void handleEvents(final List<MySQLBaseBinlogEvent> events) {
         List<Record> dataRecords = new LinkedList<>();
-        for (AbstractBinlogEvent each : events) {
+        for (MySQLBaseBinlogEvent each : events) {
             dataRecords.addAll(handleEvent(each));
         }
         if (!dataRecords.isEmpty()) {
@@ -113,28 +113,28 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         }
     }
     
-    private List<? extends Record> handleEvent(final AbstractBinlogEvent 
event) {
-        if (!(event instanceof AbstractRowsEvent)) {
+    private List<? extends Record> handleEvent(final MySQLBaseBinlogEvent 
event) {
+        if (!(event instanceof MySQLBaseRowsBinlogEvent)) {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
-        AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
+        MySQLBaseRowsBinlogEvent rowsEvent = (MySQLBaseRowsBinlogEvent) event;
         if (!rowsEvent.getDatabaseName().equals(catalog) || 
!dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName()))
 {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         PipelineTableMetaData tableMetaData = 
getPipelineTableMetaData(rowsEvent.getTableName());
-        if (event instanceof WriteRowsEvent) {
-            return handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
+        if (event instanceof MySQLWriteRowsBinlogEvent) {
+            return handleWriteRowsEvent((MySQLWriteRowsBinlogEvent) event, 
tableMetaData);
         }
-        if (event instanceof UpdateRowsEvent) {
-            return handleUpdateRowsEvent((UpdateRowsEvent) event, 
tableMetaData);
+        if (event instanceof MySQLUpdateRowsBinlogEvent) {
+            return handleUpdateRowsEvent((MySQLUpdateRowsBinlogEvent) event, 
tableMetaData);
         }
-        if (event instanceof DeleteRowsEvent) {
-            return handleDeleteRowsEvent((DeleteRowsEvent) event, 
tableMetaData);
+        if (event instanceof MySQLDeleteRowsBinlogEvent) {
+            return handleDeleteRowsEvent((MySQLDeleteRowsBinlogEvent) event, 
tableMetaData);
         }
         return Collections.emptyList();
     }
     
-    private PlaceholderRecord createPlaceholderRecord(final 
AbstractBinlogEvent event) {
+    private PlaceholderRecord createPlaceholderRecord(final 
MySQLBaseBinlogEvent event) {
         PlaceholderRecord result = new PlaceholderRecord(new 
MySQLBinlogPosition(event.getFileName(), event.getPosition()));
         result.setCommitTime(event.getTimestamp() * 1000L);
         return result;
@@ -145,7 +145,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         return 
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
 actualTableName);
     }
     
-    private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, 
final PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleWriteRowsEvent(final 
MySQLWriteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = 
createDataRecord(PipelineSQLOperationType.INSERT, event, each.length);
@@ -158,7 +158,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         return result;
     }
     
-    private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent 
event, final PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleUpdateRowsEvent(final 
MySQLUpdateRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
         List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -176,7 +176,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         return result;
     }
     
-    private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent 
event, final PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleDeleteRowsEvent(final 
MySQLDeleteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = 
createDataRecord(PipelineSQLOperationType.DELETE, event, each.length);
@@ -189,7 +189,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         return result;
     }
     
-    private DataRecord createDataRecord(final PipelineSQLOperationType type, 
final AbstractRowsEvent rowsEvent, final int columnCount) {
+    private DataRecord createDataRecord(final PipelineSQLOperationType type, 
final MySQLBaseRowsBinlogEvent rowsEvent, final int columnCount) {
         String tableName = 
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
         IngestPosition binlogPosition = new 
MySQLBinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition());
         DataRecord result = new DataRecord(type, tableName, binlogPosition, 
columnCount);
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 9d27fdd8f12..b4a6a798658 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -24,11 +24,11 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.internal.StringUtil;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
@@ -120,9 +120,9 @@ class MySQLBinlogEventPacketDecoderTest {
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertFalse(decodedEvents.isEmpty());
         Object actual = decodedEvents.get(0);
-        assertInstanceOf(QueryEvent.class, actual);
-        assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L));
-        assertThat(((QueryEvent) actual).getPosition(), is(168785090L));
+        assertInstanceOf(MySQLQueryBinlogEvent.class, actual);
+        assertThat(((MySQLQueryBinlogEvent) actual).getTimestamp(), 
is(1700193011L));
+        assertThat(((MySQLQueryBinlogEvent) actual).getPosition(), 
is(168785090L));
     }
     
     @Test
@@ -149,8 +149,8 @@ class MySQLBinlogEventPacketDecoderTest {
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
-        assertThat(actualEventList.get(0), instanceOf(WriteRowsEvent.class));
-        WriteRowsEvent actual = (WriteRowsEvent) actualEventList.get(0);
+        assertThat(actualEventList.get(0), 
instanceOf(MySQLWriteRowsBinlogEvent.class));
+        MySQLWriteRowsBinlogEvent actual = (MySQLWriteRowsBinlogEvent) 
actualEventList.get(0);
         assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
@@ -167,8 +167,8 @@ class MySQLBinlogEventPacketDecoderTest {
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
-        assertThat(actualEventList.get(0), instanceOf(UpdateRowsEvent.class));
-        UpdateRowsEvent actual = (UpdateRowsEvent) actualEventList.get(0);
+        assertThat(actualEventList.get(0), 
instanceOf(MySQLUpdateRowsBinlogEvent.class));
+        MySQLUpdateRowsBinlogEvent actual = (MySQLUpdateRowsBinlogEvent) 
actualEventList.get(0);
         assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
         assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("updated".getBytes()), null}));
     }
@@ -185,9 +185,9 @@ class MySQLBinlogEventPacketDecoderTest {
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
-        assertThat(actualEventList.get(0), instanceOf(DeleteRowsEvent.class));
-        assertThat(actualEventList.get(1), instanceOf(XidEvent.class));
-        DeleteRowsEvent actual = (DeleteRowsEvent) actualEventList.get(0);
+        assertThat(actualEventList.get(0), 
instanceOf(MySQLDeleteRowsBinlogEvent.class));
+        assertThat(actualEventList.get(1), 
instanceOf(MySQLXidBinlogEvent.class));
+        MySQLDeleteRowsBinlogEvent actual = (MySQLDeleteRowsBinlogEvent) 
actualEventList.get(0);
         assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 51a398810ed..cae3e3bb80c 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -32,11 +32,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDriver;
 import org.junit.jupiter.api.BeforeAll;
@@ -141,16 +141,15 @@ class MySQLIncrementalDumperTest {
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private WriteRowsEvent createWriteRowsEvent() {
-        WriteRowsEvent result = new WriteRowsEvent();
+    private MySQLWriteRowsBinlogEvent createWriteRowsEvent() {
+        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}));
         result.setDatabaseName("");
         result.setTableName("t_order");
-        result.setAfterRows(Collections.singletonList(new Serializable[]{101, 
1, "OK"}));
         return result;
     }
     
-    private List<Record> getRecordsByWriteRowsEvent(final WriteRowsEvent 
rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", 
WriteRowsEvent.class, PipelineTableMetaData.class);
+    private List<Record> getRecordsByWriteRowsEvent(final 
MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", 
MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
     }
     
@@ -163,17 +162,15 @@ class MySQLIncrementalDumperTest {
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private UpdateRowsEvent createUpdateRowsEvent() {
-        UpdateRowsEvent result = new UpdateRowsEvent();
+    private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() {
+        MySQLUpdateRowsBinlogEvent result = new 
MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
         result.setDatabaseName("test");
         result.setTableName("t_order");
-        result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 
1, "OK"}));
-        result.setAfterRows(Collections.singletonList(new Serializable[]{101, 
1, "OK2"}));
         return result;
     }
     
-    private List<Record> getRecordsByUpdateRowsEvent(final UpdateRowsEvent 
rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", 
UpdateRowsEvent.class, PipelineTableMetaData.class);
+    private List<Record> getRecordsByUpdateRowsEvent(final 
MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", 
MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
     }
     
@@ -186,39 +183,37 @@ class MySQLIncrementalDumperTest {
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private DeleteRowsEvent createDeleteRowsEvent() {
-        DeleteRowsEvent result = new DeleteRowsEvent();
+    private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() {
+        MySQLDeleteRowsBinlogEvent result = new 
MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}));
         result.setDatabaseName("");
         result.setTableName("t_order");
-        result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 
1, "OK"}));
         return result;
     }
     
-    private List<Record> getRecordsByDeleteRowsEvent(final DeleteRowsEvent 
rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", 
DeleteRowsEvent.class, PipelineTableMetaData.class);
+    private List<Record> getRecordsByDeleteRowsEvent(final 
MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", 
MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
     }
     
     @Test
     void assertPlaceholderEvent() throws ReflectiveOperationException {
-        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class),
-                incrementalDumper, new PlaceholderEvent());
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 MySQLBaseBinlogEvent.class),
+                incrementalDumper, new PlaceholderBinlogEvent());
         assertThat(actual.size(), is(1));
     }
     
     @Test
     void assertRowsEventFiltered() throws ReflectiveOperationException {
-        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class),
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 MySQLBaseBinlogEvent.class),
                 incrementalDumper, getFilteredWriteRowsEvent());
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
     }
     
-    private WriteRowsEvent getFilteredWriteRowsEvent() {
-        WriteRowsEvent result = new WriteRowsEvent();
+    private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() {
+        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1}));
         result.setDatabaseName("test");
         result.setTableName("t_order");
-        result.setAfterRows(Collections.singletonList(new Serializable[]{1}));
         return result;
     }
 }

Reply via email to