On Tue, 11 Oct 2022 15:49:14 GMT, Daniel Fuchs <[email protected]> wrote:
> When [JDK-8277969](https://bugs.openjdk.org/browse/JDK-8277969) was
> implemented, a list of outstanding response subscribers was added to
> `HttpClientImpl`. A body subscriber is added to the list after being created
> and is removed from the list when it is completed, either successfully or
> exceptionally.
>
> It appears that in the case where the subscription is cancelled before the
> subscriber is completed, the subscriber might remain registered in the list
> forever, or at least until the HttpClient gets garbage collected. This can be
> easily reproduced using streaming subscribers, such as
> BodySubscriber::ofInputStream. In the case where the input stream is closed
> without having read all the bytes, Subscription::cancel will be called.
> Whether the subscriber gets unregistered or not at that point becomes racy.
>
> Indeed, the reactive stream specification doesn't guarantee whether
> onComplete or onError will be called or not after a subscriber cancels its
> subscription. Any cleanup that would have been performed by
> onComplete/onError might therefore need to be performed when the subscription
> is cancelled too.

@dfuch where is the part that removes the subscriber when `HttpResponse.BodyHandlers#ofInputStream` is used and the `InputStream` is closed before the whole response body has been received?

That is, I expect that even if I call `HttpClient#send` with `HttpResponse.BodyHandlers#ofInputStream` in a `while (true)` loop, the client will retain at most one subscriber at any time.
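To make that expectation concrete, here is a minimal sketch of the "clean up on cancel too" idea from the quoted description. It is not the actual `HttpClientImpl` code: `SubscriberRegistry`, `CancelAfterFirst` and `CancelDemo` are hypothetical names invented for illustration, and the only assumption is that the subscriber is handed a wrapped `Flow.Subscription` whose `cancel()` also unregisters it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;

/** Hypothetical registry of outstanding subscribers (not the JDK implementation). */
final class SubscriberRegistry {
    private final Set<Flow.Subscriber<?>> pending = ConcurrentHashMap.newKeySet();

    int size() { return pending.size(); }

    /** Registers the delegate and unregisters it on completion, error, or cancel. */
    <T> Flow.Subscriber<T> track(Flow.Subscriber<T> delegate) {
        pending.add(delegate);
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                // Hand the delegate a wrapped subscription so cancel() is observed.
                delegate.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { s.request(n); }
                    @Override public void cancel() {
                        // No terminal signal is guaranteed after cancel,
                        // so the cleanup has to happen here as well.
                        pending.remove(delegate);
                        s.cancel();
                    }
                });
            }
            @Override public void onNext(T item) { delegate.onNext(item); }
            @Override public void onError(Throwable t) {
                pending.remove(delegate);
                delegate.onError(t);
            }
            @Override public void onComplete() {
                pending.remove(delegate);
                delegate.onComplete();
            }
        };
    }
}

/** Stands in for an input stream that is closed before the body is fully read. */
final class CancelAfterFirst implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }
    @Override public void onNext(String item) { subscription.cancel(); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}

final class CancelDemo {
    /** Returns how many subscribers are still registered after an early cancel. */
    static int pendingAfterCancel() {
        SubscriberRegistry registry = new SubscriberRegistry();
        Flow.Subscriber<String> tracked = registry.track(new CancelAfterFirst());
        // Upstream subscription that never sends onComplete or onError.
        tracked.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        tracked.onNext("first chunk");
        return registry.size();
    }
}
```

In this sketch the upstream never signals `onComplete` or `onError`, yet the registry ends up empty because the wrapped `cancel()` does the removal. That is the behaviour I would expect to see for each iteration of the loop.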
Example code for clarification (not complete code):

    private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();

    public static void main(String[] args) throws IOException, InterruptedException {
        final URI uri = URI.create("https://www.oref.org.il/WarningMessages/alert/alerts.json");
        final int minRedAlertEventContentLength = """
                {"cat":"1","data":[],"desc":"","id":0,"title":""}""".getBytes(StandardCharsets.UTF_8).length;
        while (true) {
            final HttpResponse<InputStream> httpResponse = HTTP_CLIENT.send(
                    HttpRequest.newBuilder(uri)
                            .header("Accept", "application/json")
                            .header("X-Requested-With", "XMLHttpRequest")
                            .header("Referer", "https://www.oref.org.il/12481-he/Pakar.aspx")
                            .timeout(Duration.ofSeconds(10))
                            .build(),
                    HttpResponse.BodyHandlers.ofInputStream()
            );
            try (InputStream httpResponseBody = httpResponse.body()) {
                if (httpResponse.statusCode() != HttpURLConnection.HTTP_OK) {
                    sleep();
                    continue;
                }
                if (httpResponse.headers().firstValueAsLong("Content-Length").orElse(-1) > minRedAlertEventContentLength)
                    System.out.println(JSON_MAPPER.readValue(
                            httpResponseBody,
                            RedAlertEvent.class
                    ));
            }
        }
    }

    private record RedAlertEvent(
            int cat,
            List<String> data,
            String desc,
            long id,
            String title
    ) {
    }

-------------

PR: https://git.openjdk.org/jdk/pull/10659
