This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 301cee319c6 Improve MySQL binlog query event decode (#29064)
301cee319c6 is described below

commit 301cee319c69d81c2c2d919cad8e95234d9beb75
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Nov 21 16:13:01 2023 +0800

    Improve MySQL binlog query event decode (#29064)
---
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java   |  7 ++-----
 .../client/netty/MySQLBinlogEventPacketDecoder.java     | 15 +++++++++++----
 .../client/netty/MySQLBinlogEventPacketDecoderTest.java | 17 +++++++++++++++++
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index fd6a290997c..e3e093d777c 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -116,11 +116,8 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
     private void handleEvents(final List<AbstractBinlogEvent> events) {
         List<Record> dataRecords = new LinkedList<>();
         for (AbstractBinlogEvent each : events) {
-            if (!(each instanceof AbstractRowsEvent)) {
-                dataRecords.add(createPlaceholderRecord(each));
-                continue;
-            }
-            dataRecords.addAll(handleEvent(each));
+            List<? extends Record> records = handleEvent(each);
+            dataRecords.addAll(records);
         }
         if (dataRecords.isEmpty()) {
             return;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 0bcaabba7fe..4b3fff759b3 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -125,6 +125,8 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
             QueryEvent queryEvent = (QueryEvent) binlogEvent;
             if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
                 records = new LinkedList<>();
+            } else {
+                out.add(binlogEvent);
             }
         } else if (binlogEvent instanceof XidEvent) {
             records.add(binlogEvent);
@@ -165,7 +167,7 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
             case DELETE_ROWS_EVENT_V2:
                 return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, 
payload));
             case QUERY_EVENT:
-                return 
Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload));
+                return Optional.of(decodeQueryEvent(binlogEventHeader, 
payload));
             case XID_EVENT:
                 return Optional.of(decodeXidEvent(binlogEventHeader, payload));
             default:
@@ -241,15 +243,20 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         return result;
     }
     
-    private QueryEvent decodeQueryEvent(final int checksumLength, final 
MySQLPacketPayload payload) {
+    private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
         int threadId = payload.readInt4();
         int executionTime = payload.readInt4();
         payload.skipReserved(1);
         int errorCode = payload.readInt2();
         payload.skipReserved(payload.readInt2());
         String databaseName = payload.readStringNul();
-        String sql = 
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
-        return new QueryEvent(threadId, executionTime, errorCode, 
databaseName, sql);
+        String sql = 
payload.readStringFix(payload.getByteBuf().readableBytes() - 
binlogEventHeader.getChecksumLength());
+        QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, 
databaseName, sql);
+        result.setFileName(binlogContext.getFileName());
+        result.setPosition(binlogEventHeader.getLogPos());
+        result.setTimestamp(binlogEventHeader.getTimestamp());
+        result.setServerId(binlogEventHeader.getServerId());
+        return result;
     }
     
     private XidEvent decodeXidEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 651188775ff..3aa102d5633 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.internal.StringUtil;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.when;
@@ -108,6 +111,20 @@ class MySQLBinlogEventPacketDecoderTest {
         assertThat(binlogContext.getChecksumLength(), is(4));
     }
     
+    @Test
+    void assertDecodeQueryEvent() {
+        ByteBuf byteBuf = Unpooled.buffer();
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("00f3e25665020100000087000000c2740f0a0400c9150000000000000400002d000000000000012000a045000000000603737464042d002d00e0000c0164735f3000116df40b00000"
+                + 
"0000012ff0064735f300044524f50205441424c452060745f70726f76696e636560202f2a2067656e65726174656420627920736572766572202a2fcefe4ec6"));
+        List<Object> decodedEvents = new LinkedList<>();
+        binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
+        assertFalse(decodedEvents.isEmpty());
+        Object actual = decodedEvents.get(0);
+        assertInstanceOf(QueryEvent.class, actual);
+        assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L));
+        assertThat(((QueryEvent) actual).getPosition(), is(168785090L));
+    }
+    
     @Test
     void assertDecodeTableMapEvent() {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();

Reply via email to