Copilot commented on code in PR #16072:
URL: https://github.com/apache/dubbo/pull/16072#discussion_r2768045297


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Snappy.java:
##########
@@ -49,7 +50,8 @@ public byte[] compress(byte[] payloadByteArr) throws RpcException {
 
     @Override
     public OutputStream decorate(OutputStream outputStream) {
-        return outputStream;
+        // Snappy wraps output with SnappyOutputStream for streaming compression
+        return new org.xerial.snappy.SnappyOutputStream(outputStream);

Review Comment:
   Gzip and Bzip2 wrap IOExceptions from their stream constructors in 
IllegalStateException (see lines 70-74 in Gzip.java and lines 76-80 in 
Bzip2.java), but the Snappy implementation doesn't follow this pattern. If the 
SnappyOutputStream constructor declares IOException, this code will not compile 
as written. In that case, wrap the construction in a try-catch block and throw 
IllegalStateException, consistent with the other compressor implementations.
   ```suggestion
           try {
               return new org.xerial.snappy.SnappyOutputStream(outputStream);
           } catch (IOException e) {
               throw new IllegalStateException(e);
           }
   ```



##########
dubbo-common/src/main/java/org/apache/dubbo/common/io/UnsafeByteArrayInputStream.java:
##########
@@ -114,4 +115,15 @@ public void position(int newPosition) {
     public int size() {
         return mData == null ? 0 : mData.length;
     }
+
+    /**
+     * Write remaining data directly to output stream without copying through intermediate buffer.
+     *
+     * @param out the output stream to write to
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(mData, mPosition, mLimit - mPosition);
+        mPosition = mLimit;
+    }

Review Comment:
   The new writeTo(OutputStream) method in UnsafeByteArrayInputStream is not 
tested. While integration tests may exercise this path through 
StreamUtils.copy, consider adding a unit test to verify that: 1) data is 
written correctly from mPosition to mLimit, 2) mPosition is updated to mLimit 
after the write, and 3) the zero-copy behavior works correctly (writing 
directly from the internal buffer).
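
   A minimal sketch of the three checks listed above. It is not the Dubbo 
test: `PositionedBuffer` and `RecordingOutputStream` are hypothetical stand-ins 
that copy only the fields and the `writeTo` body shown in the diff, so the 
example runs without Dubbo on the classpath. A real test would construct 
UnsafeByteArrayInputStream instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class WriteToSketch {

    // Hypothetical stand-in: mirrors only mData, mPosition, mLimit and writeTo from the diff.
    public static final class PositionedBuffer {
        final byte[] mData;
        int mPosition;
        final int mLimit;

        PositionedBuffer(byte[] data, int offset, int length) {
            this.mData = data;
            this.mPosition = offset;
            this.mLimit = Math.min(offset + length, data.length);
        }

        void writeTo(OutputStream out) throws IOException {
            out.write(mData, mPosition, mLimit - mPosition);
            mPosition = mLimit;
        }
    }

    // Remembers which array instance was handed to write(byte[], int, int).
    public static final class RecordingOutputStream extends ByteArrayOutputStream {
        byte[] lastSource;

        @Override
        public synchronized void write(byte[] b, int off, int len) {
            lastSource = b;
            super.write(b, off, len);
        }
    }

    // Returns true when all three properties from the review comment hold.
    public static boolean check() {
        byte[] data = {1, 2, 3, 4, 5};
        PositionedBuffer in = new PositionedBuffer(data, 1, 3);
        RecordingOutputStream out = new RecordingOutputStream();
        try {
            in.writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        boolean rangeCorrect = Arrays.equals(new byte[] {2, 3, 4}, out.toByteArray());
        boolean positionAdvanced = in.mPosition == in.mLimit;
        boolean sameArrayPassed = out.lastSource == data; // no intermediate copy
        return rangeCorrect && positionAdvanced && sameArrayPassed;
    }

    public static void main(String[] args) {
        System.out.println("writeTo checks passed: " + check());
    }
}
```

   The identity comparison on `lastSource` is one way to assert the zero-copy 
property: it fails if the implementation ever copies into a temporary array 
before writing.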



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/Stream.java:
##########
@@ -43,7 +43,9 @@ interface Listener {
          * streaming .
          *
          * @param message message received from remote peer as InputStream
-         * @param messageLength the length of the message in bytes
+         * @param messageLength the length of the message in bytes.
+         *                      For compressed messages, this may be -1 to indicate unknown length
+         *                      since decompressed size cannot be known until all bytes are read.

Review Comment:
   The documentation states "For compressed messages, this may be -1 to 
indicate unknown length since decompressed size cannot be known until all bytes 
are read." However, this is misleading. The -1 value is used when streaming 
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but 
the actual decompressed size CAN be known for some compression formats (like 
gzip which stores original size in the trailer). The comment should clarify 
that -1 is used when employing streaming decompression to avoid buffering the 
entire decompressed message in memory, not because the size is fundamentally 
unknowable.
   ```suggestion
            *                      For compressed messages processed via streaming decompression,
            *                      this may be -1 to indicate that the total decompressed length is
            *                      not computed up front to avoid buffering the entire message in memory.
   ```
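
   To illustrate the gzip point, here is a small sketch using only the JDK 
(`java.util.zip`), not Dubbo code. The class and method names are hypothetical. 
It reads the last four bytes of a single-member gzip stream, which hold the 
uncompressed size as a little-endian value modulo 2^32. Because that field sits 
at the very end, a streaming reader only sees it after consuming the compressed 
payload, which fits with reporting -1 up front.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.GZIPOutputStream;

public class GzipTrailerSketch {

    // Compresses the payload into a single-member gzip stream held in memory.
    public static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reads the size field from the gzip trailer: the final 4 bytes, little-endian,
    // holding the uncompressed length modulo 2^32.
    public static long trailerSize(byte[] gzipBytes) {
        int isize = ByteBuffer.wrap(gzipBytes, gzipBytes.length - 4, 4)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt();
        return isize & 0xFFFFFFFFL; // treat the field as unsigned
    }

    public static void main(String[] args) {
        byte[] raw = new byte[1000];
        byte[] compressed = gzip(raw);
        System.out.println("raw length     = " + raw.length);
        System.out.println("trailer length = " + trailerSize(compressed));
    }
}
```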



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java:
##########
@@ -45,7 +45,9 @@ interface FragmentListener {
          * Called when a complete message fragment is received.
          *
          * @param rawMessage raw message as InputStream
-         * @param messageLength the length of the message payload in bytes
+         * @param messageLength the length of the message payload in bytes.
+         *                      For compressed messages, this may be -1 to indicate unknown length
+         *                      since decompressed size cannot be known until all bytes are read.

Review Comment:
   The documentation states "For compressed messages, this may be -1 to 
indicate unknown length since decompressed size cannot be known until all bytes 
are read." However, this is misleading. The -1 value is used when streaming 
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but 
the actual decompressed size CAN be known for some compression formats (like 
gzip which stores original size in the trailer). The comment should clarify 
that -1 is used when employing streaming decompression to avoid buffering the 
entire decompressed message in memory, not because the size is fundamentally 
unknowable.
   ```suggestion
            *                      For compressed messages using streaming decompression, this may be -1
            *                      to indicate that the decompressed size is not precomputed in order to
            *                      avoid buffering the entire message in memory.
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Snappy.java:
##########
@@ -64,4 +66,9 @@ public byte[] decompress(byte[] payloadByteArr) {
             throw new IllegalStateException(e);
         }
     }
+
+    @Override
+    public InputStream decompress(InputStream inputStream) throws IOException {
+        return new org.xerial.snappy.SnappyInputStream(inputStream);
+    }

Review Comment:
   The new streaming compression methods (decorate and decompress(InputStream)) 
are not directly unit tested. While the existing compressor tests exercise the 
deprecated byte[] methods and the integration tests (WriteQueueTest, 
TripleClientStreamTest) exercise the new paths, consider adding explicit unit 
tests for the streaming methods to ensure they properly handle edge cases like 
empty streams, IOException propagation, and proper resource cleanup. This is 
especially important since the deprecated methods will eventually be removed.
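
   A sketch of what such edge-case tests might look like. To keep it runnable 
without the Snappy or Dubbo jars, it uses the JDK's GZIP streams as a stand-in 
codec. `StreamCodec` is a hypothetical interface that mirrors only the two 
method shapes named above; it is not the Dubbo Compressor/DeCompressor API. 
The same round-trip and rejection helpers could then be pointed at the real 
Snappy implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StreamingCodecSketch {

    // Hypothetical interface: only the two streaming method shapes from the review.
    public interface StreamCodec {
        OutputStream decorate(OutputStream out);

        InputStream decompress(InputStream in) throws IOException;
    }

    // GZIP stand-in; wraps constructor IOExceptions in IllegalStateException.
    public static final StreamCodec GZIP = new StreamCodec() {
        @Override
        public OutputStream decorate(OutputStream out) {
            try {
                return new GZIPOutputStream(out);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        @Override
        public InputStream decompress(InputStream in) throws IOException {
            return new GZIPInputStream(in);
        }
    };

    // Compresses through decorate(), then reads back through decompress().
    public static byte[] roundTrip(StreamCodec codec, byte[] payload) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (OutputStream out = codec.decorate(sink)) {
                out.write(payload);
            } // closing flushes the codec's trailer into sink
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            try (InputStream in = codec.decompress(new ByteArrayInputStream(sink.toByteArray()))) {
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    result.write(buf, 0, n);
                }
            }
            return result.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if decoding the given bytes surfaces an IOException.
    public static boolean rejects(StreamCodec codec, byte[] corrupt) {
        try (InputStream in = codec.decompress(new ByteArrayInputStream(corrupt))) {
            while (in.read() != -1) {
                // drain until end of stream or failure
            }
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("empty round trip length: " + roundTrip(GZIP, new byte[0]).length);
        System.out.println("corrupt input rejected: " + rejects(GZIP, new byte[] {1, 2, 3, 4}));
    }
}
```

   Using try-with-resources on both sides also exercises the cleanup path: the 
compressed bytes are only complete once the decorated stream is closed.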



##########
dubbo-common/src/main/java/org/apache/dubbo/common/io/UnsafeByteArrayOutputStream.java:
##########
@@ -86,6 +86,15 @@ public void writeTo(OutputStream out) throws IOException {
         out.write(mBuffer, 0, mCount);
     }
 
+    /**
+     * Returns an InputStream that reads from the internal buffer without copying.
+     *
+     * @return an InputStream wrapping the internal buffer
+     */
+    public UnsafeByteArrayInputStream toInputStream() {
+        return new UnsafeByteArrayInputStream(mBuffer, 0, mCount);
+    }

Review Comment:
   The new toInputStream() method in UnsafeByteArrayOutputStream is not tested. 
While integration tests may exercise this path, consider adding a unit test to 
verify that the returned InputStream correctly wraps the internal buffer 
without copying, properly reflects the current size (mCount), and can be read 
from the correct position.
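
   One possible shape for that test, sketched with JDK classes only. 
`SharedBufferOutputStream` is a hypothetical stand-in built on 
`ByteArrayOutputStream` and `ByteArrayInputStream` (whose constructor does not 
copy the array); it is not UnsafeByteArrayOutputStream. The idea is to mutate 
the internal buffer after creating the stream and observe the change through 
the stream, which only happens when no copy was made.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class ToInputStreamSketch {

    // Hypothetical stand-in: exposes the inherited buf/count the way the diff uses mBuffer/mCount.
    public static final class SharedBufferOutputStream extends ByteArrayOutputStream {

        // Wraps the internal buffer directly; ByteArrayInputStream does not copy it.
        public synchronized ByteArrayInputStream toInputStream() {
            return new ByteArrayInputStream(buf, 0, count);
        }

        // Test hook: overwrite a byte in the internal buffer.
        public synchronized void overwrite(int index, byte value) {
            buf[index] = value;
        }
    }

    // Returns true when the stream reflects the written size and shares the buffer.
    public static boolean check() {
        SharedBufferOutputStream out = new SharedBufferOutputStream();
        out.write(10);
        out.write(20);
        out.write(30);

        ByteArrayInputStream in = out.toInputStream();
        boolean sizeMatches = in.available() == 3;

        out.overwrite(0, (byte) 99); // visible through the stream only if the buffer is shared
        boolean sharesBuffer = in.read() == 99;
        boolean continuesInOrder = in.read() == 20 && in.read() == 30 && in.read() == -1;

        return sizeMatches && sharesBuffer && continuesInOrder;
    }

    public static void main(String[] args) {
        System.out.println("toInputStream checks passed: " + check());
    }
}
```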



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -49,7 +49,9 @@ interface Listener {
          * Callback when message received.
          *
          * @param message             message received
-         * @param actualContentLength actual content length from body
+         * @param actualContentLength actual content length from body.
+         *                            For compressed messages, this may be -1 to indicate unknown length
+         *                            since decompressed size cannot be known until all bytes are read.

Review Comment:
   The documentation states "For compressed messages, this may be -1 to 
indicate unknown length since decompressed size cannot be known until all bytes 
are read." However, this is misleading. The -1 value is used when streaming 
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but 
the actual decompressed size CAN be known for some compression formats (like 
gzip which stores original size in the trailer). The comment should clarify 
that -1 is used when employing streaming decompression to avoid buffering the 
entire decompressed message in memory, not because the size is fundamentally 
unknowable.
   ```suggestion
            *                            For compressed messages, this may be -1 when streaming
            *                            decompression is used and the implementation avoids buffering
            *                            the entire decompressed message in memory to determine its size.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

