Copilot commented on code in PR #16072:
URL: https://github.com/apache/dubbo/pull/16072#discussion_r2767687490


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.command;
 
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 
 public class DataQueueCommand extends StreamQueueCommand {
 
-    private final byte[] data;
+    private final InputStream dataStream;
 
-    private final int compressFlag;
+    private final Compressor compressor;
 
     private final boolean endStream;
 
     private DataQueueCommand(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, int compressFlag, boolean endStream) {
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            Compressor compressor,
+            boolean endStream) {
         super(streamChannelFuture);
-        this.data = data;
-        this.compressFlag = compressFlag;
+        this.dataStream = dataStream;
+        this.compressor = compressor;
         this.endStream = endStream;
     }
 
     public static DataQueueCommand create(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, boolean endStream, int compressFlag) {
-        return new DataQueueCommand(streamChannelFuture, data, compressFlag, endStream);
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            boolean endStream,
+            Compressor compressor) {
+        return new DataQueueCommand(streamChannelFuture, dataStream, compressor, endStream);
     }
 
+    /**
+     * Send data frame to the channel.
+     *
+     * <p>gRPC message frame format:
+     * <pre>
+     * +----------------------+
+     * | Compressed-Flag (1B) |  0 = uncompressed, 1 = compressed
+     * +----------------------+
+     * | Message-Length  (4B) |  big-endian unsigned integer
+     * +----------------------+
+     * | Message Data    (N)  |  compressed or uncompressed payload
+     * +----------------------+
+     * </pre>
+     */
     @Override
     public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
-        if (data == null) {
+        if (dataStream == null) {
             ctx.write(new DefaultHttp2DataFrame(endStream), promise);
         } else {
             ByteBuf buf = ctx.alloc().buffer();
+            // Write compression flag (1 byte): 0 for identity, 1 for compressed
+            int compressFlag = Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
             buf.writeByte(compressFlag);
-            buf.writeInt(data.length);
-            buf.writeBytes(data);
+            // Record position for length field, write placeholder (4 bytes)
+            int lengthIndex = buf.writerIndex();
+            buf.writeInt(0);
+            try {
+                // Compress and write data directly into ByteBuf using decorator pattern
+                ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+                OutputStream compressedOut = compressor.decorate(bbos);
+                StreamUtils.copy(dataStream, compressedOut);
+                compressedOut.close();

Review Comment:
   The dataStream passed to this method is not closed after being read. This 
can lead to resource leaks, especially for file-based InputStreams or other 
InputStreams that hold system resources. Consider wrapping the stream 
operations in a try-with-resources block or explicitly closing the dataStream 
in the finally block after copying is complete.
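   A hedged sketch of the try-with-resources shape this comment (and the suggestion further down) points at, as it could look inside doSend. It assumes closing the caller-supplied dataStream here is acceptable (the current caller hands in a ByteArrayInputStream, per the flow-control comment below) and that the compressor's decorated stream flushes on close:
   ```java
   try (InputStream in = dataStream;
           ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
           OutputStream compressedOut = compressor.decorate(bbos)) {
       // Resources close in reverse declaration order: compressedOut first, so any
       // bytes the compressor buffered are flushed into buf before the length is patched.
       StreamUtils.copy(in, compressedOut);
   }
   // The length back-patch (buf.setInt(lengthIndex, ...)) stays after this block.
   ```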



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -198,16 +199,23 @@ public SSLSession getSslSession() {
     }
 
     @Override
-    public ChannelFuture sendMessage(byte[] message, int compressFlag) {
+    public ChannelFuture sendMessage(InputStream message, Compressor compressor) {
         ChannelFuture checkResult = preCheck();
         if (!checkResult.isSuccess()) {
             return checkResult;
         }
 
-        final int messageSize = message.length;
+        // Estimate message size for flow control
+        int estimatedSize;
+        try {
+            estimatedSize = message.available();
+        } catch (IOException e) {
+            estimatedSize = 0;
+        }

Review Comment:
   The flow control size estimation using message.available() is unreliable for 
general InputStream types. The available() method only returns an estimate of 
bytes that can be read without blocking, not the total stream size. While this 
works correctly for ByteArrayInputStream (which is what's currently used in 
TripleClientCall), it could cause incorrect flow control accounting if other 
InputStream types are passed to sendMessage in the future. Consider documenting 
this limitation in the method's javadoc or adding validation to ensure only 
ByteArrayInputStream is accepted.
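   For illustration only, a hedged standalone sketch of making that assumption explicit; the class and method names are hypothetical and not part of the PR:
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;

   final class FlowControlSizing {
       /**
        * Estimates payload size for HTTP/2 flow-control accounting.
        * Exact only for in-memory streams such as ByteArrayInputStream; for
        * other InputStream types available() may report 0 or a partial count,
        * so the result should be treated as a lower bound.
        */
       static int estimateSize(InputStream message) {
           if (!(message instanceof ByteArrayInputStream)) {
               return 0; // conservative: total size unknown for general streams
           }
           try {
               return message.available(); // exact remaining bytes for byte[] streams
           } catch (IOException e) {
               return 0;
           }
       }
   }
   ```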



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.command;
 
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 
 public class DataQueueCommand extends StreamQueueCommand {
 
-    private final byte[] data;
+    private final InputStream dataStream;
 
-    private final int compressFlag;
+    private final Compressor compressor;
 
     private final boolean endStream;
 
     private DataQueueCommand(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, int compressFlag, boolean endStream) {
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            Compressor compressor,
+            boolean endStream) {
         super(streamChannelFuture);
-        this.data = data;
-        this.compressFlag = compressFlag;
+        this.dataStream = dataStream;
+        this.compressor = compressor;
         this.endStream = endStream;
     }
 
     public static DataQueueCommand create(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, boolean endStream, int compressFlag) {
-        return new DataQueueCommand(streamChannelFuture, data, compressFlag, endStream);
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            boolean endStream,
+            Compressor compressor) {
+        return new DataQueueCommand(streamChannelFuture, dataStream, compressor, endStream);
     }
 
+    /**
+     * Send data frame to the channel.
+     *
+     * <p>gRPC message frame format:
+     * <pre>
+     * +----------------------+
+     * | Compressed-Flag (1B) |  0 = uncompressed, 1 = compressed
+     * +----------------------+
+     * | Message-Length  (4B) |  big-endian unsigned integer
+     * +----------------------+
+     * | Message Data    (N)  |  compressed or uncompressed payload
+     * +----------------------+
+     * </pre>
+     */
     @Override
     public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
-        if (data == null) {
+        if (dataStream == null) {
             ctx.write(new DefaultHttp2DataFrame(endStream), promise);
         } else {
             ByteBuf buf = ctx.alloc().buffer();
+            // Write compression flag (1 byte): 0 for identity, 1 for compressed
+            int compressFlag = Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
             buf.writeByte(compressFlag);
-            buf.writeInt(data.length);
-            buf.writeBytes(data);
+            // Record position for length field, write placeholder (4 bytes)
+            int lengthIndex = buf.writerIndex();
+            buf.writeInt(0);
+            try {
+                // Compress and write data directly into ByteBuf using decorator pattern
+                ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+                OutputStream compressedOut = compressor.decorate(bbos);
+                StreamUtils.copy(dataStream, compressedOut);
+                compressedOut.close();
+                // Calculate actual message length: total written bytes minus the 4-byte length field itself
+                int written = buf.writerIndex() - lengthIndex - 4;
+                buf.setInt(lengthIndex, written);
+            } catch (Exception e) {
+                buf.release();
+                promise.setFailure(e);
+                return;
+            }

Review Comment:
   When an exception occurs during compression/copying, the ByteBuf is released 
but the dataStream is not closed. This can lead to resource leaks. The 
dataStream should also be closed in the catch block to ensure proper cleanup of 
any system resources it may hold.
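   A hedged fragment of how doSend could release the stream on every exit path with a finally block (plus a java.io.IOException import); it assumes this command owns the caller-supplied stream:
   ```java
   try {
       ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
       OutputStream compressedOut = compressor.decorate(bbos);
       StreamUtils.copy(dataStream, compressedOut);
       compressedOut.close();
       int written = buf.writerIndex() - lengthIndex - 4;
       buf.setInt(lengthIndex, written);
   } catch (Exception e) {
       buf.release();
       promise.setFailure(e);
       return;
   } finally {
       try {
           dataStream.close(); // runs on success and failure alike
       } catch (IOException ignored) {
           // best-effort close; the frame's outcome is already decided above
       }
   }
   ```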



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java:
##########
@@ -73,12 +75,14 @@ default void onReady() {}
     void initStream();
 
     /**
-     * Send message to remote peer.
+     * Send message to remote peer with zero-copy compression.
+     * The compressor decorates the output stream to compress data directly into ByteBuf.
      *
-     * @param message message to send to remote peer
-     * @return future to callback when send message is done
+     * @param message    raw message stream (uncompressed)
+     * @param compressor compressor to use for compression
+     * @return future to callback when send is done
      */
-    Future<?> sendMessage(byte[] message, int compressFlag);
+    Future<?> sendMessage(InputStream message, Compressor compressor);

Review Comment:
   The sendMessage method signature has been changed from sendMessage(byte[], 
int) to sendMessage(InputStream, Compressor), which is a breaking API change. 
If ClientStream is considered a public or stable API, the old method should be 
deprecated first rather than removed directly, or this should be clearly 
documented as a breaking change in the PR description. If this is an 
internal-only API, consider adding package-private visibility or an @Internal 
annotation to make this clear.
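   If a deprecation cycle is wanted, a hedged sketch of a bridge default method on the interface; Identity.IDENTITY is an assumption about how the module exposes its no-op compressor and may need adjusting:
   ```java
   /**
    * Migration bridge for callers of the pre-change signature.
    *
    * @deprecated use {@link #sendMessage(InputStream, Compressor)} instead
    */
   @Deprecated
   default Future<?> sendMessage(byte[] message, int compressFlag) {
       // compressFlag alone cannot recover the negotiated compressor, so this
       // bridge sends the payload uncompressed; Identity.IDENTITY is assumed
       // to be the module's no-op compressor instance.
       return sendMessage(new java.io.ByteArrayInputStream(message), Identity.IDENTITY);
   }
   ```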



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.command;
 
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 
 public class DataQueueCommand extends StreamQueueCommand {
 
-    private final byte[] data;
+    private final InputStream dataStream;
 
-    private final int compressFlag;
+    private final Compressor compressor;
 
     private final boolean endStream;
 
     private DataQueueCommand(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, int compressFlag, boolean endStream) {
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            Compressor compressor,
+            boolean endStream) {
         super(streamChannelFuture);
-        this.data = data;
-        this.compressFlag = compressFlag;
+        this.dataStream = dataStream;
+        this.compressor = compressor;
         this.endStream = endStream;
     }
 
     public static DataQueueCommand create(
-            TripleStreamChannelFuture streamChannelFuture, byte[] data, boolean endStream, int compressFlag) {
-        return new DataQueueCommand(streamChannelFuture, data, compressFlag, endStream);
+            TripleStreamChannelFuture streamChannelFuture,
+            InputStream dataStream,
+            boolean endStream,
+            Compressor compressor) {
+        return new DataQueueCommand(streamChannelFuture, dataStream, compressor, endStream);
     }
 
+    /**
+     * Send data frame to the channel.
+     *
+     * <p>gRPC message frame format:
+     * <pre>
+     * +----------------------+
+     * | Compressed-Flag (1B) |  0 = uncompressed, 1 = compressed
+     * +----------------------+
+     * | Message-Length  (4B) |  big-endian unsigned integer
+     * +----------------------+
+     * | Message Data    (N)  |  compressed or uncompressed payload
+     * +----------------------+
+     * </pre>
+     */
     @Override
     public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
-        if (data == null) {
+        if (dataStream == null) {
             ctx.write(new DefaultHttp2DataFrame(endStream), promise);
         } else {
             ByteBuf buf = ctx.alloc().buffer();
+            // Write compression flag (1 byte): 0 for identity, 1 for compressed
+            int compressFlag = Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
             buf.writeByte(compressFlag);
-            buf.writeInt(data.length);
-            buf.writeBytes(data);
+            // Record position for length field, write placeholder (4 bytes)
+            int lengthIndex = buf.writerIndex();
+            buf.writeInt(0);
+            try {
+                // Compress and write data directly into ByteBuf using decorator pattern
+                ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+                OutputStream compressedOut = compressor.decorate(bbos);
+                StreamUtils.copy(dataStream, compressedOut);
+                compressedOut.close();

Review Comment:
   This ByteBufOutputStream is not always closed on method exit.
   ```suggestion
                   try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
                        OutputStream compressedOut = compressor.decorate(bbos)) {
                       StreamUtils.copy(dataStream, compressedOut);
                   }
   ```
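   A brief note on scope: the suggested try-with-resources closes the two output streams, but the input dataStream raised in the earlier comments on this file still needs its own close, and the length back-patch (buf.setInt(lengthIndex, ...)) must remain after the block so close() has flushed any buffered compressed bytes first.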



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


