This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new addeb1d1a01 Refactor MySQLBaseRowsBinlogEvent (#32566)
addeb1d1a01 is described below

commit addeb1d1a0107ab1e108ab03fe89bd5acdc8385c
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 17 21:07:00 2024 +0800

    Refactor MySQLBaseRowsBinlogEvent (#32566)
    
    * Refactor MySQLBaseRowsBinlogEvent
    
    * Refactor MySQLBaseRowsBinlogEvent
---
 .../event/rows/MySQLBaseRowsBinlogEvent.java       |  8 ++---
 .../event/rows/MySQLDeleteRowsBinlogEvent.java     |  7 ++--
 .../event/rows/MySQLUpdateRowsBinlogEvent.java     |  8 +++--
 .../event/rows/MySQLWriteRowsBinlogEvent.java      |  7 ++--
 .../netty/MySQLBinlogEventPacketDecoder.java       | 25 ++++++++-------
 .../dumper/MySQLIncrementalDumperTest.java         | 37 +++-------------------
 6 files changed, 38 insertions(+), 54 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 8bb91ffd833..0c9d39784be 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -18,17 +18,17 @@
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 
 /**
  * MySQL rows base event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
 public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
     
-    private String databaseName;
+    private final String databaseName;
     
-    private String tableName;
+    private final String tableName;
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index e403d6d643c..50e3d14d6b4 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -18,7 +18,6 @@
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
@@ -26,9 +25,13 @@ import java.util.List;
 /**
  * MySQL delete rows binlog event.
  */
-@RequiredArgsConstructor
 @Getter
 public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent 
{
     
     private final List<Serializable[]> beforeRows;
+    
+    public MySQLDeleteRowsBinlogEvent(final String databaseName, final String 
tableName, final List<Serializable[]> beforeRows) {
+        super(databaseName, tableName);
+        this.beforeRows = beforeRows;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index e8b99362368..8c3379ea786 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -18,7 +18,6 @@
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
@@ -26,11 +25,16 @@ import java.util.List;
 /**
  * MySQL update rows binlog event.
  */
-@RequiredArgsConstructor
 @Getter
 public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent 
{
     
     private final List<Serializable[]> beforeRows;
     
     private final List<Serializable[]> afterRows;
+    
+    public MySQLUpdateRowsBinlogEvent(final String databaseName, final String 
tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> 
afterRows) {
+        super(databaseName, tableName);
+        this.beforeRows = beforeRows;
+        this.afterRows = afterRows;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index 912d54e14d1..db1029bfac0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -18,7 +18,6 @@
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
@@ -26,9 +25,13 @@ import java.util.List;
 /**
  * MySQL write rows binlog event.
  */
-@RequiredArgsConstructor
 @Getter
 public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
     
     private final List<Serializable[]> afterRows;
+    
+    public MySQLWriteRowsBinlogEvent(final String databaseName, final String 
tableName, final List<Serializable[]> afterRows) {
+        super(databaseName, tableName);
+        this.afterRows = afterRows;
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index 223c7ed08ea..25f7919687c 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -199,31 +199,32 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
     
     private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(packet.getRows());
-        initRowsEvent(result, binlogEventHeader, packet.getTableId());
+        MySQLBinlogTableMapEventPacket tableMapEventPacket = 
binlogContext.getTableMapEvent(packet.getTableId());
+        packet.readRows(tableMapEventPacket, payload);
+        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), 
tableMapEventPacket.getTableName(), packet.getRows());
+        initRowsEvent(result, binlogEventHeader);
         return result;
     }
     
     private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        MySQLUpdateRowsBinlogEvent result = new 
MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2());
-        initRowsEvent(result, binlogEventHeader, packet.getTableId());
+        MySQLBinlogTableMapEventPacket tableMapEventPacket = 
binlogContext.getTableMapEvent(packet.getTableId());
+        packet.readRows(tableMapEventPacket, payload);
+        MySQLUpdateRowsBinlogEvent result = new 
MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), 
tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
+        initRowsEvent(result, binlogEventHeader);
         return result;
     }
     
     private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         MySQLBinlogRowsEventPacket packet = new 
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), 
payload);
-        MySQLDeleteRowsBinlogEvent result = new 
MySQLDeleteRowsBinlogEvent(packet.getRows());
-        initRowsEvent(result, binlogEventHeader, packet.getTableId());
+        MySQLBinlogTableMapEventPacket tableMapEventPacket = 
binlogContext.getTableMapEvent(packet.getTableId());
+        packet.readRows(tableMapEventPacket, payload);
+        MySQLDeleteRowsBinlogEvent result = new 
MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), 
tableMapEventPacket.getTableName(), packet.getRows());
+        initRowsEvent(result, binlogEventHeader);
         return result;
     }
     
-    private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final 
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
-        
rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName());
-        
rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName());
+    private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final 
MySQLBinlogEventHeader binlogEventHeader) {
         rowsEvent.setFileName(binlogContext.getFileName());
         rowsEvent.setPosition(binlogEventHeader.getLogPos());
         rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index cae3e3bb80c..e7926fe5499 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -134,20 +134,13 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertWriteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = 
getRecordsByWriteRowsEvent(createWriteRowsEvent());
+        List<Record> actual = getRecordsByWriteRowsEvent(new 
MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new 
Serializable[]{101, 1, "OK"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.INSERT));
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private MySQLWriteRowsBinlogEvent createWriteRowsEvent() {
-        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}));
-        result.setDatabaseName("");
-        result.setTableName("t_order");
-        return result;
-    }
-    
     private List<Record> getRecordsByWriteRowsEvent(final 
MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", 
MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -155,20 +148,14 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertUpdateRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = 
getRecordsByUpdateRowsEvent(createUpdateRowsEvent());
+        List<Record> actual = getRecordsByUpdateRowsEvent(
+                new MySQLUpdateRowsBinlogEvent("test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}), 
Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.UPDATE));
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() {
-        MySQLUpdateRowsBinlogEvent result = new 
MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
-        result.setDatabaseName("test");
-        result.setTableName("t_order");
-        return result;
-    }
-    
     private List<Record> getRecordsByUpdateRowsEvent(final 
MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", 
MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -176,20 +163,13 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertDeleteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = 
getRecordsByDeleteRowsEvent(createDeleteRowsEvent());
+        List<Record> actual = getRecordsByDeleteRowsEvent(new 
MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new 
Serializable[]{101, 1, "OK"})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.DELETE));
         assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
     }
     
-    private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() {
-        MySQLDeleteRowsBinlogEvent result = new 
MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, 
"OK"}));
-        result.setDatabaseName("");
-        result.setTableName("t_order");
-        return result;
-    }
-    
     private List<Record> getRecordsByDeleteRowsEvent(final 
MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", 
MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
         return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -205,15 +185,8 @@ class MySQLIncrementalDumperTest {
     @Test
     void assertRowsEventFiltered() throws ReflectiveOperationException {
         List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 MySQLBaseBinlogEvent.class),
-                incrementalDumper, getFilteredWriteRowsEvent());
+                incrementalDumper, new MySQLWriteRowsBinlogEvent("test", 
"t_order", Collections.singletonList(new Serializable[]{1})));
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
     }
-    
-    private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() {
-        MySQLWriteRowsBinlogEvent result = new 
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1}));
-        result.setDatabaseName("test");
-        result.setTableName("t_order");
-        return result;
-    }
 }

Reply via email to