This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cf9e6a9a5bb Refactor MySQLBaseBinlogEvent (#32567)
cf9e6a9a5bb is described below
commit cf9e6a9a5bb5cdb0dfa998d3e0cbde5f0a5d1c68
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 17 21:39:24 2024 +0800
Refactor MySQLBaseBinlogEvent (#32567)
* Refactor MySQLBaseBinlogEvent
* Refactor MySQLBaseBinlogEvent
---
.../binlog/event/MySQLBaseBinlogEvent.java | 10 ++---
.../binlog/event/PlaceholderBinlogEvent.java | 4 ++
.../binlog/event/query/MySQLQueryBinlogEvent.java | 12 +++++-
.../event/rows/MySQLBaseRowsBinlogEvent.java | 8 +++-
.../event/rows/MySQLDeleteRowsBinlogEvent.java | 4 +-
.../event/rows/MySQLUpdateRowsBinlogEvent.java | 5 ++-
.../event/rows/MySQLWriteRowsBinlogEvent.java | 4 +-
.../event/transaction/MySQLXidBinlogEvent.java | 7 +++-
.../incremental/client/MySQLBinlogClient.java | 9 +----
.../netty/MySQLBinlogEventPacketDecoder.java | 47 +++++-----------------
.../dumper/MySQLIncrementalDumperTest.java | 12 +++---
11 files changed, 54 insertions(+), 68 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
index 63873bec2b7..3d3b1d16880 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
@@ -18,18 +18,18 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
/**
* MySQL base binlog event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
public abstract class MySQLBaseBinlogEvent {
- private String fileName;
+ private final String fileName;
- private long position;
+ private final long position;
- private long timestamp;
+ private final long timestamp;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
index ecc57d276ca..db007c6eb3f 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
@@ -21,4 +21,8 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
* Placeholder binlog event, unsupported binlog event will replace it into
this class.
*/
public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
+
+ public PlaceholderBinlogEvent(final String fileName, final long position,
final long timestamp) {
+ super(fileName, position, timestamp);
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
index a521588fc6b..8b0332a13c0 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
@@ -29,7 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.e
*
* @see <a
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_query">QUERY_EVENT</a>
*/
-@RequiredArgsConstructor
@Getter
public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
@@ -42,4 +40,14 @@ public final class MySQLQueryBinlogEvent extends
MySQLBaseBinlogEvent {
private final String databaseName;
private final String sql;
+
+ public MySQLQueryBinlogEvent(final String fileName, final long position,
final long timestamp,
+ final long threadId, final long
executionTime, final int errorCode, final String databaseName, final String
sql) {
+ super(fileName, position, timestamp);
+ this.threadId = threadId;
+ this.executionTime = executionTime;
+ this.errorCode = errorCode;
+ this.databaseName = databaseName;
+ this.sql = sql;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 0c9d39784be..dfa8fda0168 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -18,17 +18,21 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
* MySQL rows base event.
*/
-@RequiredArgsConstructor
@Getter
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
private final String databaseName;
private final String tableName;
+
+ public MySQLBaseRowsBinlogEvent(final String fileName, final long
position, final long timestamp, final String databaseName, final String
tableName) {
+ super(fileName, position, timestamp);
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index 50e3d14d6b4..e08ce8589e8 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -30,8 +30,8 @@ public final class MySQLDeleteRowsBinlogEvent extends
MySQLBaseRowsBinlogEvent {
private final List<Serializable[]> beforeRows;
- public MySQLDeleteRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> beforeRows) {
- super(databaseName, tableName);
+ public MySQLDeleteRowsBinlogEvent(final String fileName, final long
position, final long timestamp, final String databaseName, final String
tableName, final List<Serializable[]> beforeRows) {
+ super(fileName, position, timestamp, databaseName, tableName);
this.beforeRows = beforeRows;
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index 8c3379ea786..e3eef2f0326 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -32,8 +32,9 @@ public final class MySQLUpdateRowsBinlogEvent extends
MySQLBaseRowsBinlogEvent {
private final List<Serializable[]> afterRows;
- public MySQLUpdateRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> beforeRows, final List<Serializable[]>
afterRows) {
- super(databaseName, tableName);
+ public MySQLUpdateRowsBinlogEvent(final String fileName, final long
position, final long timestamp,
+ final String databaseName, final String
tableName, final List<Serializable[]> beforeRows, final List<Serializable[]>
afterRows) {
+ super(fileName, position, timestamp, databaseName, tableName);
this.beforeRows = beforeRows;
this.afterRows = afterRows;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index db1029bfac0..25e73d8e8fc 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -30,8 +30,8 @@ public final class MySQLWriteRowsBinlogEvent extends
MySQLBaseRowsBinlogEvent {
private final List<Serializable[]> afterRows;
- public MySQLWriteRowsBinlogEvent(final String databaseName, final String
tableName, final List<Serializable[]> afterRows) {
- super(databaseName, tableName);
+ public MySQLWriteRowsBinlogEvent(final String fileName, final long
position, final long timestamp, final String databaseName, final String
tableName, final List<Serializable[]> afterRows) {
+ super(fileName, position, timestamp, databaseName, tableName);
this.afterRows = afterRows;
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
index b3c2e2af885..6601f3c2ec8 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
@@ -26,9 +25,13 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.e
*
* @see <a
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_xid">XID_EVENT</a>
*/
-@RequiredArgsConstructor
@Getter
public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {
private final long xid;
+
+ public MySQLXidBinlogEvent(final String fileName, final long position,
final long timestamp, final long xid) {
+ super(fileName, position, timestamp);
+ this.xid = xid;
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 7ba01844b64..4436f14e6e2 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -221,18 +221,11 @@ public final class MySQLBinlogClient {
channel.pipeline().remove(MySQLCommandResponseHandler.class);
String tableKey = String.join(":", connectInfo.getHost(),
String.valueOf(connectInfo.getPort()));
channel.pipeline().addLast(new
MySQLBinlogEventPacketDecoder(checksumLength,
GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX));
- channel.pipeline().addLast(new
MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
+ channel.pipeline().addLast(new MySQLBinlogEventHandler(new
PlaceholderBinlogEvent(binlogFileName, binlogPosition, 0L)));
resetSequenceID();
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int)
binlogPosition, connectInfo.getServerId(), binlogFileName));
}
- private MySQLBaseBinlogEvent getLastBinlogEvent(final String
binlogFileName, final long binlogPosition) {
- PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
- result.setFileName(binlogFileName);
- result.setPosition(binlogPosition);
- return result;
- }
-
private void resetSequenceID() {
channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(0);
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index 25f7919687c..edc6021a28c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -25,10 +25,9 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
@@ -201,37 +200,28 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
- MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
- initRowsEvent(result, binlogEventHeader);
- return result;
+ return new MySQLWriteRowsBinlogEvent(binlogContext.getFileName(),
+ binlogEventHeader.getLogPos(),
binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
}
private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
- MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
- initRowsEvent(result, binlogEventHeader);
- return result;
+ return new MySQLUpdateRowsBinlogEvent(binlogContext.getFileName(),
+ binlogEventHeader.getLogPos(),
binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
}
private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket =
binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
- MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
- initRowsEvent(result, binlogEventHeader);
- return result;
- }
-
- private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final
MySQLBinlogEventHeader binlogEventHeader) {
- rowsEvent.setFileName(binlogContext.getFileName());
- rowsEvent.setPosition(binlogEventHeader.getLogPos());
- rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
+ return new MySQLDeleteRowsBinlogEvent(binlogContext.getFileName(),
+ binlogEventHeader.getLogPos(),
binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(),
tableMapEventPacket.getTableName(), packet.getRows());
}
private PlaceholderBinlogEvent decodePlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
- PlaceholderBinlogEvent result =
createPlaceholderEvent(binlogEventHeader);
+ PlaceholderBinlogEvent result = new
PlaceholderBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp());
int remainDataLength = binlogEventHeader.getEventSize() + 1 -
binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
if (remainDataLength > 0) {
payload.skipReserved(remainDataLength);
@@ -247,28 +237,11 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
payload.skipReserved(payload.readInt2());
String databaseName = payload.readStringNul();
String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() -
binlogEventHeader.getChecksumLength());
- MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId,
executionTime, errorCode, databaseName, sql);
- result.setFileName(binlogContext.getFileName());
- result.setPosition(binlogEventHeader.getLogPos());
- result.setTimestamp(binlogEventHeader.getTimestamp());
- return result;
+ return new MySQLQueryBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), threadId,
executionTime, errorCode, databaseName, sql);
}
private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
- MySQLXidBinlogEvent result = new
MySQLXidBinlogEvent(payload.readInt8());
- result.setFileName(binlogContext.getFileName());
- result.setPosition(binlogEventHeader.getLogPos());
- result.setTimestamp(binlogEventHeader.getTimestamp());
- return result;
- }
-
- // TODO May be used again later, keep this method first.
- private PlaceholderBinlogEvent createPlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader) {
- PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
- result.setFileName(binlogContext.getFileName());
- result.setPosition(binlogEventHeader.getLogPos());
- result.setTimestamp(binlogEventHeader.getTimestamp());
- return result;
+ return new MySQLXidBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(),
payload.readInt8());
}
private void skipChecksum(final int eventType, final ByteBuf in) {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index e7926fe5499..151add22cf7 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -134,7 +134,7 @@ class MySQLIncrementalDumperTest {
@Test
void assertWriteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByWriteRowsEvent(new
MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new
Serializable[]{101, 1, "OK"})));
+ List<Record> actual = getRecordsByWriteRowsEvent(new
MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order",
Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.INSERT));
@@ -148,8 +148,8 @@ class MySQLIncrementalDumperTest {
@Test
void assertUpdateRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByUpdateRowsEvent(
- new MySQLUpdateRowsBinlogEvent("test", "t_order",
Collections.singletonList(new Serializable[]{101, 1, "OK"}),
Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
+ List<Record> actual = getRecordsByUpdateRowsEvent(new
MySQLUpdateRowsBinlogEvent(
+ "", 0, 0L, "test", "t_order", Collections.singletonList(new
Serializable[]{101, 1, "OK"}), Collections.singletonList(new
Serializable[]{101, 1, "OK2"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.UPDATE));
@@ -163,7 +163,7 @@ class MySQLIncrementalDumperTest {
@Test
void assertDeleteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByDeleteRowsEvent(new
MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new
Serializable[]{101, 1, "OK"})));
+ List<Record> actual = getRecordsByDeleteRowsEvent(new
MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order",
Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(PipelineSQLOperationType.DELETE));
@@ -178,14 +178,14 @@ class MySQLIncrementalDumperTest {
@Test
void assertPlaceholderEvent() throws ReflectiveOperationException {
List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
MySQLBaseBinlogEvent.class),
- incrementalDumper, new PlaceholderBinlogEvent());
+ incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L));
assertThat(actual.size(), is(1));
}
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
MySQLBaseBinlogEvent.class),
- incrementalDumper, new MySQLWriteRowsBinlogEvent("test",
"t_order", Collections.singletonList(new Serializable[]{1})));
+ incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L,
"test", "t_order", Collections.singletonList(new Serializable[]{1})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}