Copilot commented on code in PR #16028:
URL: https://github.com/apache/dubbo/pull/16028#discussion_r2708033626
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -68,19 +95,82 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
/**
* Called when the channel writability changes.
- * Triggers the onReadyHandler if the channel is now writable.
*/
public void onWritabilityChanged() {
- Runnable handler = this.onReadyHandler;
- if (handler != null && isReady()) {
- handler.run();
+ if (isReady()) {
+ notifyOnReady();
}
}
public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
this.streamingDecoder = streamingDecoder;
}
+ /**
+ * Override to add byte counting for backpressure support.
+ */
+ @Override
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) throws Throwable {
+ if (message == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ int messageSize = message.messageSize();
+ onSendingBytes(messageSize);
+
+ CompletableFuture<Void> future = super.sendMessage(message);
+
+ final int size = messageSize;
+ future.whenComplete((v, t) -> {
+ if (t == null) {
+ onSentBytes(size);
+ } else {
+ rollbackSendingBytes(size);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Called before bytes are sent to track pending bytes.
+ */
+ private void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ */
+ private void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ */
+ private void onSentBytes(int numBytes) {
+ boolean wasBelowThreshold = numSentBytesQueued.get() < ON_READY_THRESHOLD;
+ long newValue = numSentBytesQueued.addAndGet(-numBytes);
+ boolean nowBelowThreshold = newValue < ON_READY_THRESHOLD;
+
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (!wasBelowThreshold && nowBelowThreshold) {
+ notifyOnReady();
+ }
+ }
Review Comment:
`onSentBytes` has a race condition: the `get()` behind `wasBelowThreshold` and
the subsequent `addAndGet()` are two separate operations, not one atomic step.
Another thread can modify `numSentBytesQueued` between them, so the threshold
check may use a value that is no longer the one being decremented. This can
cause missed or duplicate onReady notifications.
To fix this, do one of the following:
1. Use a compare-and-set loop to update the counter and detect the transition
atomically.
2. Synchronize this method together with every other method that modifies
`numSentBytesQueued`.
3. Use a single atomic operation that yields both values, e.g. take the old
value from `getAndAdd(-numBytes)` and compute the new one from it.
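As a rough sketch of option 3, the old value can be taken from `getAndAdd` and
the new value computed from it, so both come from one atomic update. The class
name `QueuedBytesTracker`, its constructor parameter, and the boolean return
value are hypothetical and only for illustration; in the PR the method would
call `notifyOnReady()` where this sketch returns `true`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone tracker; only the counter and threshold names
// mirror the PR. It is not the actual Http2ServerChannelObserver code.
final class QueuedBytesTracker {
    private final long onReadyThreshold;
    private final AtomicLong numSentBytesQueued = new AtomicLong();

    QueuedBytesTracker(long onReadyThreshold) {
        this.onReadyThreshold = onReadyThreshold;
    }

    // Called before bytes are handed to the transport.
    void onSendingBytes(int numBytes) {
        numSentBytesQueued.addAndGet(numBytes);
    }

    // Returns true only for the call that moves the counter from
    // "at or above threshold" to "below threshold".
    boolean onSentBytes(int numBytes) {
        // getAndAdd returns the value immediately before this update.
        long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
        long newValue = oldValue - numBytes;
        return oldValue >= onReadyThreshold && newValue < onReadyThreshold;
    }

    long queued() {
        return numSentBytesQueued.get();
    }
}
```

Each `getAndAdd` call returns the value immediately before its own update, so
a given downward crossing of the threshold is observed by exactly one caller.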
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -194,20 +201,58 @@ public ChannelFuture sendMessage(byte[] message, int
compressFlag) {
if (!checkResult.isSuccess()) {
return checkResult;
}
+
+ final int messageSize = message.length;
+ onSendingBytes(messageSize);
+
final DataQueueCommand cmd =
DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
return this.writeQueue.enqueueFuture(cmd,
parent.eventLoop()).addListener(future -> {
if (!future.isSuccess()) {
+ rollbackSendingBytes(messageSize);
cancelByLocal(TriRpcStatus.INTERNAL
.withDescription("Client write message failed")
.withCause(future.cause()));
transportException(future.cause());
} else {
- // After successful write, check if we need to trigger onReady
- notifyOnReady(false);
+ onSentBytes(messageSize);
}
});
}
+ /**
+ * Called before bytes are sent to track pending bytes.
+ *
+ * @param numBytes the number of bytes about to be sent
+ */
+ private void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ *
+ * @param numBytes the number of bytes to rollback
+ */
+ private void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ *
+ * @param numBytes the number of bytes that were sent
+ */
+ private void onSentBytes(int numBytes) {
+ boolean wasBelowThreshold = numSentBytesQueued.get() < ON_READY_THRESHOLD;
+ long newValue = numSentBytesQueued.addAndGet(-numBytes);
+ boolean nowBelowThreshold = newValue < ON_READY_THRESHOLD;
+
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (!wasBelowThreshold && nowBelowThreshold) {
+ listener.onReady();
+ }
+ }
Review Comment:
`onSentBytes` has a race condition: the `get()` behind `wasBelowThreshold` and
the subsequent `addAndGet()` are two separate operations, not one atomic step.
Another thread can modify `numSentBytesQueued` between them, so the threshold
check may use a value that is no longer the one being decremented. This can
cause missed or duplicate onReady notifications.
To fix this, do one of the following:
1. Use a compare-and-set loop to update the counter and detect the transition
atomically.
2. Synchronize this method together with every other method that modifies
`numSentBytesQueued`.
3. Use a single atomic operation that yields both values, e.g. take the old
value from `getAndAdd(-numBytes)` and compute the new one from it.
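Option 1 could look roughly like the loop below. `CasQueuedBytes` is a
hypothetical stand-in, not code from `AbstractTripleClientStream`; the real
method would invoke `listener.onReady()` where this sketch returns `true`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of a compare-and-set loop; names other than
// numSentBytesQueued are assumptions made for this sketch.
final class CasQueuedBytes {
    private final long onReadyThreshold;
    private final AtomicLong numSentBytesQueued = new AtomicLong();

    CasQueuedBytes(long onReadyThreshold) {
        this.onReadyThreshold = onReadyThreshold;
    }

    void onSendingBytes(int numBytes) {
        numSentBytesQueued.addAndGet(numBytes);
    }

    boolean onSentBytes(int numBytes) {
        while (true) {
            long oldValue = numSentBytesQueued.get();
            long newValue = oldValue - numBytes;
            // The update only succeeds if no other thread changed the
            // counter since oldValue was read; otherwise retry.
            if (numSentBytesQueued.compareAndSet(oldValue, newValue)) {
                return oldValue >= onReadyThreshold
                        && newValue < onReadyThreshold;
            }
        }
    }

    long queued() {
        return numSentBytesQueued.get();
    }
}
```

The threshold check uses the same `oldValue` that the successful
`compareAndSet` replaced, so the check and the update cannot drift apart.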
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -194,20 +201,58 @@ public ChannelFuture sendMessage(byte[] message, int
compressFlag) {
if (!checkResult.isSuccess()) {
return checkResult;
}
+
+ final int messageSize = message.length;
+ onSendingBytes(messageSize);
+
final DataQueueCommand cmd =
DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
return this.writeQueue.enqueueFuture(cmd,
parent.eventLoop()).addListener(future -> {
if (!future.isSuccess()) {
+ rollbackSendingBytes(messageSize);
cancelByLocal(TriRpcStatus.INTERNAL
.withDescription("Client write message failed")
.withCause(future.cause()));
transportException(future.cause());
} else {
- // After successful write, check if we need to trigger onReady
- notifyOnReady(false);
+ onSentBytes(messageSize);
}
});
}
+ /**
+ * Called before bytes are sent to track pending bytes.
+ *
+ * @param numBytes the number of bytes about to be sent
+ */
+ private void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ *
+ * @param numBytes the number of bytes to rollback
+ */
+ private void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ *
+ * @param numBytes the number of bytes that were sent
+ */
+ private void onSentBytes(int numBytes) {
+ boolean wasBelowThreshold = numSentBytesQueued.get() < ON_READY_THRESHOLD;
+ long newValue = numSentBytesQueued.addAndGet(-numBytes);
+ boolean nowBelowThreshold = newValue < ON_READY_THRESHOLD;
+
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (!wasBelowThreshold && nowBelowThreshold) {
+ listener.onReady();
+ }
+ }
Review Comment:
The new backpressure mechanism with byte tracking introduces complex
concurrency logic that is not covered by tests. Consider adding unit tests to
verify:
1. The byte counting mechanism correctly increments and decrements
2. The threshold-based isReady() logic works correctly
3. The onReady callback is triggered exactly once when transitioning from
not-ready to ready
4. Edge cases like concurrent sends and completions are handled correctly
5. The rollback mechanism works when sends fail
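A concurrency check for items 3 and 4 might be sketched as below. This is a
plain-Java harness rather than a test in the project's own test framework, and
`BackpressureCounterCheck` is a hypothetical name. It exercises a
`getAndAdd`-based decrement, not the `onSentBytes` implementation currently in
the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical harness: queues N messages, completes them concurrently,
// and reports how often the "became ready" transition was observed.
final class BackpressureCounterCheck {
    static final long ON_READY_THRESHOLD = 100; // assumed value for the sketch

    // Returns {number of onReady transitions, bytes still queued}.
    static long[] run(int messages, int bytesPerMessage, int threads) {
        AtomicLong queued = new AtomicLong();
        AtomicInteger onReadyCalls = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            queued.addAndGet(bytesPerMessage); // simulate onSendingBytes
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all completions together
                    long oldValue = queued.getAndAdd(-bytesPerMessage);
                    long newValue = oldValue - bytesPerMessage;
                    if (oldValue >= ON_READY_THRESHOLD
                            && newValue < ON_READY_THRESHOLD) {
                        onReadyCalls.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("completions timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new long[] {onReadyCalls.get(), queued.get()};
    }
}
```

With 10 messages of 20 bytes and a threshold of 100, the counter falls from
200 to 0 in steps of 20, so exactly one completion (100 to 80) crosses the
threshold regardless of thread interleaving.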
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -68,19 +95,82 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
/**
* Called when the channel writability changes.
- * Triggers the onReadyHandler if the channel is now writable.
*/
public void onWritabilityChanged() {
- Runnable handler = this.onReadyHandler;
- if (handler != null && isReady()) {
- handler.run();
+ if (isReady()) {
+ notifyOnReady();
}
}
public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
this.streamingDecoder = streamingDecoder;
}
+ /**
+ * Override to add byte counting for backpressure support.
+ */
+ @Override
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) throws Throwable {
+ if (message == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ int messageSize = message.messageSize();
+ onSendingBytes(messageSize);
+
+ CompletableFuture<Void> future = super.sendMessage(message);
+
+ final int size = messageSize;
+ future.whenComplete((v, t) -> {
+ if (t == null) {
+ onSentBytes(size);
+ } else {
+ rollbackSendingBytes(size);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Called before bytes are sent to track pending bytes.
+ */
+ private void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ */
+ private void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ */
+ private void onSentBytes(int numBytes) {
+ boolean wasBelowThreshold = numSentBytesQueued.get() < ON_READY_THRESHOLD;
+ long newValue = numSentBytesQueued.addAndGet(-numBytes);
+ boolean nowBelowThreshold = newValue < ON_READY_THRESHOLD;
+
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (!wasBelowThreshold && nowBelowThreshold) {
+ notifyOnReady();
+ }
+ }
+
+ /**
+ * Notify the onReadyHandler that the stream is ready for writing.
+ */
+ private void notifyOnReady() {
+ Runnable handler = this.onReadyHandler;
+ if (handler == null) {
+ return;
+ }
+ executor.execute(handler);
+ }
+
Review Comment:
The new backpressure mechanism with byte tracking introduces complex
concurrency logic that is not covered by tests. Consider adding unit tests to
verify:
1. The byte counting mechanism correctly increments and decrements
2. The threshold-based isReady() logic works correctly
3. The onReady callback is triggered exactly once when transitioning from
not-ready to ready
4. Edge cases like concurrent sends and completions are handled correctly
5. The rollback mechanism works when sends fail
6. The executor-based async dispatch of onReadyHandler works correctly
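For item 6, one possible approach is an executor that records tasks instead of
running them, so a test can assert that the handler was submitted rather than
run inline. `RecordingExecutor` and `OnReadyDispatcher` are hypothetical names;
the dispatcher copies only the `notifyOnReady()` logic quoted in the diff above
and is not the real observer class.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical executor that captures tasks so a test can run them later.
final class RecordingExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
    }
}

// Hypothetical stand-in holding only the dispatch logic under test.
final class OnReadyDispatcher {
    private volatile Runnable onReadyHandler;
    private volatile Executor executor = Runnable::run;

    void setOnReadyHandler(Runnable onReadyHandler) {
        this.onReadyHandler = onReadyHandler;
    }

    void setExecutor(Executor executor) {
        this.executor = executor;
    }

    void notifyOnReady() {
        Runnable handler = this.onReadyHandler;
        if (handler == null) {
            return; // nothing registered, nothing to dispatch
        }
        executor.execute(handler);
    }
}
```

A test can then check three things: no task is submitted while the handler is
null, one task is submitted per notification, and the handler only runs once
the recorded task is executed.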
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -47,16 +62,28 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
private Runnable onReadyHandler;
Review Comment:
The `onReadyHandler` field is not declared as volatile. Since it's set via
`setOnReadyHandler()` and read from potentially different threads in
`notifyOnReady()`, this could lead to visibility issues where threads might see
a stale or null value. Declare the field as `private volatile Runnable
onReadyHandler` to ensure proper memory visibility across threads.
```suggestion
private volatile Runnable onReadyHandler;
```
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -68,19 +95,82 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
/**
* Called when the channel writability changes.
- * Triggers the onReadyHandler if the channel is now writable.
*/
public void onWritabilityChanged() {
- Runnable handler = this.onReadyHandler;
- if (handler != null && isReady()) {
- handler.run();
+ if (isReady()) {
+ notifyOnReady();
}
}
public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
this.streamingDecoder = streamingDecoder;
}
+ /**
+ * Override to add byte counting for backpressure support.
+ */
+ @Override
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) throws Throwable {
+ if (message == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ int messageSize = message.messageSize();
+ onSendingBytes(messageSize);
+
+ CompletableFuture<Void> future = super.sendMessage(message);
+
+ final int size = messageSize;
+ future.whenComplete((v, t) -> {
+ if (t == null) {
+ onSentBytes(size);
+ } else {
+ rollbackSendingBytes(size);
Review Comment:
The variable `size` is redundantly assigned from `messageSize`. Since
`messageSize` is effectively final (not reassigned after initialization), you
can use `messageSize` directly in the lambda without creating an additional
local variable. This simplifies the code.
```suggestion
future.whenComplete((v, t) -> {
if (t == null) {
onSentBytes(messageSize);
} else {
rollbackSendingBytes(messageSize);
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -87,9 +87,16 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
private boolean isReturnTriException = false;
/**
- * Tracks the last known ready state to detect when the state changes from "not ready" to "ready".
+ * Number of bytes currently queued, waiting to be sent.
+ * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
*/
- private volatile boolean lastReadyState = false;
+ private final java.util.concurrent.atomic.AtomicLong numSentBytesQueued =
+ new java.util.concurrent.atomic.AtomicLong(0);
Review Comment:
The fully qualified class name `java.util.concurrent.atomic.AtomicLong` is
used instead of importing the class. This is inconsistent with Java best
practices and the rest of the codebase. Consider adding an import statement for
AtomicLong at the top of the file and using the simple class name instead.
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -47,16 +62,28 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
private Runnable onReadyHandler;
+ private Executor executor = Runnable::run;
Review Comment:
The `executor` field is not declared as volatile. Since it's set via
`setExecutor()` (line 182 of GenericHttp2ServerTransportListener) and
potentially read from a different thread in `notifyOnReady()` (line 171), this
could lead to visibility issues where threads might see a stale value of the
executor. Declare the field as `private volatile Executor executor` to ensure
proper memory visibility across threads.
```suggestion
private volatile Executor executor = Runnable::run;
```
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -24,21 +24,36 @@
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.netty4.NettyHttpHeaders;
import org.apache.dubbo.rpc.CancellationContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
import io.netty.handler.codec.http2.DefaultHttp2Headers;
/**
* HTTP/2 server-side stream observer with flow control and backpressure support.
- * Implements {@link ServerCallStreamObserver} following gRPC's pattern for backpressure.
*/
public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserver<H2StreamChannel>
implements FlowControlStreamObserver<Object>,
Http2CancelableStreamObserver<Object>,
ServerCallStreamObserver<Object> {
+ /**
+ * Number of bytes currently queued, waiting to be sent.
+ * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
+ */
+ private final java.util.concurrent.atomic.AtomicLong numSentBytesQueued =
+ new java.util.concurrent.atomic.AtomicLong(0);
Review Comment:
The fully qualified class name `java.util.concurrent.atomic.AtomicLong` is
used instead of importing the class. This is inconsistent with Java best
practices and the rest of the codebase. Consider adding an import statement for
AtomicLong at the top of the file and using the simple class name instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]