Copilot commented on code in PR #16041:
URL: https://github.com/apache/dubbo/pull/16041#discussion_r2720073775
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java:
##########
@@ -125,6 +125,7 @@ public static MethodDescriptor findTripleMethodDescriptor(
ServiceDescriptor serviceDescriptor, String methodName,
InputStream rawMessage) throws IOException {
MethodDescriptor methodDescriptor =
findReflectionMethodDescriptor(serviceDescriptor, methodName);
if (methodDescriptor == null) {
Review Comment:
The call to `rawMessage.mark(Integer.MAX_VALUE)` assumes that the InputStream
supports mark/reset and can buffer the whole message. BoundedInputStream (used
in the unified deframer) extends BufferedInputStream and satisfies this, but
other callers may not. Consider adding a check, or documenting the requirement,
so that all callers pass InputStreams that support mark/reset with an adequate
buffer size. If a stream without mark/reset support is passed, `mark()` is
silently a no-op and the subsequent `reset()` on line 144 throws an IOException.
```suggestion
if (methodDescriptor == null) {
if (!rawMessage.markSupported()) {
throw new IOException("InputStream does not support mark/reset, which is required to resolve overloaded triple methods.");
}
```
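To make the failure mode concrete, here is a minimal standalone sketch of the guard. It is not Dubbo code: `MarkResetGuardSketch`, `NoMarkStream`, `peekFirstByte` and `peekOrRejected` are hypothetical names, and the only behavior relied on is that of `java.io.InputStream`, whose default `markSupported()` returns false, whose default `mark()` does nothing, and whose default `reset()` throws an IOException.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetGuardSketch {

    /** Hypothetical stream that inherits InputStream's defaults: no mark/reset support. */
    static final class NoMarkStream extends InputStream {
        private final InputStream delegate;

        NoMarkStream(byte[] data) {
            this.delegate = new ByteArrayInputStream(data);
        }

        @Override
        public int read() throws IOException {
            return delegate.read();
        }
    }

    /** Peeks at the first byte and rewinds; fails fast if mark/reset is unavailable. */
    static int peekFirstByte(InputStream in) throws IOException {
        if (!in.markSupported()) {
            throw new IOException("InputStream does not support mark/reset");
        }
        in.mark(1);
        int first = in.read();
        in.reset();
        return first;
    }

    /** Returns the peeked byte, or -2 when the stream is rejected with an IOException. */
    static int peekOrRejected(InputStream in) {
        try {
            return peekFirstByte(in);
        } catch (IOException e) {
            return -2;
        }
    }

    public static void main(String[] args) {
        // ByteArrayInputStream supports mark/reset, so the peek succeeds.
        System.out.println(peekOrRejected(new ByteArrayInputStream(new byte[] {7, 8})));
        // NoMarkStream does not, so the guard rejects it before any byte is consumed.
        System.out.println(peekOrRejected(new NoMarkStream(new byte[] {7, 8})));
    }
}
```

With the guard in place the unsupported stream is rejected before any byte is consumed, instead of failing later at `reset()` after the data has already been read.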
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int result = super.read();
+ if (result != -1) {
+ remaining--;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
Review Comment:
Method 'read(byte[], int, int)' overrides a synchronized method in
java.io.BufferedInputStream but is not synchronized.
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int result = super.read();
+ if (result != -1) {
+ remaining--;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int toRead = Math.min(len, remaining);
+ int result = super.read(b, off, toRead);
+ if (result > 0) {
+ remaining -= result;
+ }
+ return result;
+ }
+
+ @Override
+ public int available() throws IOException {
Review Comment:
Method 'available' overrides a synchronized method in
java.io.BufferedInputStream but is not synchronized.
```suggestion
public synchronized int available() throws IOException {
```
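The synchronization comments on this class can be addressed together. The following is a rough sketch, not the PR's actual class: `BoundedStream` is a hypothetical stand-in for BoundedInputStream, the `available()`, `mark()` and `reset()` bodies are assumptions because the diff above does not show them, and the buffer-size clamp in the constructor is an addition. It assumes a JDK in which BufferedInputStream declares these methods `synchronized`.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class SynchronizedBoundedStreamSketch {

    /** Hypothetical stand-in for the PR's BoundedInputStream. */
    static class BoundedStream extends BufferedInputStream {
        private int remaining;
        private int markedRemaining;

        BoundedStream(InputStream source, int limit) {
            // BufferedInputStream rejects a buffer size <= 0, so clamp it (an addition here).
            super(source, Math.max(limit, 1));
            this.remaining = limit;
            this.markedRemaining = limit;
        }

        @Override
        public synchronized int read() throws IOException {
            if (remaining <= 0) {
                return -1;
            }
            int result = super.read();
            if (result != -1) {
                remaining--;
            }
            return result;
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            if (remaining <= 0) {
                return -1;
            }
            int result = super.read(b, off, Math.min(len, remaining));
            if (result > 0) {
                remaining -= result;
            }
            return result;
        }

        @Override
        public synchronized int available() throws IOException {
            // Assumed body: never report more than the bound allows.
            return Math.min(super.available(), remaining);
        }

        @Override
        public synchronized void mark(int readlimit) {
            super.mark(readlimit);
            markedRemaining = remaining;
        }

        @Override
        public synchronized void reset() throws IOException {
            super.reset();
            remaining = markedRemaining;
        }
    }

    /** Marks, reads one byte, resets, then drains; returns {firstByte, bytesDrained}. */
    static int[] markReadReset(byte[] data, int limit) {
        try (BoundedStream in = new BoundedStream(new ByteArrayInputStream(data), limit)) {
            in.mark(limit);
            int first = in.read();
            in.reset();
            int drained = 0;
            while (in.read() != -1) {
                drained++;
            }
            return new int[] {first, drained};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = markReadReset(new byte[] {1, 2, 3, 4, 5}, 3);
        System.out.println(r[0] + " " + r[1]);
    }
}
```

Keeping `remaining` and `markedRemaining` under the same monitor as the superclass buffer state means a concurrent `reset()` cannot interleave with a `read()` and leave the counter out of step with the buffer position.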
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -583,7 +589,9 @@ private void doOnData(ByteBuf data, boolean endStream) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not received before payload"));
Review Comment:
When headers have not been received and the method returns early, the ByteBuf
'data' is not released, which can leak memory. Line 594 wraps it in a
ByteBufInputStream with auto-release (second parameter 'true'), but that line
is never reached when returning early at line 590. Add
ReferenceCountUtil.release(data) before returning at line 590, similar to the
error handling at line 582.
```suggestion
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not received before payload"));
ReferenceCountUtil.release(data);
```
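The ownership rule behind this suggestion can be illustrated without Netty on the classpath. In the sketch below, `CountedBuffer` is a hypothetical stand-in for a reference-counted buffer such as ByteBuf, and `onData` and `consume` are invented names rather than the methods in AbstractTripleClientStream. It only shows that every exit path either hands the buffer to a consumer that releases it or releases it directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseOnEarlyReturnSketch {

    /** Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf. */
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        void release() {
            if (refCnt.decrementAndGet() < 0) {
                throw new IllegalStateException("buffer released more than once");
            }
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Hypothetical consumer that owns the buffer and releases it when done. */
    static void consume(CountedBuffer data) {
        try {
            // ... read the payload here ...
        } finally {
            data.release();
        }
    }

    /** Returns false on the error path; the buffer is released on both paths. */
    static boolean onData(CountedBuffer data, boolean headersReceived) {
        if (!headersReceived) {
            // Early return: no consumer will ever see the buffer, so release it here.
            data.release();
            return false;
        }
        consume(data);
        return true;
    }

    /** Runs onData once and reports the buffer's reference count afterwards. */
    static int refCountAfter(boolean headersReceived) {
        CountedBuffer buffer = new CountedBuffer();
        onData(buffer, headersReceived);
        return buffer.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(refCountAfter(false));
        System.out.println(refCountAfter(true));
    }
}
```

A reference count of zero after both calls is the property a leak-detection test for the real handler would need to check.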
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
Review Comment:
Method 'read()' overrides a synchronized method in
java.io.BufferedInputStream but is not synchronized.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcStreamingDecoder.java:
##########
@@ -50,12 +50,13 @@ protected void processOffset(InputStream inputStream, int
lengthFieldOffset) thr
}
@Override
- protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException {
- byte[] rawMessage = super.readRawMessage(inputStream, length);
- return compressedFlag ? deCompressedMessage(rawMessage) : rawMessage;
- }
-
- private byte[] deCompressedMessage(byte[] rawMessage) {
- return deCompressor.decompress(rawMessage);
+ protected MessageStream readMessageStream(InputStream inputStream, int length) throws IOException {
+ if (compressedFlag) {
+ // For compressed messages, we need to read bytes first, then decompress
+ byte[] rawMessage = readRawMessage(inputStream, length);
+ byte[] decompressed = deCompressor.decompress(rawMessage);
+ return new MessageStream(new java.io.ByteArrayInputStream(decompressed), decompressed.length);
+ }
+ return super.readMessageStream(inputStream, length);
}
}
Review Comment:
The refactoring to use GrpcStreamingDecoder (unified deframer) removes the
Triple-specific TriDecoder tests without adding equivalent test coverage. While
GrpcStreamingDecoder is tested elsewhere, consider adding tests that verify the
Triple protocol's specific usage patterns, including: compressed/uncompressed
message handling, proper flow control through bytesRead callbacks, and correct
integration with the DeCompressor component.
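As a starting point for the compressed/uncompressed cases, a round-trip check could look roughly like the sketch below. It does not use Dubbo's GrpcStreamingDecoder or DeCompressor: `encodeFrame`, `decodeFrame` and `roundTrip` are hypothetical helpers, and gzip from `java.util.zip` is assumed as the compressor purely for illustration. The only protocol fact relied on is the gRPC length-prefixed message layout: a 1-byte compressed flag, a 4-byte big-endian length, then the payload. Flow control through bytesRead callbacks is not covered.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FrameRoundTripSketch {

    /** Builds one frame: 1-byte compressed flag, 4-byte big-endian length, payload. */
    static byte[] encodeFrame(byte[] message, boolean compress) throws IOException {
        byte[] payload = message;
        if (compress) {
            ByteArrayOutputStream gz = new ByteArrayOutputStream();
            try (GZIPOutputStream out = new GZIPOutputStream(gz)) {
                out.write(message);
            }
            payload = gz.toByteArray();
        }
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(frame);
        data.writeByte(compress ? 1 : 0);
        data.writeInt(payload.length);
        data.write(payload);
        return frame.toByteArray();
    }

    /** Reads one frame and returns the message, decompressing when the flag is set. */
    static byte[] decodeFrame(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        boolean compressed = in.readUnsignedByte() == 1;
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        if (!compressed) {
            return payload;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(payload))) {
            byte[] chunk = new byte[256];
            int n;
            while ((n = gz.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        }
        return out.toByteArray();
    }

    /** Encodes and decodes a string, wrapping IOExceptions for easy use in tests. */
    static String roundTrip(String text, boolean compress) {
        try {
            byte[] frame = encodeFrame(text.getBytes(StandardCharsets.UTF_8), compress);
            return new String(decodeFrame(frame), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello triple", false));
        System.out.println(roundTrip("hello triple", true));
    }
}
```

A real test would feed such frames through the decoder under review, ideally split across several input chunks, and assert on the delivered messages.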
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]