This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f0f1c2a8d11 Rename MySQLBaseBinlogEvent (#32563)
f0f1c2a8d11 is described below
commit f0f1c2a8d119acd6cc60f55eb84039725ff7317f
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 16 23:01:54 2024 +0800
Rename MySQLBaseBinlogEvent (#32563)
* Rename MySQLBaseBinlogEvent
* Refactor MySQLWriteRowsBinlogEvent
* Refactor MySQLUpdateRowsBinlogEvent
* Refactor MySQLDeleteRowsBinlogEvent
---
...tBinlogEvent.java => MySQLBaseBinlogEvent.java} | 4 +-
...olderEvent.java => PlaceholderBinlogEvent.java} | 2 +-
.../MySQLQueryBinlogEvent.java} | 5 +-
.../MySQLBaseRowsBinlogEvent.java} | 7 ++-
.../MySQLDeleteRowsBinlogEvent.java} | 12 ++--
.../MySQLUpdateRowsBinlogEvent.java} | 14 ++---
.../MySQLWriteRowsBinlogEvent.java} | 12 ++--
.../MySQLXidBinlogEvent.java} | 5 +-
.../incremental/client/MySQLBinlogClient.java | 24 ++++----
.../netty/MySQLBinlogEventPacketDecoder.java | 72 ++++++++++------------
.../incremental/dumper/MySQLIncrementalDumper.java | 42 ++++++-------
.../netty/MySQLBinlogEventPacketDecoderTest.java | 30 ++++-----
.../dumper/MySQLIncrementalDumperTest.java | 49 +++++++--------
13 files changed, 136 insertions(+), 142 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
similarity index 93%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
index 350f9bb56c2..63873bec2b7 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java
@@ -21,11 +21,11 @@ import lombok.Getter;
import lombok.Setter;
/**
- * Abstract binlog event.
+ * MySQL base binlog event.
*/
@Getter
@Setter
-public abstract class AbstractBinlogEvent {
+public abstract class MySQLBaseBinlogEvent {
private String fileName;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
similarity index 93%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
index f37272f4625..ecc57d276ca 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java
@@ -20,5 +20,5 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
/**
* Placeholder binlog event, unsupported binlog event will replace it into
this class.
*/
-public final class PlaceholderEvent extends AbstractBinlogEvent {
+public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
similarity index 88%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
index 5b3e591dd73..a521588fc6b 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
* Query event.This event is written into the binary log file for:
@@ -30,7 +31,7 @@ import lombok.RequiredArgsConstructor;
*/
@RequiredArgsConstructor
@Getter
-public final class QueryEvent extends AbstractBinlogEvent {
+public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
private final long threadId;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
similarity index 80%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
index 56ecaeeb963..8bb91ffd833 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
- * Abstract rows event.
+ * MySQL rows base event.
*/
@Getter
@Setter
-public abstract class AbstractRowsEvent extends AbstractBinlogEvent {
+public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {
private String databaseName;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
similarity index 78%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
index 1d27a8dbd7a..e403d6d643c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
- * Delete rows event.
+ * MySQL delete rows binlog event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-public final class DeleteRowsEvent extends AbstractRowsEvent {
+public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent
{
- private List<Serializable[]> beforeRows;
+ private final List<Serializable[]> beforeRows;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
similarity index 75%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
index 0f89a70f396..e8b99362368 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
- * Update rows event.
+ * MySQL update rows binlog event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-public final class UpdateRowsEvent extends AbstractRowsEvent {
+public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent
{
- private List<Serializable[]> beforeRows;
+ private final List<Serializable[]> beforeRows;
- private List<Serializable[]> afterRows;
+ private final List<Serializable[]> afterRows;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
similarity index 79%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
index 35f76361006..912d54e14d1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
- * Write rows event.
+ * MySQL write rows binlog event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-public final class WriteRowsEvent extends AbstractRowsEvent {
+public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {
- private List<Serializable[]> afterRows;
+ private final List<Serializable[]> afterRows;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
similarity index 85%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
index 9b7589513e1..b3c2e2af885 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
+package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
/**
* XID event is generated for a COMMIT of a transaction that modifies one or
more tables of an XA-capable storage engine.
@@ -27,7 +28,7 @@ import lombok.RequiredArgsConstructor;
*/
@RequiredArgsConstructor
@Getter
-public final class XidEvent extends AbstractBinlogEvent {
+public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {
private final long xid;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 3ec1a166257..7ba01844b64 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -34,8 +34,8 @@ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLCommandPacketDecoder;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLNegotiateHandler;
@@ -77,7 +77,7 @@ public final class MySQLBinlogClient {
private final boolean decodeWithTX;
- private final ArrayBlockingQueue<List<AbstractBinlogEvent>>
blockingEventQueue = new ArrayBlockingQueue<>(2500);
+ private final ArrayBlockingQueue<List<MySQLBaseBinlogEvent>>
blockingEventQueue = new ArrayBlockingQueue<>(2500);
private EventLoopGroup eventLoopGroup;
@@ -226,8 +226,8 @@ public final class MySQLBinlogClient {
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int)
binlogPosition, connectInfo.getServerId(), binlogFileName));
}
- private AbstractBinlogEvent getLastBinlogEvent(final String
binlogFileName, final long binlogPosition) {
- PlaceholderEvent result = new PlaceholderEvent();
+ private MySQLBaseBinlogEvent getLastBinlogEvent(final String
binlogFileName, final long binlogPosition) {
+ PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogFileName);
result.setPosition(binlogPosition);
return result;
@@ -242,12 +242,12 @@ public final class MySQLBinlogClient {
*
* @return binlog event
*/
- public synchronized List<AbstractBinlogEvent> poll() {
+ public synchronized List<MySQLBaseBinlogEvent> poll() {
if (!running) {
return Collections.emptyList();
}
try {
- List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L,
TimeUnit.MILLISECONDS);
+ List<MySQLBaseBinlogEvent> result = blockingEventQueue.poll(100L,
TimeUnit.MILLISECONDS);
return null == result ? Collections.emptyList() : result;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
@@ -314,11 +314,11 @@ public final class MySQLBinlogClient {
private final class MySQLBinlogEventHandler extends
ChannelInboundHandlerAdapter {
- private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
+ private final AtomicReference<MySQLBaseBinlogEvent> lastBinlogEvent;
private final AtomicBoolean reconnectRequested = new
AtomicBoolean(false);
- MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
+ MySQLBinlogEventHandler(final MySQLBaseBinlogEvent lastBinlogEvent) {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}
@@ -329,7 +329,7 @@ public final class MySQLBinlogClient {
return;
}
if (msg instanceof List) {
- List<AbstractBinlogEvent> records =
(List<AbstractBinlogEvent>) msg;
+ List<MySQLBaseBinlogEvent> records =
(List<MySQLBaseBinlogEvent>) msg;
if (records.isEmpty()) {
log.warn("The records is empty");
return;
@@ -338,8 +338,8 @@ public final class MySQLBinlogClient {
blockingEventQueue.put(records);
return;
}
- if (msg instanceof AbstractBinlogEvent) {
- lastBinlogEvent.set((AbstractBinlogEvent) msg);
+ if (msg instanceof MySQLBaseBinlogEvent) {
+ lastBinlogEvent.set((MySQLBaseBinlogEvent) msg);
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
index d53c1291421..223c7ed08ea 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -24,14 +24,14 @@ import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
@@ -58,7 +58,7 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
private final boolean decodeWithTX;
- private List<AbstractBinlogEvent> records = new LinkedList<>();
+ private List<MySQLBaseBinlogEvent> records = new LinkedList<>();
public MySQLBinlogEventPacketDecoder(final int checksumLength, final
Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX)
{
this.decodeWithTX = decodeWithTX;
@@ -75,12 +75,12 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
if (!checkEventIntegrity(in, binlogEventHeader)) {
return;
}
- Optional<AbstractBinlogEvent> binlogEvent =
decodeEvent(binlogEventHeader, payload);
+ Optional<MySQLBaseBinlogEvent> binlogEvent =
decodeEvent(binlogEventHeader, payload);
if (!binlogEvent.isPresent()) {
skipChecksum(binlogEventHeader.getEventType(), in);
return;
}
- if (binlogEvent.get() instanceof PlaceholderEvent) {
+ if (binlogEvent.get() instanceof PlaceholderBinlogEvent) {
out.add(binlogEvent.get());
skipChecksum(binlogEventHeader.getEventType(), in);
return;
@@ -120,15 +120,15 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
return true;
}
- private void processEventWithTX(final AbstractBinlogEvent binlogEvent,
final List<Object> out) {
- if (binlogEvent instanceof QueryEvent) {
- QueryEvent queryEvent = (QueryEvent) binlogEvent;
+ private void processEventWithTX(final MySQLBaseBinlogEvent binlogEvent,
final List<Object> out) {
+ if (binlogEvent instanceof MySQLQueryBinlogEvent) {
+ MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent)
binlogEvent;
if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
records = new LinkedList<>();
} else {
out.add(binlogEvent);
}
- } else if (binlogEvent instanceof XidEvent) {
+ } else if (binlogEvent instanceof MySQLXidBinlogEvent) {
records.add(binlogEvent);
out.add(records);
} else {
@@ -136,9 +136,9 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
}
}
- private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent,
final List<Object> out) {
- if (binlogEvent instanceof QueryEvent) {
- QueryEvent queryEvent = (QueryEvent) binlogEvent;
+ private void processEventIgnoreTX(final MySQLBaseBinlogEvent binlogEvent,
final List<Object> out) {
+ if (binlogEvent instanceof MySQLQueryBinlogEvent) {
+ MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent)
binlogEvent;
if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
return;
}
@@ -146,7 +146,7 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
out.add(binlogEvent);
}
- private Optional<AbstractBinlogEvent> decodeEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+ private Optional<MySQLBaseBinlogEvent> decodeEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
switch
(MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT))
{
case ROTATE_EVENT:
decodeRotateEvent(binlogEventHeader, payload);
@@ -197,35 +197,31 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
binlogContext.putTableMapEvent(packet.getTableId(), packet);
}
- private WriteRowsEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
+ private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- WriteRowsEvent result = new WriteRowsEvent();
+ MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(packet.getRows());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
- result.setAfterRows(packet.getRows());
return result;
}
- private UpdateRowsEvent decodeUpdateRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+ private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- UpdateRowsEvent result = new UpdateRowsEvent();
+ MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
- result.setBeforeRows(packet.getRows());
- result.setAfterRows(packet.getRows2());
return result;
}
- private DeleteRowsEvent decodeDeleteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+ private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new
MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()),
payload);
- DeleteRowsEvent result = new DeleteRowsEvent();
+ MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(packet.getRows());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
- result.setBeforeRows(packet.getRows());
return result;
}
- private void initRowsEvent(final AbstractRowsEvent rowsEvent, final
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
+ private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final
MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName());
rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName());
rowsEvent.setFileName(binlogContext.getFileName());
@@ -233,8 +229,8 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
}
- private PlaceholderEvent decodePlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
- PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
+ private PlaceholderBinlogEvent decodePlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+ PlaceholderBinlogEvent result =
createPlaceholderEvent(binlogEventHeader);
int remainDataLength = binlogEventHeader.getEventSize() + 1 -
binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
if (remainDataLength > 0) {
payload.skipReserved(remainDataLength);
@@ -242,7 +238,7 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
return result;
}
- private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
+ private MySQLQueryBinlogEvent decodeQueryEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
int threadId = payload.readInt4();
int executionTime = payload.readInt4();
payload.skipReserved(1);
@@ -250,15 +246,15 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
payload.skipReserved(payload.readInt2());
String databaseName = payload.readStringNul();
String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() -
binlogEventHeader.getChecksumLength());
- QueryEvent result = new QueryEvent(threadId, executionTime, errorCode,
databaseName, sql);
+ MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId,
executionTime, errorCode, databaseName, sql);
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
return result;
}
- private XidEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
- XidEvent result = new XidEvent(payload.readInt8());
+ private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
+ MySQLXidBinlogEvent result = new
MySQLXidBinlogEvent(payload.readInt8());
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
@@ -266,8 +262,8 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
}
// TODO May be used again later, keep this method first.
- private PlaceholderEvent createPlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader) {
- PlaceholderEvent result = new PlaceholderEvent();
+ private PlaceholderBinlogEvent createPlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader) {
+ PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index 8c15f860ade..3f6f2d2720e 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -34,11 +34,11 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.data.MySQLBinlogDataHandler;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.ConnectInfo;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -103,9 +103,9 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
}
}
- private void handleEvents(final List<AbstractBinlogEvent> events) {
+ private void handleEvents(final List<MySQLBaseBinlogEvent> events) {
List<Record> dataRecords = new LinkedList<>();
- for (AbstractBinlogEvent each : events) {
+ for (MySQLBaseBinlogEvent each : events) {
dataRecords.addAll(handleEvent(each));
}
if (!dataRecords.isEmpty()) {
@@ -113,28 +113,28 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
}
}
- private List<? extends Record> handleEvent(final AbstractBinlogEvent
event) {
- if (!(event instanceof AbstractRowsEvent)) {
+ private List<? extends Record> handleEvent(final MySQLBaseBinlogEvent
event) {
+ if (!(event instanceof MySQLBaseRowsBinlogEvent)) {
return Collections.singletonList(createPlaceholderRecord(event));
}
- AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
+ MySQLBaseRowsBinlogEvent rowsEvent = (MySQLBaseRowsBinlogEvent) event;
if (!rowsEvent.getDatabaseName().equals(catalog) ||
!dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName()))
{
return Collections.singletonList(createPlaceholderRecord(event));
}
PipelineTableMetaData tableMetaData =
getPipelineTableMetaData(rowsEvent.getTableName());
- if (event instanceof WriteRowsEvent) {
- return handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
+ if (event instanceof MySQLWriteRowsBinlogEvent) {
+ return handleWriteRowsEvent((MySQLWriteRowsBinlogEvent) event,
tableMetaData);
}
- if (event instanceof UpdateRowsEvent) {
- return handleUpdateRowsEvent((UpdateRowsEvent) event,
tableMetaData);
+ if (event instanceof MySQLUpdateRowsBinlogEvent) {
+ return handleUpdateRowsEvent((MySQLUpdateRowsBinlogEvent) event,
tableMetaData);
}
- if (event instanceof DeleteRowsEvent) {
- return handleDeleteRowsEvent((DeleteRowsEvent) event,
tableMetaData);
+ if (event instanceof MySQLDeleteRowsBinlogEvent) {
+ return handleDeleteRowsEvent((MySQLDeleteRowsBinlogEvent) event,
tableMetaData);
}
return Collections.emptyList();
}
- private PlaceholderRecord createPlaceholderRecord(final
AbstractBinlogEvent event) {
+ private PlaceholderRecord createPlaceholderRecord(final
MySQLBaseBinlogEvent event) {
PlaceholderRecord result = new PlaceholderRecord(new
MySQLBinlogPosition(event.getFileName(), event.getPosition()));
result.setCommitTime(event.getTimestamp() * 1000L);
return result;
@@ -145,7 +145,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
return
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
actualTableName);
}
- private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event,
final PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleWriteRowsEvent(final
MySQLWriteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.INSERT, event, each.length);
@@ -158,7 +158,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
return result;
}
- private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent
event, final PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleUpdateRowsEvent(final
MySQLUpdateRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -176,7 +176,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
return result;
}
- private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent
event, final PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleDeleteRowsEvent(final
MySQLDeleteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) {
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.DELETE, event, each.length);
@@ -189,7 +189,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
return result;
}
- private DataRecord createDataRecord(final PipelineSQLOperationType type,
final AbstractRowsEvent rowsEvent, final int columnCount) {
+ private DataRecord createDataRecord(final PipelineSQLOperationType type,
final MySQLBaseRowsBinlogEvent rowsEvent, final int columnCount) {
String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
IngestPosition binlogPosition = new
MySQLBinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition());
DataRecord result = new DataRecord(type, tableName, binlogPosition,
columnCount);
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 9d27fdd8f12..b4a6a798658 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -24,11 +24,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.StringUtil;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
@@ -120,9 +120,9 @@ class MySQLBinlogEventPacketDecoderTest {
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertFalse(decodedEvents.isEmpty());
Object actual = decodedEvents.get(0);
- assertInstanceOf(QueryEvent.class, actual);
- assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L));
- assertThat(((QueryEvent) actual).getPosition(), is(168785090L));
+ assertInstanceOf(MySQLQueryBinlogEvent.class, actual);
+ assertThat(((MySQLQueryBinlogEvent) actual).getTimestamp(),
is(1700193011L));
+ assertThat(((MySQLQueryBinlogEvent) actual).getPosition(),
is(168785090L));
}
@Test
@@ -149,8 +149,8 @@ class MySQLBinlogEventPacketDecoderTest {
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
- assertThat(actualEventList.get(0), instanceOf(WriteRowsEvent.class));
- WriteRowsEvent actual = (WriteRowsEvent) actualEventList.get(0);
+ assertThat(actualEventList.get(0),
instanceOf(MySQLWriteRowsBinlogEvent.class));
+ MySQLWriteRowsBinlogEvent actual = (MySQLWriteRowsBinlogEvent)
actualEventList.get(0);
assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
@@ -167,8 +167,8 @@ class MySQLBinlogEventPacketDecoderTest {
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
- assertThat(actualEventList.get(0), instanceOf(UpdateRowsEvent.class));
- UpdateRowsEvent actual = (UpdateRowsEvent) actualEventList.get(0);
+ assertThat(actualEventList.get(0),
instanceOf(MySQLUpdateRowsBinlogEvent.class));
+ MySQLUpdateRowsBinlogEvent actual = (MySQLUpdateRowsBinlogEvent)
actualEventList.get(0);
assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("updated".getBytes()), null}));
}
@@ -185,9 +185,9 @@ class MySQLBinlogEventPacketDecoderTest {
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
- assertThat(actualEventList.get(0), instanceOf(DeleteRowsEvent.class));
- assertThat(actualEventList.get(1), instanceOf(XidEvent.class));
- DeleteRowsEvent actual = (DeleteRowsEvent) actualEventList.get(0);
+ assertThat(actualEventList.get(0),
instanceOf(MySQLDeleteRowsBinlogEvent.class));
+ assertThat(actualEventList.get(1),
instanceOf(MySQLXidBinlogEvent.class));
+ MySQLDeleteRowsBinlogEvent actual = (MySQLDeleteRowsBinlogEvent)
actualEventList.get(0);
assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 51a398810ed..cae3e3bb80c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -32,11 +32,11 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.test.fixture.jdbc.MockedDriver;
import org.junit.jupiter.api.BeforeAll;
@@ -141,16 +141,15 @@ class MySQLIncrementalDumperTest {
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private WriteRowsEvent createWriteRowsEvent() {
- WriteRowsEvent result = new WriteRowsEvent();
+ private MySQLWriteRowsBinlogEvent createWriteRowsEvent() {
+ MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}));
result.setDatabaseName("");
result.setTableName("t_order");
- result.setAfterRows(Collections.singletonList(new Serializable[]{101,
1, "OK"}));
return result;
}
- private List<Record> getRecordsByWriteRowsEvent(final WriteRowsEvent
rowsEvent) throws ReflectiveOperationException {
- Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent",
WriteRowsEvent.class, PipelineTableMetaData.class);
+ private List<Record> getRecordsByWriteRowsEvent(final
MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+ Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent",
MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
}
@@ -163,17 +162,15 @@ class MySQLIncrementalDumperTest {
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private UpdateRowsEvent createUpdateRowsEvent() {
- UpdateRowsEvent result = new UpdateRowsEvent();
+ private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() {
+ MySQLUpdateRowsBinlogEvent result = new
MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
result.setDatabaseName("test");
result.setTableName("t_order");
- result.setBeforeRows(Collections.singletonList(new Serializable[]{101,
1, "OK"}));
- result.setAfterRows(Collections.singletonList(new Serializable[]{101,
1, "OK2"}));
return result;
}
- private List<Record> getRecordsByUpdateRowsEvent(final UpdateRowsEvent
rowsEvent) throws ReflectiveOperationException {
- Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent",
UpdateRowsEvent.class, PipelineTableMetaData.class);
+ private List<Record> getRecordsByUpdateRowsEvent(final
MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+ Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent",
MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
}
@@ -186,39 +183,37 @@ class MySQLIncrementalDumperTest {
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private DeleteRowsEvent createDeleteRowsEvent() {
- DeleteRowsEvent result = new DeleteRowsEvent();
+ private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() {
+ MySQLDeleteRowsBinlogEvent result = new
MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1,
"OK"}));
result.setDatabaseName("");
result.setTableName("t_order");
- result.setBeforeRows(Collections.singletonList(new Serializable[]{101,
1, "OK"}));
return result;
}
- private List<Record> getRecordsByDeleteRowsEvent(final DeleteRowsEvent
rowsEvent) throws ReflectiveOperationException {
- Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent",
DeleteRowsEvent.class, PipelineTableMetaData.class);
+ private List<Record> getRecordsByDeleteRowsEvent(final
MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
+ Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent",
MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method,
incrementalDumper, rowsEvent, pipelineTableMetaData);
}
@Test
void assertPlaceholderEvent() throws ReflectiveOperationException {
- List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class),
- incrementalDumper, new PlaceholderEvent());
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
MySQLBaseBinlogEvent.class),
+ incrementalDumper, new PlaceholderBinlogEvent());
assertThat(actual.size(), is(1));
}
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
- List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class),
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
MySQLBaseBinlogEvent.class),
incrementalDumper, getFilteredWriteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}
- private WriteRowsEvent getFilteredWriteRowsEvent() {
- WriteRowsEvent result = new WriteRowsEvent();
+ private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() {
+ MySQLWriteRowsBinlogEvent result = new
MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1}));
result.setDatabaseName("test");
result.setTableName("t_order");
- result.setAfterRows(Collections.singletonList(new Serializable[]{1}));
return result;
}
}