This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new addeb1d1a01 Refactor MySQLBaseRowsBinlogEvent (#32566)
addeb1d1a01 is described below
commit addeb1d1a0107ab1e108ab03fe89bd5acdc8385c
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 17 21:07:00 2024 +0800
Refactor MySQLBaseRowsBinlogEvent (#32566)
* Refactor MySQLBaseRowsBinlogEvent
* Refactor MySQLBaseRowsBinlogEvent
---
.../event/rows/MySQLBaseRowsBinlogEvent.java | 8 ++---
.../event/rows/MySQLDeleteRowsBinlogEvent.java | 7 ++--
.../event/rows/MySQLUpdateRowsBinlogEvent.java | 8 +++--
.../event/rows/MySQLWriteRowsBinlogEvent.java | 7 ++--
.../netty/MySQLBinlogEventPacketDecoder.java | 25 ++++++++-------
.../dumper/MySQLIncrementalDumperTest.java | 37 +++-------------------
6 files changed, 38 insertions(+), 54 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 8bb91ffd833..0c9d39784be 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -18,17 +18,17 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
* MySQL rows base event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
- private String databaseName;
+ private final String databaseName;
- private String tableName;
+ private final String tableName;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index e403d6d643c..50e3d14d6b4 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
@@ -26,9 +25,13 @@ import java.util.List;
/**
* MySQL delete rows binlog event.
*/
-@RequiredArgsConstructor
@Getter
public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent
{
private final List<Serializable[]> beforeRows;
+
+ public MySQLDeleteRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> beforeRows) {
+ super(databaseName, tableName);
+ this.beforeRows = beforeRows;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index e8b99362368..8c3379ea786 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
@@ -26,11 +25,16 @@ import java.util.List;
/**
* MySQL update rows binlog event.
*/
-@RequiredArgsConstructor
@Getter
public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent
{
private final List<Serializable[]> beforeRows;
private final List<Serializable[]> afterRows;
+
+ public MySQLUpdateRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> beforeRows, final List<Serializable[]>
afterRows) {
+ super(databaseName, tableName);
+ this.beforeRows = beforeRows;
+ this.afterRows = afterRows;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index 912d54e14d1..db1029bfac0 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
@@ -26,9 +25,13 @@ import java.util.List;
/**
* MySQL write rows binlog event.
*/
-@RequiredArgsConstructor
@Getter
public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
private final List<Serializable[]> afterRows;
+
+ public MySQLWriteRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> afterRows) {
+ super(databaseName, tableName);
+ this.afterRows = afterRows;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index 223c7ed08ea..25f7919687c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -199,31 +199,32 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(packet.getRows());
- initRowsEvent(result, binlogEventHeader, packet.getTableId());
+ MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
+ packet.readRows(tableMapEventPacket, payload);
+ MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
+ initRowsEvent(result, binlogEventHeader);
return result;
}
private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2());
- initRowsEvent(result, binlogEventHeader, packet.getTableId());
+ MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
+ packet.readRows(tableMapEventPacket, payload);
+ MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
+ initRowsEvent(result, binlogEventHeader);
return result;
}
private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(packet.getRows());
- initRowsEvent(result, binlogEventHeader, packet.getTableId());
+ MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
+ packet.readRows(tableMapEventPacket, payload);
+ MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
+ initRowsEvent(result, binlogEventHeader);
return result;
}
- private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
-
rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName());
-
rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName());
+ private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final
MySQLBinlogEventHeader binlogEventHeader) {
rowsEvent.setFileName(binlogContext.getFileName());
rowsEvent.setPosition(binlogEventHeader.getLogPos());
rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index cae3e3bb80c..e7926fe5499 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -134,20 +134,13 @@ class MySQLIncrementalDumperTest {
@Test
void assertWriteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual =
getRecordsByWriteRowsEvent(createWriteRowsEvent());
+ List<Record> actual = getRecordsByWriteRowsEvent(new
MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new
Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.INSERT));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private MySQLWriteRowsBinlogEvent createWriteRowsEvent() {
- MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}));
- result.setDatabaseName("");
- result.setTableName("t_order");
- return result;
- }
-
private List<Record> getRecordsByWriteRowsEvent(final
MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent",
MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -155,20 +148,14 @@ class MySQLIncrementalDumperTest {
@Test
void assertUpdateRowsEvent() throws ReflectiveOperationException {
- List<Record> actual =
getRecordsByUpdateRowsEvent(createUpdateRowsEvent());
+ List<Record> actual = getRecordsByUpdateRowsEvent(
+ new MySQLUpdateRowsBinlogEvent("test", "t_order",
Collections.singletonList(new Serializable[]{101, 1, "OK"}),
Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.UPDATE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() {
- MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
- result.setDatabaseName("test");
- result.setTableName("t_order");
- return result;
- }
-
private List<Record> getRecordsByUpdateRowsEvent(final
MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent",
MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -176,20 +163,13 @@ class MySQLIncrementalDumperTest {
@Test
void assertDeleteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual =
getRecordsByDeleteRowsEvent(createDeleteRowsEvent());
+ List<Record> actual = getRecordsByDeleteRowsEvent(new
MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new
Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.DELETE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() {
- MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}));
- result.setDatabaseName("");
- result.setTableName("t_order");
- return result;
- }
-
private List<Record> getRecordsByDeleteRowsEvent(final
MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent",
MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
@@ -205,15 +185,8 @@ class MySQLIncrementalDumperTest {
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
MySQLBaseBinlogEvent.class),
- incrementalDumper, getFilteredWriteRowsEvent());
+ incrementalDumper, new MySQLWriteRowsBinlogEvent("test",
"t_order", Collections.singletonList(new Serializable[]{1})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}
-
- private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() {
- MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1}));
- result.setDatabaseName("test");
- result.setTableName("t_order");
- return result;
- }
}