This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 301cee319c6 Improve MySQL binlog query event decode (#29064)
301cee319c6 is described below
commit 301cee319c69d81c2c2d919cad8e95234d9beb75
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Nov 21 16:13:01 2023 +0800
Improve MySQL binlog query event decode (#29064)
---
.../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 7 ++-----
.../client/netty/MySQLBinlogEventPacketDecoder.java | 15 +++++++++++----
.../client/netty/MySQLBinlogEventPacketDecoderTest.java | 17 +++++++++++++++++
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index fd6a290997c..e3e093d777c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -116,11 +116,8 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
private void handleEvents(final List<AbstractBinlogEvent> events) {
List<Record> dataRecords = new LinkedList<>();
for (AbstractBinlogEvent each : events) {
- if (!(each instanceof AbstractRowsEvent)) {
- dataRecords.add(createPlaceholderRecord(each));
- continue;
- }
- dataRecords.addAll(handleEvent(each));
+ List<? extends Record> records = handleEvent(each);
+ dataRecords.addAll(records);
}
if (dataRecords.isEmpty()) {
return;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 0bcaabba7fe..4b3fff759b3 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -125,6 +125,8 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
QueryEvent queryEvent = (QueryEvent) binlogEvent;
if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
records = new LinkedList<>();
+ } else {
+ out.add(binlogEvent);
}
} else if (binlogEvent instanceof XidEvent) {
records.add(binlogEvent);
@@ -165,7 +167,7 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
case DELETE_ROWS_EVENT_V2:
return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader,
payload));
case QUERY_EVENT:
- return
Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload));
+ return Optional.of(decodeQueryEvent(binlogEventHeader,
payload));
case XID_EVENT:
return Optional.of(decodeXidEvent(binlogEventHeader, payload));
default:
@@ -241,15 +243,20 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
return result;
}
- private QueryEvent decodeQueryEvent(final int checksumLength, final
MySQLPacketPayload payload) {
+ private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
int threadId = payload.readInt4();
int executionTime = payload.readInt4();
payload.skipReserved(1);
int errorCode = payload.readInt2();
payload.skipReserved(payload.readInt2());
String databaseName = payload.readStringNul();
- String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
- return new QueryEvent(threadId, executionTime, errorCode,
databaseName, sql);
+ String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() -
binlogEventHeader.getChecksumLength());
+ QueryEvent result = new QueryEvent(threadId, executionTime, errorCode,
databaseName, sql);
+ result.setFileName(binlogContext.getFileName());
+ result.setPosition(binlogEventHeader.getLogPos());
+ result.setTimestamp(binlogEventHeader.getTimestamp());
+ result.setServerId(binlogEventHeader.getServerId());
+ return result;
}
private XidEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 651188775ff..3aa102d5633 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.StringUtil;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@@ -108,6 +111,20 @@ class MySQLBinlogEventPacketDecoderTest {
assertThat(binlogContext.getChecksumLength(), is(4));
}
+ @Test
+ void assertDecodeQueryEvent() {
+ ByteBuf byteBuf = Unpooled.buffer();
+
byteBuf.writeBytes(StringUtil.decodeHexDump("00f3e25665020100000087000000c2740f0a0400c9150000000000000400002d000000000000012000a045000000000603737464042d002d00e0000c0164735f3000116df40b00000"
+ +
"0000012ff0064735f300044524f50205441424c452060745f70726f76696e636560202f2a2067656e65726174656420627920736572766572202a2fcefe4ec6"));
+ List<Object> decodedEvents = new LinkedList<>();
+ binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
+ assertFalse(decodedEvents.isEmpty());
+ Object actual = decodedEvents.get(0);
+ assertInstanceOf(QueryEvent.class, actual);
+ assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L));
+ assertThat(((QueryEvent) actual).getPosition(), is(168785090L));
+ }
+
@Test
void assertDecodeTableMapEvent() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();