On Tue, 11 Oct 2022 15:49:14 GMT, Daniel Fuchs <[email protected]> wrote:

> When [JDK-8277969](https://bugs.openjdk.org/browse/JDK-8277969) was 
> implemented, a list of outstanding response subscribers was added to 
> `HttpClientImpl`. A body subscriber is added to the list after being created 
> and is removed from the list when it is completed, either successfully or 
> exceptionally.
> 
> It appears that in the case where the subscription is cancelled before the 
> subscriber is completed, the subscriber might remain registered in the list 
> forever, or at least until the HttpClient gets garbage collected. This can be 
> easily reproduced using streaming subscribers, such as 
> BodySubscribers::ofInputStream. In the case where the input stream is closed 
> without having read all the bytes, Subscription::cancel will be called. 
> Whether the subscriber gets unregistered or not at that point becomes racy.
> 
> Indeed, the reactive stream specification doesn't guarantee whether 
> onComplete or onError will be called or not after a subscriber cancels its 
> subscription. Any cleanup that would have been performed by 
> onComplete/onError might therefore need to be performed when the subscription 
> is cancelled too.

@dfuch Where is the part that removes the subscriber when 
`HttpResponse.BodyHandlers#ofInputStream` is used and the `InputStream` is 
closed before the response body has been received?

That is, I expect that even if I use `HttpResponse.BodyHandlers#ofInputStream` 
with `HttpClient#send` in a `while (true)` loop, the client will retain only 
one subscriber at any time.

Example code for clarification (not complete code):

private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();

public static void main(String[] args) throws IOException, InterruptedException {
    final URI uri = URI.create("https://www.oref.org.il/WarningMessages/alert/alerts.json");
    final int minRedAlertEventContentLength = """
            {"cat":"1","data":[],"desc":"","id":0,"title":""}""".getBytes(StandardCharsets.UTF_8).length;

    while (true) {
        final HttpResponse<InputStream> httpResponse = HTTP_CLIENT.send(
                HttpRequest.newBuilder(uri)
                        .header("Accept", "application/json")
                        .header("X-Requested-With", "XMLHttpRequest")
                        .header("Referer", "https://www.oref.org.il/12481-he/Pakar.aspx")
                        .timeout(Duration.ofSeconds(10))
                        .build(),
                HttpResponse.BodyHandlers.ofInputStream()
        );

        // The body stream is closed on every iteration, often without being read.
        try (InputStream httpResponseBody = httpResponse.body()) {
            if (httpResponse.statusCode() != HttpURLConnection.HTTP_OK) {
                sleep();
                continue;
            }
            if (httpResponse.headers().firstValueAsLong("Content-Length").orElse(-1) > minRedAlertEventContentLength)
                System.out.println(JSON_MAPPER.readValue(
                        httpResponseBody,
                        RedAlertEvent.class
                ));
        }
    }
}

private record RedAlertEvent(
        int cat,
        List<String> data,
        String desc,
        long id,
        String title
) {
}

-------------

PR: https://git.openjdk.org/jdk/pull/10659
