On Tue, 21 Feb 2023 16:28:52 GMT, Daniel Fuchs <[email protected]> wrote:
>> This fix revisits an assertion that has been observed failing in
>> ResponseSubscribers.HttpResponseInputStream.
>>
>> The HttpResponseInputStream has the logic to wait until a buffer has been
>> taken out of the queue before requesting a new one. Therefore there should
>> at most be one byte buffer in the queue, except in the case of error (or
>> asynchronous close), where a LAST_LIST sentinel is inserted in the queue to
>> unblock the consumer of the input stream, which might be blocked in
>> queue.take().
>>
>> The HttpResponseInputStream thus asserts, when processing a subscription,
>> that the remaining capacity of the queue should be greater than 1 (unless
>> already closed), to ensure that there will be room for the LAST_LIST
>> sentinel. However, in case of asynchronous shutdown of the executor, it's
>> possible that the subscriber will be marked failed and the LAST_LIST
>> sentinel inserted into the queue before/at the same time that the
>> subscription is processed.
>>
>> This fix proposes to relax the assertion to only fire if closed == false and
>> failed == null and capacity <= 1 when processing the subscription
>
> Daniel Fuchs has updated the pull request incrementally with one additional
> commit since the last revision:
>
>   Call tryRegister() before markSubscribed()

@jaikiran made me realise that there is a code path where `register()` might end up calling `HttpSubscriberWrapper.onError()` after the subscriber was `markSubscribed()` but before `onSubscribe()` was called on the wrapped subscriber. This results in `onError()` being called on the wrapped subscriber before `onSubscribe()`, which is the very thing that the `HttpSubscriberWrapper` class is trying to prevent.

I have slightly modified `HttpSubscriberWrapper::onSubscribe` to cater for this, by calling `tryRegister` before calling `markSubscribed`, but still within the `subscriptionLock`.
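
To illustrate the ordering, here is a minimal, self-contained sketch. It is not the JDK code: `OrderingSketch`, `Recorder`, `CountingSubscription`, the `registerFails` flag and the `NOP` subscription are hypothetical names made up for the example, and the `Wrapper` class only imitates the shape of `HttpSubscriberWrapper` described above, assuming that the error path subscribes the wrapped subscriber itself when it finds it not yet marked subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;

class OrderingSketch {
    /** Records the order in which the wrapped subscriber sees signals. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();
        public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
        public void onNext(String item) { events.add("onNext"); }
        public void onError(Throwable t) { events.add("onError"); }
        public void onComplete() { events.add("onComplete"); }
    }

    /** Stands in for the real upstream subscription; counts cancel() calls. */
    static final class CountingSubscription implements Flow.Subscription {
        int cancelled;
        public void request(long n) { }
        public void cancel() { cancelled++; }
    }

    /** Hypothetical, simplified imitation of the wrapper discussed in this PR. */
    static final class Wrapper implements Flow.Subscriber<String> {
        private static final Flow.Subscription NOP = new Flow.Subscription() {
            public void request(long n) { }
            public void cancel() { }
        };
        private final ReentrantLock subscriptionLock = new ReentrantLock();
        private final Flow.Subscriber<String> wrapped;
        private final boolean registerFails; // simulates register() hitting an error
        private boolean subscribed;          // only accessed under subscriptionLock

        Wrapper(Flow.Subscriber<String> wrapped, boolean registerFails) {
            this.wrapped = wrapped;
            this.registerFails = registerFails;
        }

        /** Returns true only for the first caller. */
        private boolean markSubscribed() {
            if (subscribed) return false;
            subscribed = true;
            return true;
        }

        /** Assumption: registration may fail and re-enter onError(). */
        private void tryRegister() {
            if (registerFails) onError(new IllegalStateException("shutting down"));
        }

        public void onSubscribe(Flow.Subscription subscription) {
            subscriptionLock.lock();
            try {
                tryRegister();                  // first: may call onError()
                if (markSubscribed()) {         // then: claim the subscription
                    wrapped.onSubscribe(subscription);
                } else {
                    subscription.cancel();      // the error path got there first
                }
            } finally {
                subscriptionLock.unlock();
            }
        }

        public void onError(Throwable t) {
            subscriptionLock.lock();            // reentrant, so safe from tryRegister()
            try {
                // Not subscribed yet: subscribe with a no-op so onSubscribe precedes onError.
                if (markSubscribed()) wrapped.onSubscribe(NOP);
            } finally {
                subscriptionLock.unlock();
            }
            wrapped.onError(t);
        }

        public void onNext(String item) { wrapped.onNext(item); }
        public void onComplete() { wrapped.onComplete(); }
    }

    /** Runs one subscription and reports what the wrapped subscriber observed. */
    static String run(boolean registerFails) {
        Recorder recorder = new Recorder();
        CountingSubscription upstream = new CountingSubscription();
        new Wrapper(recorder, registerFails).onSubscribe(upstream);
        return recorder.events + " cancelled=" + upstream.cancelled;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In this sketch, `run(true)` yields `[onSubscribe, onError] cancelled=1` and `run(false)` yields `[onSubscribe] cancelled=0`. If the two calls in `onSubscribe` were swapped, the failing case would record `onError` before `onSubscribe`.
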
Now if `register()` ends up calling `onError`, the subscriber will not be marked subscribed yet, causing `propagateError` to behave correctly. The real subscription will then be cancelled, which is what we want.

Thanks for the good analysis @jaikiran!

-------------

PR: https://git.openjdk.org/jdk/pull/12677
