On Tue, 21 Feb 2023 16:28:52 GMT, Daniel Fuchs <[email protected]> wrote:

>> This fix revisits an assertion that has been observed failing in 
>> ResponseSubscribers.HttpResponseInputStream.
>> 
>> The HttpResponseInputStream has the logic to wait until a buffer has been 
>> taken out of the queue before requesting a new one. Therefore there should 
>> be at most one byte buffer in the queue, except in the case of error (or 
>> asynchronous close), where a LAST_LIST sentinel is inserted in the queue to 
>> unblock the consumer of the input stream, which might be blocked in 
>> queue.take().
>> 
>> The HttpResponseInputStream thus asserts, when processing a subscription, 
>> that the remaining capacity of the queue should be greater than 1 (unless 
>> already closed), to ensure that there will be room for the LAST_LIST 
>> sentinel. However, in case of asynchronous shutdown of the executor, it's 
>> possible that the subscriber will be marked failed and the LAST_LIST 
>> sentinel inserted into the queue before/at the same time that the 
>> subscription is processed.
>> 
>> This fix proposes to relax the assertion to only fire if closed == false and 
>> failed == null and capacity <= 1 when processing the subscription.
>
> Daniel Fuchs has updated the pull request incrementally with one additional 
> commit since the last revision:
> 
>   Call tryRegister() before markSubscribed()

@jaikiran made me realise that there is a code path where `register()` might 
end up in a call to `HttpSubscriberWrapper.onError()`, after the subscriber was 
marked subscribed by `markSubscribed()` but before `onSubscribe()` was called 
on the wrapped subscriber. This results in having `onError()` called on the 
wrapped subscriber before `onSubscribe()`, which is the very thing that the 
`HttpSubscriberWrapper` class is trying to prevent.

I have slightly modified `HttpSubscriberWrapper::onSubscribe` to cater for 
this, by calling `tryRegister` before calling `markSubscribed`, but still 
within the `subscriptionLock`. Now if `register()` ends up calling `onError`, 
the subscriber will not be marked subscribed yet, causing `propagateError` to 
behave correctly. The real subscription will then be cancelled, which is what 
we want.
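
To illustrate why the ordering matters, here is a rough, self-contained sketch. It is a simplified model and not the actual `HttpSubscriberWrapper` code: `RecordingSubscriber`, `Wrapper`, the `registerFirst` flag, the `upstreamCancelled` field and the always-failing `tryRegister()` are all hypothetical stand-ins, and the real class deals with `Flow.Subscription` objects and more state than shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class SubscribeOrderingSketch {

    // Hypothetical stand-in for the wrapped subscriber: records callback order.
    static final class RecordingSubscriber {
        final List<String> events = new ArrayList<>();
        void onSubscribe() { events.add("onSubscribe"); }
        void onError(Throwable t) { events.add("onError"); }
    }

    // Simplified model of the wrapper (assumption: not the JDK implementation).
    static final class Wrapper {
        private final ReentrantLock subscriptionLock = new ReentrantLock();
        private final RecordingSubscriber wrapped;
        private final boolean registerFirst; // true = tryRegister before markSubscribed
        private boolean subscribed;
        private boolean completed;
        boolean upstreamCancelled;           // stands in for subscription.cancel()

        Wrapper(RecordingSubscriber wrapped, boolean registerFirst) {
            this.wrapped = wrapped;
            this.registerFirst = registerFirst;
        }

        // Hypothetical registration that always fails, modelling the code
        // path where register() ends up calling onError().
        private void tryRegister() {
            propagateError(new IllegalStateException("registration failed"));
        }

        private boolean markSubscribed() {
            if (completed) return false;
            subscribed = true;
            return true;
        }

        void propagateError(Throwable t) {
            subscriptionLock.lock(); // reentrant: may already be held by onSubscribe
            try {
                if (completed) return;
                completed = true;
                if (!subscribed) {
                    // Not subscribed yet: deliver onSubscribe first so that the
                    // wrapped subscriber sees the callbacks in a legal order.
                    wrapped.onSubscribe();
                }
                wrapped.onError(t);
            } finally {
                subscriptionLock.unlock();
            }
        }

        void onSubscribe() {
            subscriptionLock.lock();
            try {
                if (registerFirst) tryRegister();
                if (markSubscribed()) {
                    if (!registerFirst) tryRegister(); // previous ordering
                    wrapped.onSubscribe();
                } else {
                    upstreamCancelled = true; // cancel the real subscription
                }
            } finally {
                subscriptionLock.unlock();
            }
        }
    }

    // Runs one subscription attempt and returns the callbacks in order.
    static List<String> run(boolean registerFirst) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new Wrapper(subscriber, registerFirst).onSubscribe();
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println("markSubscribed first: " + run(false));
        System.out.println("tryRegister first:    " + run(true));
    }
}
```

In this model, marking the subscriber subscribed first yields `[onError, onSubscribe]`, whereas registering first yields `[onSubscribe, onError]` and the upstream subscription is flagged as cancelled.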

Thanks for the good analysis @jaikiran!

-------------

PR: https://git.openjdk.org/jdk/pull/12677
