Copilot commented on code in PR #5908:
URL: https://github.com/apache/ignite-3/pull/5908#discussion_r2109147166


##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -356,24 +367,37 @@ private void completeWithError(Throwable throwable) {
         }
     }
 
-    private synchronized void requestMore() {
-        if (closed || subscription == null) {
-            return;
-        }
+    private void requestMore() {
+        int toRequest;
+        Subscription subscription0;
 
-        // This method controls backpressure. We won't get more items than we 
requested.
-        // The idea is to have perPartitionParallelOperations batches in 
flight for every connection.
-        var pending = pendingItemCount.get();
-        var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() 
* options.perPartitionParallelOperations();
-        var inFlight = inFlightItemCount.get();
-        var count = desiredInFlight - inFlight - pending;
+        synchronized (this) {
+            if (closed || subscription == null) {
+                return;
+            }
 
-        if (count <= 0) {
-            return;
+            // This method controls backpressure. We won't get more items than 
we requested.
+            // The idea is to have perPartitionParallelOperations batches in 
flight for every connection.
+            var pending = pendingItemCount.get();
+            var desiredInFlight = Math.max(1, buffers.size()) * 
options.pageSize() * options.perPartitionParallelOperations();
+            var inFlight = inFlightItemCount.get();
+            toRequest = desiredInFlight - inFlight - pending;
+
+            if (toRequest <= 0) {
+                return;
+            }
+
+            pendingItemCount.addAndGet(toRequest);
+            subscription0 = subscription;
         }
 
-        subscription.request(count);
-        pendingItemCount.addAndGet(count);
+        try {
+            // User code: call outside of lock, handle exceptions.
+            subscription0.request(toRequest);

Review Comment:
   Incrementing `pendingItemCount` before calling 
`subscription.request(toRequest)` may lead to an inconsistent pending count if 
the request throws. Consider moving the counter update after a successful 
request or rolling it back in the exception handler.
   ```suggestion
               subscription0 = subscription;
           }
   
           try {
               // User code: call outside of lock, handle exceptions.
               subscription0.request(toRequest);
               pendingItemCount.addAndGet(toRequest);
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -305,20 +305,31 @@ private void invokeResultSubscriber(Collection<R> res) {
         return resultSubscription;
     }
 
-    private synchronized void close(@Nullable Throwable throwable) {
-        if (closed) {
-            return;
-        }
+    private void close(@Nullable Throwable throwable) {
+        Subscription subscription0;
+        ScheduledFuture<?> flushTask0;

Review Comment:
   [nitpick] The local variable name `subscription0` is not descriptive. 
Renaming it to `localSubscription` (and similarly `flushTask0` to 
`localFlushTask`) would improve readability.
   ```suggestion
           Subscription localSubscription;
           ScheduledFuture<?> localFlushTask;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to