This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1958c4eb78d Supports receiving MySQL packet more than 16 MB (#17914)
1958c4eb78d is described below

commit 1958c4eb78deab332c93d3a5093b597973e48af3
Author: 吴伟杰 <wuwei...@apache.org>
AuthorDate: Tue May 24 23:18:39 2022 +0800

    Supports receiving MySQL packet more than 16 MB (#17914)
    
    * Supports receiving MySQL packet more than 16 MB
    
    * Complete MySQLPacketCodecEngineTest
---
 .../mysql/codec/MySQLPacketCodecEngine.java        | 30 +++++++++++++++++++++-
 .../mysql/codec/MySQLPacketCodecEngineTest.java    | 23 ++++++++++++-----
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
index 33b09cf8689..3e1f1708c26 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.db.protocol.mysql.codec;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.shardingsphere.db.protocol.CommonConstants;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
@@ -27,6 +28,8 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -34,10 +37,14 @@ import java.util.List;
  */
 public final class MySQLPacketCodecEngine implements 
DatabasePacketCodecEngine<MySQLPacket> {
     
+    private static final int MAX_PACKET_LENGTH = 0xFFFFFF;
+    
     private static final int PAYLOAD_LENGTH = 3;
     
     private static final int SEQUENCE_LENGTH = 1;
     
+    private final List<ByteBuf> pendingMessages = new LinkedList<>();
+    
     @Override
     public boolean isValidHeader(final int readableBytes) {
         return readableBytes >= PAYLOAD_LENGTH + SEQUENCE_LENGTH;
@@ -51,7 +58,28 @@ public final class MySQLPacketCodecEngine implements 
DatabasePacketCodecEngine<M
             in.resetReaderIndex();
             return;
         }
-        out.add(in.readRetainedSlice(SEQUENCE_LENGTH + payloadLength));
+        ByteBuf message = in.readRetainedSlice(SEQUENCE_LENGTH + 
payloadLength);
+        if (MAX_PACKET_LENGTH == payloadLength) {
+            pendingMessages.add(message);
+        } else if (pendingMessages.isEmpty()) {
+            out.add(message);
+        } else {
+            aggregateMessages(context, message, out);
+        }
+    }
+    
+    private void aggregateMessages(final ChannelHandlerContext context, final 
ByteBuf lastMessage, final List<Object> out) {
+        CompositeByteBuf result = 
context.alloc().compositeBuffer(pendingMessages.size() + 1);
+        Iterator<ByteBuf> pendingMessagesIterator = pendingMessages.iterator();
+        result.addComponent(true, pendingMessagesIterator.next());
+        while (pendingMessagesIterator.hasNext()) {
+            result.addComponent(true, 
pendingMessagesIterator.next().skipBytes(1));
+        }
+        if (lastMessage.readableBytes() > 1) {
+            result.addComponent(true, lastMessage.skipBytes(1));
+        }
+        out.add(result);
+        pendingMessages.clear();
     }
     
     @Override
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
index 44b7598abc9..7fb70ac37e4 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
@@ -18,7 +18,9 @@
 package org.apache.shardingsphere.db.protocol.mysql.codec;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.AttributeKey;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
@@ -33,6 +35,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -102,17 +105,25 @@ public final class MySQLPacketCodecEngineTest {
     }
     
     @Test
-    public void assertDecodeMaxLengthPacket() {
-        byte[] packetData = new byte[(1 << 24) + 4];
-        packetData[0] = packetData[1] = packetData[2] = (byte) 0xff;
-        packetData[3] = (byte) 0;
-        ByteBuf input = Unpooled.wrappedBuffer(packetData);
+    public void assertDecodePacketMoreThan16MB() {
+        MySQLPacketCodecEngine engine = new MySQLPacketCodecEngine();
+        when(context.alloc().compositeBuffer(2)).thenReturn(new 
CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 2));
         List<Object> actual = new ArrayList<>(1);
-        new MySQLPacketCodecEngine().decode(null, input, actual);
+        for (ByteBuf each : preparePacketMoreThan16MB()) {
+            engine.decode(context, each, actual);
+        }
         assertThat(actual.size(), is(1));
         assertThat(((ByteBuf) actual.get(0)).readableBytes(), is(1 << 24));
     }
     
+    private List<ByteBuf> preparePacketMoreThan16MB() {
+        byte[] firstPacketData = new byte[4 + (1 << 24) - 1];
+        firstPacketData[0] = firstPacketData[1] = firstPacketData[2] = (byte) 
0xff;
+        firstPacketData[3] = (byte) 0;
+        byte[] secondPacketData = new byte[]{0x00, 0x00, 0x00, 0x01};
+        return Arrays.asList(Unpooled.wrappedBuffer(firstPacketData), 
Unpooled.wrappedBuffer(secondPacketData));
+    }
+    
     @Test
     public void assertEncode() {
         when(byteBuf.writeInt(anyInt())).thenReturn(byteBuf);

Reply via email to