This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 1958c4eb78d Supports receiving MySQL packet more than 16 MB (#17914) 1958c4eb78d is described below commit 1958c4eb78deab332c93d3a5093b597973e48af3 Author: 吴伟杰 <wuwei...@apache.org> AuthorDate: Tue May 24 23:18:39 2022 +0800 Supports receiving MySQL packet more than 16 MB (#17914) * Supports receiving MySQL packet more than 16 MB * Complete MySQLPacketCodecEngineTest --- .../mysql/codec/MySQLPacketCodecEngine.java | 30 +++++++++++++++++++++- .../mysql/codec/MySQLPacketCodecEngineTest.java | 23 ++++++++++++----- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java index 33b09cf8689..3e1f1708c26 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.db.protocol.mysql.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandlerContext; import org.apache.shardingsphere.db.protocol.CommonConstants; import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine; @@ -27,6 +28,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload; import java.nio.charset.Charset; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; /** @@ -34,10 +37,14 @@ import java.util.List; */ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<MySQLPacket> { + private static final int MAX_PACKET_LENGTH = 0xFFFFFF; + private static final int PAYLOAD_LENGTH = 3; private static final int SEQUENCE_LENGTH = 1; + private final List<ByteBuf> pendingMessages = new LinkedList<>(); + @Override public boolean isValidHeader(final int readableBytes) { return readableBytes >= PAYLOAD_LENGTH + SEQUENCE_LENGTH; @@ -51,7 +58,28 @@ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<M in.resetReaderIndex(); return; } - out.add(in.readRetainedSlice(SEQUENCE_LENGTH + payloadLength)); + ByteBuf message = in.readRetainedSlice(SEQUENCE_LENGTH + payloadLength); + if (MAX_PACKET_LENGTH == payloadLength) { + pendingMessages.add(message); + } else if (pendingMessages.isEmpty()) { + out.add(message); + } else { + aggregateMessages(context, message, out); + } + } + + private void aggregateMessages(final ChannelHandlerContext context, final ByteBuf lastMessage, final List<Object> out) { + CompositeByteBuf result = context.alloc().compositeBuffer(pendingMessages.size() + 1); + Iterator<ByteBuf> pendingMessagesIterator = pendingMessages.iterator(); + result.addComponent(true, pendingMessagesIterator.next()); + while (pendingMessagesIterator.hasNext()) { + result.addComponent(true, pendingMessagesIterator.next().skipBytes(1)); + } + if (lastMessage.readableBytes() > 1) { + result.addComponent(true, lastMessage.skipBytes(1)); + } + out.add(result); + pendingMessages.clear(); } @Override diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java index 44b7598abc9..7fb70ac37e4 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java @@ -18,7 +18,9 @@ package org.apache.shardingsphere.db.protocol.mysql.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.util.AttributeKey; import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket; @@ -33,6 +35,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -102,17 +105,25 @@ public final class MySQLPacketCodecEngineTest { } @Test - public void assertDecodeMaxLengthPacket() { - byte[] packetData = new byte[(1 << 24) + 4]; - packetData[0] = packetData[1] = packetData[2] = (byte) 0xff; - packetData[3] = (byte) 0; - ByteBuf input = Unpooled.wrappedBuffer(packetData); + public void assertDecodePacketMoreThan16MB() { + MySQLPacketCodecEngine engine = new MySQLPacketCodecEngine(); + when(context.alloc().compositeBuffer(2)).thenReturn(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 2)); List<Object> actual = new ArrayList<>(1); - new MySQLPacketCodecEngine().decode(null, input, actual); + for (ByteBuf each : preparePacketMoreThan16MB()) { + engine.decode(context, each, actual); + } assertThat(actual.size(), is(1)); assertThat(((ByteBuf) actual.get(0)).readableBytes(), is(1 << 24)); } + private List<ByteBuf> preparePacketMoreThan16MB() { + byte[] firstPacketData = new byte[4 + (1 << 24) - 1]; + firstPacketData[0] = firstPacketData[1] = firstPacketData[2] = (byte) 0xff; + firstPacketData[3] = (byte) 0; + byte[] secondPacketData = new byte[]{0x00, 0x00, 0x00, 0x01}; + return Arrays.asList(Unpooled.wrappedBuffer(firstPacketData), Unpooled.wrappedBuffer(secondPacketData)); + } + @Test public void assertEncode() { when(byteBuf.writeInt(anyInt())).thenReturn(byteBuf);