Copilot commented on code in PR #15967:
URL: https://github.com/apache/dubbo/pull/15967#discussion_r2663667138


##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java:
##########
@@ -100,11 +102,13 @@ void progress() {
         requestMetadata.version = url.getVersion();
         stream.sendHeader(requestMetadata.toHeaders());
         verify(writeQueue).enqueueFuture(any(HeaderQueueCommand.class), 
any(Executor.class));
-        // no other commands
-        verify(writeQueue).enqueue(any(QueuedCommand.class));
+        // enqueue should have been called twice: CreateStreamQueueCommand and 
InitOnReadyQueueCommand
+        verify(writeQueue, times(2)).enqueue(any(QueuedCommand.class));
         stream.sendMessage(new byte[0], 0);
         verify(writeQueue).enqueueFuture(any(DataQueueCommand.class), 
any(Executor.class));
         verify(writeQueue, times(2)).enqueueFuture(any(QueuedCommand.class), 
any(Executor.class));
+        // After sendHeader and sendMessage, enqueue should have been called 
twice:
+        // once for CreateStreamQueueCommand and once for 
InitOnReadyQueueCommand

Review Comment:
   The comment states that enqueue should have been called twice for 
CreateStreamQueueCommand and InitOnReadyQueueCommand, but this comment appears 
after sendHeader and sendMessage have been called. At this point, enqueue has 
still only been called twice (both during stream initialization), so the 
comment is accurate but potentially misleading. The comment seems to suggest 
these calls happened after sendHeader and sendMessage, but they actually 
happened during stream construction.



##########
dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java:
##########
@@ -71,8 +71,20 @@ protected void onSubscribe(final CallStreamObserver<?> 
subscription) {
         if (subscription != null && this.subscription == null && 
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
             this.subscription = subscription;
             subscription.disableAutoFlowControl();
+
+            // Set up onReadyHandler to trigger onSubscribe callback when 
stream becomes ready.
+            // This is called AFTER call.start() via InitOnReadyQueueCommand, 
ensuring the stream
+            // is created before any data is sent
+            // is triggered by onReady, not by onStart (which requires server 
headers).

Review Comment:
   The comment on line 78 appears to be incomplete or has a grammatical error. 
The sentence starts with "is triggered by onReady" but doesn't have a subject. 
The comment should be revised to clearly state what is triggered by onReady 
versus onStart.
   ```suggestion
               // is created before any data is sent.
               // The callback is triggered by onReady, not by onStart (which 
requires server headers).
   ```



##########
dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java:
##########
@@ -71,8 +71,20 @@ protected void onSubscribe(final CallStreamObserver<?> 
subscription) {
         if (subscription != null && this.subscription == null && 
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
             this.subscription = subscription;
             subscription.disableAutoFlowControl();
+
+            // Set up onReadyHandler to trigger onSubscribe callback when 
stream becomes ready.
+            // This is called AFTER call.start() via InitOnReadyQueueCommand, 
ensuring the stream
+            // is created before any data is sent
+            // is triggered by onReady, not by onStart (which requires server 
headers).
             if (onSubscribe != null) {
-                onSubscribe.accept(subscription);
+                subscription.setOnReadyHandler(() -> {
+                    // Only execute the callback once (on first onReady)
+                    Consumer<CallStreamObserver<?>> callback = onSubscribe;
+                    if (callback != null && subscription.isReady()) {
+                        onSubscribe = null; // Clear to prevent re-execution
+                        callback.accept(subscription);

Review Comment:
   There's a potential race condition in the onReadyHandler. If the handler is 
invoked concurrently by multiple threads, line 82 could capture a non-null 
value while another thread is executing line 84 (setting onSubscribe to null). 
This could result in the callback being executed multiple times. Consider using 
a compareAndSet pattern or ensuring the handler is only invoked once at the 
framework level.
   ```suggestion
                       // Only execute the callback once (on first onReady), 
safely under synchronization
                       synchronized (AbstractTripleReactorPublisher.this) {
                           Consumer<CallStreamObserver<?>> callback = 
onSubscribe;
                           if (callback != null && subscription.isReady()) {
                               onSubscribe = null; // Clear to prevent 
re-execution
                               callback.accept(subscription);
                           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to