Copilot commented on code in PR #5908: URL: https://github.com/apache/ignite-3/pull/5908#discussion_r2109147166
########## modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java: ########## @@ -356,24 +367,37 @@ private void completeWithError(Throwable throwable) { } } - private synchronized void requestMore() { - if (closed || subscription == null) { - return; - } + private void requestMore() { + int toRequest; + Subscription subscription0; - // This method controls backpressure. We won't get more items than we requested. - // The idea is to have perPartitionParallelOperations batches in flight for every connection. - var pending = pendingItemCount.get(); - var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() * options.perPartitionParallelOperations(); - var inFlight = inFlightItemCount.get(); - var count = desiredInFlight - inFlight - pending; + synchronized (this) { + if (closed || subscription == null) { + return; + } - if (count <= 0) { - return; + // This method controls backpressure. We won't get more items than we requested. + // The idea is to have perPartitionParallelOperations batches in flight for every connection. + var pending = pendingItemCount.get(); + var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() * options.perPartitionParallelOperations(); + var inFlight = inFlightItemCount.get(); + toRequest = desiredInFlight - inFlight - pending; + + if (toRequest <= 0) { + return; + } + + pendingItemCount.addAndGet(toRequest); + subscription0 = subscription; } - subscription.request(count); - pendingItemCount.addAndGet(count); + try { + // User code: call outside of lock, handle exceptions. + subscription0.request(toRequest); Review Comment: Incrementing `pendingItemCount` before calling `subscription.request(toRequest)` may lead to an inconsistent pending count if the request throws. Consider moving the counter update after a successful request or rolling it back in the exception handler. ```suggestion subscription0 = subscription; } try { // User code: call outside of lock, handle exceptions. subscription0.request(toRequest); pendingItemCount.addAndGet(toRequest); ``` ########## modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java: ########## @@ -305,20 +305,31 @@ private void invokeResultSubscriber(Collection<R> res) { return resultSubscription; } - private synchronized void close(@Nullable Throwable throwable) { - if (closed) { - return; - } + private void close(@Nullable Throwable throwable) { + Subscription subscription0; + ScheduledFuture<?> flushTask0; Review Comment: [nitpick] The local variable name `subscription0` is not descriptive. Renaming it to `localSubscription` (and similarly `flushTask0` to `localFlushTask`) would improve readability. ```suggestion Subscription localSubscription; ScheduledFuture<?> localFlushTask; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org