Copilot commented on code in PR #5908:
URL: https://github.com/apache/ignite-3/pull/5908#discussion_r2109147166
##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -356,24 +367,37 @@ private void completeWithError(Throwable throwable) {
}
}
- private synchronized void requestMore() {
- if (closed || subscription == null) {
- return;
- }
+ private void requestMore() {
+ int toRequest;
+ Subscription subscription0;
- // This method controls backpressure. We won't get more items than we requested.
- // The idea is to have perPartitionParallelOperations batches in flight for every connection.
- var pending = pendingItemCount.get();
- var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() * options.perPartitionParallelOperations();
- var inFlight = inFlightItemCount.get();
- var count = desiredInFlight - inFlight - pending;
+ synchronized (this) {
+ if (closed || subscription == null) {
+ return;
+ }
- if (count <= 0) {
- return;
+ // This method controls backpressure. We won't get more items than we requested.
+ // The idea is to have perPartitionParallelOperations batches in flight for every connection.
+ var pending = pendingItemCount.get();
+ var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() * options.perPartitionParallelOperations();
+ var inFlight = inFlightItemCount.get();
+ toRequest = desiredInFlight - inFlight - pending;
+
+ if (toRequest <= 0) {
+ return;
+ }
+
+ pendingItemCount.addAndGet(toRequest);
+ subscription0 = subscription;
}
- subscription.request(count);
- pendingItemCount.addAndGet(count);
+ try {
+ // User code: call outside of lock, handle exceptions.
+ subscription0.request(toRequest);
Review Comment:
Incrementing `pendingItemCount` before calling
`subscription.request(toRequest)` may lead to an inconsistent pending count if
the request throws. Consider moving the counter update after a successful
request or rolling it back in the exception handler.
```suggestion
subscription0 = subscription;
}
try {
// User code: call outside of lock, handle exceptions.
subscription0.request(toRequest);
pendingItemCount.addAndGet(toRequest);
```
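The other option named in the comment, rolling the counter back in the exception handler, can be sketched as follows. This is a minimal illustration and not the actual `StreamerSubscriber`: the class `BackpressureSketch`, its constructor, the `pending()` accessor, the fixed `desiredInFlight` value, and the choice to rethrow after the rollback are all assumptions made for the example. It keeps the reservation inside the lock, so the count is already in place if `request` delivers items synchronously, and undoes it only when `request` throws.

```java
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the subscriber discussed above. */
class BackpressureSketch {
    private final AtomicInteger pendingItemCount = new AtomicInteger();

    // Assumption: a fixed target instead of buffers * pageSize * perPartitionParallelOperations.
    private final int desiredInFlight;

    private final Subscription subscription;

    private boolean closed;

    BackpressureSketch(Subscription subscription, int desiredInFlight) {
        this.subscription = subscription;
        this.desiredInFlight = desiredInFlight;
    }

    /** Hypothetical accessor, present only so the example can be checked. */
    int pending() {
        return pendingItemCount.get();
    }

    void requestMore() {
        int toRequest;
        Subscription subscription0;

        synchronized (this) {
            if (closed || subscription == null) {
                return;
            }

            toRequest = desiredInFlight - pendingItemCount.get();

            if (toRequest <= 0) {
                return;
            }

            // Reserve the items while still holding the lock.
            pendingItemCount.addAndGet(toRequest);
            subscription0 = subscription;
        }

        try {
            // User code: call outside of lock.
            subscription0.request(toRequest);
        } catch (RuntimeException | Error e) {
            // The request failed, so the reserved items will never arrive: undo the reservation.
            pendingItemCount.addAndGet(-toRequest);

            // Assumption: propagate the failure; the real code may handle it differently.
            throw e;
        }
    }
}
```

With this shape, a failing `request` leaves `pendingItemCount` at its previous value, while a successful one leaves the reservation in place until the items arrive.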
##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -305,20 +305,31 @@ private void invokeResultSubscriber(Collection<R> res) {
return resultSubscription;
}
- private synchronized void close(@Nullable Throwable throwable) {
- if (closed) {
- return;
- }
+ private void close(@Nullable Throwable throwable) {
+ Subscription subscription0;
+ ScheduledFuture<?> flushTask0;
Review Comment:
[nitpick] The local variable name `subscription0` is not descriptive.
Renaming it to `localSubscription` (and similarly `flushTask0` to
`localFlushTask`) would improve readability.
```suggestion
Subscription localSubscription;
ScheduledFuture<?> localFlushTask;
```