Copilot commented on code in PR #15967:
URL: https://github.com/apache/dubbo/pull/15967#discussion_r2663667138
##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java:
##########
@@ -100,11 +102,13 @@ void progress() {
requestMetadata.version = url.getVersion();
stream.sendHeader(requestMetadata.toHeaders());
verify(writeQueue).enqueueFuture(any(HeaderQueueCommand.class), any(Executor.class));
- // no other commands
- verify(writeQueue).enqueue(any(QueuedCommand.class));
+ // enqueue should have been called twice: CreateStreamQueueCommand and InitOnReadyQueueCommand
+ verify(writeQueue, times(2)).enqueue(any(QueuedCommand.class));
stream.sendMessage(new byte[0], 0);
verify(writeQueue).enqueueFuture(any(DataQueueCommand.class), any(Executor.class));
verify(writeQueue, times(2)).enqueueFuture(any(QueuedCommand.class), any(Executor.class));
+ // After sendHeader and sendMessage, enqueue should have been called twice:
+ // once for CreateStreamQueueCommand and once for InitOnReadyQueueCommand
Review Comment:
The comment states that enqueue should have been called twice, once for
CreateStreamQueueCommand and once for InitOnReadyQueueCommand, but it is placed
after the sendHeader and sendMessage calls. Both enqueue calls happen during
stream construction, and the count is still two at this point, so the comment
is accurate but potentially misleading: it reads as though sendHeader and
sendMessage triggered the calls.
##########
dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java:
##########
@@ -71,8 +71,20 @@ protected void onSubscribe(final CallStreamObserver<?> subscription) {
if (subscription != null && this.subscription == null && HAS_SUBSCRIPTION.compareAndSet(false, true)) {
this.subscription = subscription;
subscription.disableAutoFlowControl();
+
+ // Set up onReadyHandler to trigger onSubscribe callback when stream becomes ready.
+ // This is called AFTER call.start() via InitOnReadyQueueCommand, ensuring the stream
+ // is created before any data is sent
+ // is triggered by onReady, not by onStart (which requires server headers).
Review Comment:
The comment on line 78 appears to be incomplete or grammatically incorrect.
The sentence starts with "is triggered by onReady" but has no subject.
The comment should be revised to state clearly what is triggered by onReady
rather than by onStart.
```suggestion
// is created before any data is sent.
// The callback is triggered by onReady, not by onStart (which requires server headers).
```
##########
dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java:
##########
@@ -71,8 +71,20 @@ protected void onSubscribe(final CallStreamObserver<?> subscription) {
if (subscription != null && this.subscription == null && HAS_SUBSCRIPTION.compareAndSet(false, true)) {
this.subscription = subscription;
subscription.disableAutoFlowControl();
+
+ // Set up onReadyHandler to trigger onSubscribe callback when stream becomes ready.
+ // This is called AFTER call.start() via InitOnReadyQueueCommand, ensuring the stream
+ // is created before any data is sent
+ // is triggered by onReady, not by onStart (which requires server headers).
if (onSubscribe != null) {
- onSubscribe.accept(subscription);
+ subscription.setOnReadyHandler(() -> {
+ // Only execute the callback once (on first onReady)
+ Consumer<CallStreamObserver<?>> callback = onSubscribe;
+ if (callback != null && subscription.isReady()) {
+ onSubscribe = null; // Clear to prevent re-execution
+ callback.accept(subscription);
Review Comment:
There's a potential race condition in the onReadyHandler. If the handler is
invoked concurrently by multiple threads, one thread could read a non-null
onSubscribe at line 82 before another thread clears it at line 84, so both
would pass the null check and the callback could be executed multiple times.
Consider using a compareAndSet pattern or ensuring the handler is only invoked
once at the framework level.
```suggestion
// Only execute the callback once (on first onReady), safely under synchronization
synchronized (AbstractTripleReactorPublisher.this) {
Consumer<CallStreamObserver<?>> callback = onSubscribe;
if (callback != null && subscription.isReady()) {
onSubscribe = null; // Clear to prevent re-execution
callback.accept(subscription);
}
```
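For comparison, the compareAndSet-style alternative mentioned in the review
comment could be sketched roughly as follows. This is a minimal standalone
illustration, not code from the PR: RunOnce, tryRun, and RunOnceDemo are
hypothetical names, and the readiness check is passed in as a BooleanSupplier
standing in for subscription.isReady(). The callback is claimed atomically with
AtomicReference.getAndSet(null), so at most one thread can obtain it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical helper (not part of Dubbo): runs a callback at most once. */
final class RunOnce<T> {
    private final AtomicReference<Consumer<T>> callbackRef;

    RunOnce(Consumer<T> callback) {
        this.callbackRef = new AtomicReference<>(callback);
    }

    /** Returns true only for the single call that actually ran the callback. */
    boolean tryRun(T arg, BooleanSupplier ready) {
        if (!ready.getAsBoolean()) {
            return false; // not ready yet: keep the callback for a later onReady
        }
        // Atomically take the callback; only one thread can see a non-null value.
        Consumer<T> callback = callbackRef.getAndSet(null);
        if (callback == null) {
            return false; // already claimed by an earlier or concurrent call
        }
        callback.accept(arg);
        return true;
    }
}

/** Small demo: three invocations, of which only the first ready one runs. */
final class RunOnceDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        RunOnce<String> once = new RunOnce<>(s -> calls.incrementAndGet());
        once.tryRun("stream", () -> false); // skipped: not ready
        once.tryRun("stream", () -> true);  // runs the callback
        once.tryRun("stream", () -> true);  // skipped: already claimed
        System.out.println("callback invocations: " + calls.get());
    }
}
```

Unlike the synchronized suggestion, this variant runs the callback outside any
lock, so a concurrent caller may return before the callback has finished.
Whether that matters depends on what the onSubscribe callback does.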
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]