Copilot commented on code in PR #16045:
URL: https://github.com/apache/dubbo/pull/16045#discussion_r2724068522


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -168,6 +201,54 @@ public byte[] toByteArray() {
             return byteBuffer.array();
         }
 
+        public static TripleResponseWrapper parseFrom(InputStream inputStream) 
throws IOException {
+            TripleResponseWrapper tripleResponseWrapper = new 
TripleResponseWrapper();
+            int b;
+            while ((b = inputStream.read()) != -1) {
+                int tag = b;
+                if ((b & 0x80) != 0) {
+                    int shift = 7;
+                    tag = b & 0x7F;
+                    while (shift < 35) {
+                        b = inputStream.read();
+                        if (b == -1) {
+                            throw new IOException("Unexpected end of stream 
while reading tag");
+                        }
+                        tag |= (b & 0x7F) << shift;
+                        if ((b & 0x80) == 0) {
+                            break;
+                        }
+                        shift += 7;
+                    }
+                }
+

Review Comment:
   The varint parsing logic for tags (lines 208-223) is duplicated and slightly 
different from the readRawVarint32 method. Consider using readRawVarint32 for 
both tag and length parsing to reduce duplication and ensure consistency. The 
current implementation reads the first byte outside the loop and then continues 
if needed, which could be simplified.
   ```suggestion
                   final int firstByte = b;
                   InputStream tagInputStream = new InputStream() {
                       private boolean first = true;
   
                       @Override
                       public int read() throws IOException {
                           if (first) {
                               first = false;
                               return firstByte;
                           }
                           return inputStream.read();
                       }
   
                       @Override
                       public int read(byte[] buffer, int off, int len) throws 
IOException {
                           if (buffer == null) {
                               throw new NullPointerException();
                           }
                           if (off < 0 || len < 0 || len > buffer.length - off) 
{
                               throw new IndexOutOfBoundsException();
                           }
                           if (len == 0) {
                               return 0;
                           }
                           if (first) {
                               first = false;
                               buffer[off] = (byte) firstByte;
                               return 1;
                           }
                           return inputStream.read(buffer, off, len);
                       }
                   };
   
                   int tag = readRawVarint32(tagInputStream);
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java:
##########
@@ -40,8 +43,18 @@ public void onComplete(TriRpcStatus status, Map<String, 
Object> attachments) {
     public void onClose() {}
 
     @Override
-    public void onMessage(byte[] message, boolean isNeedReturnException) {
-        this.message = message;
+    public void onMessage(InputStream message, int messageLength, boolean 
isNeedReturnException) {
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            byte[] tmp = new byte[4096];
+            int len;
+            while ((len = message.read(tmp)) != -1) {
+                buffer.write(tmp, 0, len);
+            }
+            this.message = buffer.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }

Review Comment:
   The InputStream passed to onMessage is not explicitly closed in the test 
implementation. While BoundedInputStream will return -1 after reading the 
expected bytes, the stream should be closed to properly release resources, 
especially when ByteBufInputStream is used with releaseOnClose=true. Consider 
adding a try-with-resources or explicit close() call, or document that the 
caller is responsible for closing the stream.



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int result = super.read();
+            if (result != -1) {
+                remaining--;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   Method 'read' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -168,6 +201,54 @@ public byte[] toByteArray() {
             return byteBuffer.array();
         }
 
+        public static TripleResponseWrapper parseFrom(InputStream inputStream) 
throws IOException {
+            TripleResponseWrapper tripleResponseWrapper = new 
TripleResponseWrapper();
+            int b;
+            while ((b = inputStream.read()) != -1) {
+                int tag = b;
+                if ((b & 0x80) != 0) {
+                    int shift = 7;
+                    tag = b & 0x7F;
+                    while (shift < 35) {
+                        b = inputStream.read();
+                        if (b == -1) {
+                            throw new IOException("Unexpected end of stream 
while reading tag");
+                        }
+                        tag |= (b & 0x7F) << shift;
+                        if ((b & 0x80) == 0) {
+                            break;
+                        }
+                        shift += 7;
+                    }
+                }
+
+                int fieldNum = extractFieldNumFromTag(tag);
+                int wireType = extractWireTypeFromTag(tag);
+                if (wireType != 2) {
+                    throw new RuntimeException(
+                            String.format("unexpected wireType, expect %d 
realType %d", 2, wireType));
+                }
+
+                int length = readRawVarint32(inputStream);
+                byte[] fieldData = readExactly(inputStream, length);
+
+                if (fieldNum == 1) {
+                    tripleResponseWrapper.serializeType = new 
String(fieldData);
+                } else if (fieldNum == 2) {
+                    tripleResponseWrapper.data = fieldData;
+                } else if (fieldNum == 3) {
+                    tripleResponseWrapper.type = new String(fieldData);

Review Comment:
   String construction should specify charset explicitly. The current code uses 
the default platform charset (via new String(byte[])) which may differ across 
systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match 
the encoding used in toByteArray().
   ```suggestion
                       tripleResponseWrapper.serializeType = new 
String(fieldData, StandardCharsets.UTF_8);
                   } else if (fieldNum == 2) {
                       tripleResponseWrapper.data = fieldData;
                   } else if (fieldNum == 3) {
                       tripleResponseWrapper.type = new String(fieldData, 
StandardCharsets.UTF_8);
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -317,6 +398,56 @@ public byte[] toByteArray() {
             return byteBuffer.array();
         }
 
+        public static TripleRequestWrapper parseFrom(InputStream inputStream) 
throws IOException {
+            TripleRequestWrapper tripleRequestWrapper = new 
TripleRequestWrapper();
+            tripleRequestWrapper.args = new ArrayList<>();
+            tripleRequestWrapper.argTypes = new ArrayList<>();
+            int b;
+            while ((b = inputStream.read()) != -1) {
+                int tag = b;
+                if ((b & 0x80) != 0) {
+                    int shift = 7;
+                    tag = b & 0x7F;
+                    while (shift < 35) {
+                        b = inputStream.read();
+                        if (b == -1) {
+                            throw new IOException("Unexpected end of stream 
while reading tag");
+                        }
+                        tag |= (b & 0x7F) << shift;
+                        if ((b & 0x80) == 0) {
+                            break;
+                        }
+                        shift += 7;
+                    }
+                }
+
+                int fieldNum = extractFieldNumFromTag(tag);
+                int wireType = extractWireTypeFromTag(tag);
+                if (wireType != 2) {
+                    throw new RuntimeException(
+                            String.format("unexpected wireType, expect %d 
realType %d", 2, wireType));
+                }
+
+                int length = readRawVarint32(inputStream);
+                byte[] fieldData = readExactly(inputStream, length);
+
+                if (fieldNum == 1) {
+                    tripleRequestWrapper.serializeType = new String(fieldData);
+                } else if (fieldNum == 2) {
+                    tripleRequestWrapper.args.add(fieldData);
+                } else if (fieldNum == 3) {
+                    tripleRequestWrapper.argTypes.add(new String(fieldData));

Review Comment:
   String construction should specify charset explicitly. The current code uses 
the default platform charset (via new String(byte[])) which may differ across 
systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match 
the encoding used in toByteArray().
   ```suggestion
                       tripleRequestWrapper.serializeType = new 
String(fieldData, StandardCharsets.UTF_8);
                   } else if (fieldNum == 2) {
                       tripleRequestWrapper.args.add(fieldData);
                   } else if (fieldNum == 3) {
                       tripleRequestWrapper.argTypes.add(new String(fieldData, 
StandardCharsets.UTF_8));
   ```



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int result = super.read();
+            if (result != -1) {
+                remaining--;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int toRead = Math.min(len, remaining);
+            int result = super.read(b, off, toRead);
+            if (result > 0) {
+                remaining -= result;
+            }
+            return result;
+        }
+
+        @Override
+        public int available() throws IOException {

Review Comment:
   Method 'available' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.
   ```suggestion
           public synchronized int available() throws IOException {
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -583,7 +589,9 @@ private void doOnData(ByteBuf data, boolean endStream) {
                 
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not 
received before payload"));
                 return;
             }
-            deframer.deframe(data);
+            // Use ByteBufInputStream to adapt ByteBuf to InputStream
+            // The second parameter 'true' means release ByteBuf after reading
+            deframer.decode(new ByteBufInputStream(data, true));

Review Comment:
   This ByteBufInputStream is not always closed on method exit.
   ```suggestion
               try (InputStream in = new ByteBufInputStream(data, true)) {
                   deframer.decode(in);
               }
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java:
##########
@@ -125,9 +124,9 @@ public static MethodDescriptor findTripleMethodDescriptor(
             ServiceDescriptor serviceDescriptor, String methodName, 
InputStream rawMessage) throws IOException {
         MethodDescriptor methodDescriptor = 
findReflectionMethodDescriptor(serviceDescriptor, methodName);
         if (methodDescriptor == null) {

Review Comment:
   Using mark with Integer.MAX_VALUE as the readlimit parameter may not work 
with all InputStream implementations. The BoundedInputStream created in 
LengthFieldStreamingDecoder overrides mark to ensure sufficient buffer size, 
but this assumption may not hold for all input streams passed to this method. 
Consider checking if mark is supported using markSupported() before calling 
mark, or document that the input stream must support mark/reset.
   ```suggestion
           if (methodDescriptor == null) {
               if (!rawMessage.markSupported()) {
                   throw new IOException("InputStream must support mark/reset 
to resolve overloaded triple methods");
               }
   ```



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }

Review Comment:
   The BoundedInputStream constructor uses the limit as the buffer size for 
BufferedInputStream (line 297). For large messages, this could allocate 
unnecessarily large buffers. Consider using a more reasonable buffer size 
(e.g., Math.min(limit, 8192)) to avoid excessive memory allocation while still 
providing good buffering performance.



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {

Review Comment:
   Method 'read' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to