Copilot commented on code in PR #16004:
URL: https://github.com/apache/dubbo/pull/16004#discussion_r2693496578
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
* asynchronously triggering all necessary callbacks through its executor.
*/
protected void onWritabilityChanged() {
- Channel channel = streamChannelFuture.getNow();
- if (channel != null && channel.isWritable()) {
- // Synchronously call listener.onReady(), which will use executor to run the callback
+ notifyOnReady(false);
+ }
+
+ /**
+ * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+ */
+ public void triggerInitialOnReady() {
+ notifyOnReady(true);
+ }
+
+ /**
+ * notify listener when stream becomes ready
+ *
+ * @param forceNotify if true, always trigger onReady (for initial notification);
+ * if false, only trigger when state changes from "not ready" to "ready"
+ */
+ private void notifyOnReady(boolean forceNotify) {
+ boolean wasReady = lastReadyState;
+ boolean isNowReady = isReady();
+ lastReadyState = isNowReady;
+
+ // Trigger onReady if:
+ // 1. forceNotify is true (initial notification, spurious is OK), or
+ // 2. state changes from "not ready" to "ready"
+ if (forceNotify || (!wasReady && isNowReady)) {
listener.onReady();
}
}
Review Comment:
This change adds logic to handle a backpressure race condition, but it includes
no tests verifying that the race is actually fixed. Please add tests showing
that concurrent calls to notifyOnReady track state transitions correctly and
produce neither duplicate nor lost notifications, especially when
onWritabilityChanged and sendMessage callbacks race.
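As a rough illustration of the kind of test meant here, the sketch below races
several callers against a single "not ready" to "ready" transition and counts
notifications. ReadyTransitionModel and ReadyTransitionRace are hypothetical
stand-ins, not Dubbo classes; the model assumes the check is serialized (here
with synchronized), and a real test would drive AbstractTripleClientStream
with a mocked channel instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the transition tracking under review; not the Dubbo class.
final class ReadyTransitionModel {
    private boolean lastReadyState = false;
    private volatile boolean ready = false;          // stands in for isReady()
    final AtomicInteger notifications = new AtomicInteger(); // stands in for listener.onReady()

    void setReady(boolean value) {
        ready = value;
    }

    // Assumption: the read-modify-write sequence is serialized by the monitor lock.
    synchronized void notifyOnReady(boolean forceNotify) {
        boolean wasReady = lastReadyState;
        boolean isNowReady = ready;
        lastReadyState = isNowReady;
        if (forceNotify || (!wasReady && isNowReady)) {
            notifications.incrementAndGet();
        }
    }
}

final class ReadyTransitionRace {
    // Releases `threads` callers at once after the state has become ready
    // and returns how many notifications were recorded.
    static int raceOnce(int threads) {
        ReadyTransitionModel model = new ReadyTransitionModel();
        model.setReady(true);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                    model.notifyOnReady(false);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
        return model.notifications.get();
    }
}
```

With the serialized model, ReadyTransitionRace.raceOnce(8) should report
exactly one notification; running the same harness against an unsynchronized
variant is one way to check whether duplicates can occur.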
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -86,6 +86,12 @@ public abstract class AbstractTripleClientStream extends AbstractStream implemen
private boolean isReturnTriException = false;
+ /**
+ * Tracks the last known ready state for detecting state transitions.
+ * when the state changes from "not ready" to "ready".
Review Comment:
The Javadoc comment is incomplete. Its second line, "when the state changes
from 'not ready' to 'ready'.", is a sentence fragment that does not connect to
the first sentence. It should be folded into a single complete sentence
explaining what the field tracks.
```suggestion
* Tracks the last known ready state to detect when the state changes from "not ready" to "ready".
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
* asynchronously triggering all necessary callbacks through its executor.
*/
protected void onWritabilityChanged() {
- Channel channel = streamChannelFuture.getNow();
- if (channel != null && channel.isWritable()) {
- // Synchronously call listener.onReady(), which will use executor to run the callback
+ notifyOnReady(false);
+ }
+
+ /**
+ * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+ */
+ public void triggerInitialOnReady() {
+ notifyOnReady(true);
+ }
+
+ /**
+ * notify listener when stream becomes ready
+ *
+ * @param forceNotify if true, always trigger onReady (for initial notification);
+ * if false, only trigger when state changes from "not ready" to "ready"
+ */
+ private void notifyOnReady(boolean forceNotify) {
Review Comment:
The notifyOnReady method has a race condition in the read-modify-write
sequence. Multiple threads could read the same value of lastReadyState, compute
isReady(), and then update lastReadyState, leading to lost state transitions or
duplicate notifications. Since this method can be called from different threads
(the EventLoop thread via onWritabilityChanged, the sendMessage callback
thread, and the InitOnReadyQueueCommand), the entire sequence should be
synchronized or use atomic compare-and-set operations to ensure thread safety.
```suggestion
private synchronized void notifyOnReady(boolean forceNotify) {
```
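For comparison, the atomic alternative mentioned above could look roughly like
the sketch below. ReadyNotifier is a hypothetical stand-in, not Dubbo code: the
BooleanSupplier plays the role of isReady() and the Runnable plays the role of
listener.onReady(). It uses AtomicBoolean.getAndSet rather than a
compareAndSet loop, so reading the previous state and storing the new one is
a single atomic step.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the stream's readiness tracking; not Dubbo code.
final class ReadyNotifier {
    private final AtomicBoolean lastReadyState = new AtomicBoolean(false);
    private final BooleanSupplier isReady; // assumption: stands in for isReady()
    private final Runnable onReady;        // assumption: stands in for listener.onReady()

    ReadyNotifier(BooleanSupplier isReady, Runnable onReady) {
        this.isReady = isReady;
        this.onReady = onReady;
    }

    void notifyOnReady(boolean forceNotify) {
        boolean isNowReady = isReady.getAsBoolean();
        // Swap in the new state and learn the previous one atomically, so two
        // threads cannot both observe the same stored "not ready" value.
        boolean wasReady = lastReadyState.getAndSet(isNowReady);
        if (forceNotify || (!wasReady && isNowReady)) {
            onReady.run();
        }
    }
}
```

Note that this sketch does not make sampling isReady() and storing the result
atomic as a pair, so a stale sample can still be stored. If that matters, the
synchronized form suggested above is the simpler choice.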
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
* asynchronously triggering all necessary callbacks through its executor.
*/
protected void onWritabilityChanged() {
- Channel channel = streamChannelFuture.getNow();
- if (channel != null && channel.isWritable()) {
- // Synchronously call listener.onReady(), which will use executor to run the callback
+ notifyOnReady(false);
+ }
+
+ /**
+ * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+ */
+ public void triggerInitialOnReady() {
Review Comment:
The triggerInitialOnReady method should be package-private or protected
rather than public if it is only called by InitOnReadyQueueCommand from within
the same package. Public visibility exposes internal stream state management
that should not be accessible to external clients.
```suggestion
void triggerInitialOnReady() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]