Copilot commented on code in PR #16045:
URL: https://github.com/apache/dubbo/pull/16045#discussion_r2724068522
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -168,6 +201,54 @@ public byte[] toByteArray() {
return byteBuffer.array();
}
+ public static TripleResponseWrapper parseFrom(InputStream inputStream)
throws IOException {
+ TripleResponseWrapper tripleResponseWrapper = new
TripleResponseWrapper();
+ int b;
+ while ((b = inputStream.read()) != -1) {
+ int tag = b;
+ if ((b & 0x80) != 0) {
+ int shift = 7;
+ tag = b & 0x7F;
+ while (shift < 35) {
+ b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream
while reading tag");
+ }
+ tag |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+ }
+
Review Comment:
The varint parsing logic for tags (lines 208-223) is duplicated and slightly
different from the readRawVarint32 method. Consider using readRawVarint32 for
both tag and length parsing to reduce duplication and ensure consistency. The
current implementation reads the first byte outside the loop and then continues
if needed, which could be simplified.
```suggestion
final int firstByte = b;
InputStream tagInputStream = new InputStream() {
private boolean first = true;
@Override
public int read() throws IOException {
if (first) {
first = false;
return firstByte;
}
return inputStream.read();
}
@Override
public int read(byte[] buffer, int off, int len) throws
IOException {
if (buffer == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > buffer.length - off)
{
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
if (first) {
first = false;
buffer[off] = (byte) firstByte;
return 1;
}
return inputStream.read(buffer, off, len);
}
};
int tag = readRawVarint32(tagInputStream);
```
##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java:
##########
@@ -40,8 +43,18 @@ public void onComplete(TriRpcStatus status, Map<String,
Object> attachments) {
public void onClose() {}
@Override
- public void onMessage(byte[] message, boolean isNeedReturnException) {
- this.message = message;
+ public void onMessage(InputStream message, int messageLength, boolean
isNeedReturnException) {
+ try {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ byte[] tmp = new byte[4096];
+ int len;
+ while ((len = message.read(tmp)) != -1) {
+ buffer.write(tmp, 0, len);
+ }
+ this.message = buffer.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
Review Comment:
The InputStream passed to onMessage is not explicitly closed in the test
implementation. While BoundedInputStream will return -1 after reading the
expected bytes, the stream should be closed to properly release resources,
especially when ByteBufInputStream is used with releaseOnClose=true. Consider
adding a try-with-resources or explicit close() call, or document that the
caller is responsible for closing the stream.
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source
stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int result = super.read();
+ if (result != -1) {
+ remaining--;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
Review Comment:
Method 'read' overrides a synchronized method in
[java.io.BufferedInputStream](1) but is not synchronized.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -168,6 +201,54 @@ public byte[] toByteArray() {
return byteBuffer.array();
}
+ public static TripleResponseWrapper parseFrom(InputStream inputStream)
throws IOException {
+ TripleResponseWrapper tripleResponseWrapper = new
TripleResponseWrapper();
+ int b;
+ while ((b = inputStream.read()) != -1) {
+ int tag = b;
+ if ((b & 0x80) != 0) {
+ int shift = 7;
+ tag = b & 0x7F;
+ while (shift < 35) {
+ b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream
while reading tag");
+ }
+ tag |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+ }
+
+ int fieldNum = extractFieldNumFromTag(tag);
+ int wireType = extractWireTypeFromTag(tag);
+ if (wireType != 2) {
+ throw new RuntimeException(
+ String.format("unexpected wireType, expect %d
realType %d", 2, wireType));
+ }
+
+ int length = readRawVarint32(inputStream);
+ byte[] fieldData = readExactly(inputStream, length);
+
+ if (fieldNum == 1) {
+ tripleResponseWrapper.serializeType = new
String(fieldData);
+ } else if (fieldNum == 2) {
+ tripleResponseWrapper.data = fieldData;
+ } else if (fieldNum == 3) {
+ tripleResponseWrapper.type = new String(fieldData);
Review Comment:
String construction should specify charset explicitly. The current code uses
the default platform charset (via new String(byte[])) which may differ across
systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match
the encoding used in toByteArray().
```suggestion
tripleResponseWrapper.serializeType = new
String(fieldData, StandardCharsets.UTF_8);
} else if (fieldNum == 2) {
tripleResponseWrapper.data = fieldData;
} else if (fieldNum == 3) {
tripleResponseWrapper.type = new String(fieldData,
StandardCharsets.UTF_8);
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java:
##########
@@ -317,6 +398,56 @@ public byte[] toByteArray() {
return byteBuffer.array();
}
+ public static TripleRequestWrapper parseFrom(InputStream inputStream)
throws IOException {
+ TripleRequestWrapper tripleRequestWrapper = new
TripleRequestWrapper();
+ tripleRequestWrapper.args = new ArrayList<>();
+ tripleRequestWrapper.argTypes = new ArrayList<>();
+ int b;
+ while ((b = inputStream.read()) != -1) {
+ int tag = b;
+ if ((b & 0x80) != 0) {
+ int shift = 7;
+ tag = b & 0x7F;
+ while (shift < 35) {
+ b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream
while reading tag");
+ }
+ tag |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+ }
+
+ int fieldNum = extractFieldNumFromTag(tag);
+ int wireType = extractWireTypeFromTag(tag);
+ if (wireType != 2) {
+ throw new RuntimeException(
+ String.format("unexpected wireType, expect %d
realType %d", 2, wireType));
+ }
+
+ int length = readRawVarint32(inputStream);
+ byte[] fieldData = readExactly(inputStream, length);
+
+ if (fieldNum == 1) {
+ tripleRequestWrapper.serializeType = new String(fieldData);
+ } else if (fieldNum == 2) {
+ tripleRequestWrapper.args.add(fieldData);
+ } else if (fieldNum == 3) {
+ tripleRequestWrapper.argTypes.add(new String(fieldData));
Review Comment:
String construction should specify charset explicitly. The current code uses
the default platform charset (via new String(byte[])) which may differ across
systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match
the encoding used in toByteArray().
```suggestion
tripleRequestWrapper.serializeType = new
String(fieldData, StandardCharsets.UTF_8);
} else if (fieldNum == 2) {
tripleRequestWrapper.args.add(fieldData);
} else if (fieldNum == 3) {
tripleRequestWrapper.argTypes.add(new String(fieldData,
StandardCharsets.UTF_8));
```
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source
stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int result = super.read();
+ if (result != -1) {
+ remaining--;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int toRead = Math.min(len, remaining);
+ int result = super.read(b, off, toRead);
+ if (result > 0) {
+ remaining -= result;
+ }
+ return result;
+ }
+
+ @Override
+ public int available() throws IOException {
Review Comment:
Method 'available' overrides a synchronized method in
[java.io.BufferedInputStream](1) but is not synchronized.
```suggestion
public synchronized int available() throws IOException {
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -583,7 +589,9 @@ private void doOnData(ByteBuf data, boolean endStream) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not
received before payload"));
return;
}
- deframer.deframe(data);
+ // Use ByteBufInputStream to adapt ByteBuf to InputStream
+ // The second parameter 'true' means release ByteBuf after reading
+ deframer.decode(new ByteBufInputStream(data, true));
Review Comment:
This ByteBufInputStream is not always closed on method exit.
```suggestion
try (InputStream in = new ByteBufInputStream(data, true)) {
deframer.decode(in);
}
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java:
##########
@@ -125,9 +124,9 @@ public static MethodDescriptor findTripleMethodDescriptor(
ServiceDescriptor serviceDescriptor, String methodName,
InputStream rawMessage) throws IOException {
MethodDescriptor methodDescriptor =
findReflectionMethodDescriptor(serviceDescriptor, methodName);
if (methodDescriptor == null) {
Review Comment:
Using mark with Integer.MAX_VALUE as the readlimit parameter may not work
with all InputStream implementations. The BoundedInputStream created in
LengthFieldStreamingDecoder overrides mark to ensure sufficient buffer size,
but this assumption may not hold for all input streams passed to this method.
Consider checking if mark is supported using markSupported() before calling
mark, or document that the input stream must support mark/reset.
```suggestion
if (methodDescriptor == null) {
if (!rawMessage.markSupported()) {
throw new IOException("InputStream must support mark/reset
to resolve overloaded triple methods");
}
```
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source
stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
Review Comment:
The BoundedInputStream constructor uses the limit as the buffer size for
BufferedInputStream (line 297). For large messages, this could allocate
unnecessarily large buffers. Consider using a more reasonable buffer size
(e.g., Math.min(limit, 8192)) to avoid excessive memory allocation while still
providing good buffering performance.
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream,
int length) throws IOEx
return data;
}
+ protected static class MessageStream {
+
+ public final InputStream inputStream;
+ public final int length;
+
+ public MessageStream(InputStream inputStream, int length) {
+ this.inputStream = inputStream;
+ this.length = length;
+ }
+ }
+
+ /**
+ * A bounded InputStream that reads at most 'limit' bytes from the source
stream.
+ * Extends BufferedInputStream to support mark/reset, which is required by
+ * deserializers like Hessian2.
+ */
+ private static class BoundedInputStream extends BufferedInputStream {
+
+ private final int limit;
+ private int remaining;
+ private int markedRemaining;
+
+ public BoundedInputStream(InputStream source, int limit) {
+ super(source, limit);
+ this.limit = limit;
+ this.remaining = limit;
+ this.markedRemaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
Review Comment:
Method 'read' overrides a synchronized method in
[java.io.BufferedInputStream](1) but is not synchronized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]