Repository: cxf Updated Branches: refs/heads/master b69bd4273 -> 85e9a4780
CXF-7085: Introduce support for Server Sent Events (Client). Multiple bugfixes and improvements Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/85e9a478 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/85e9a478 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/85e9a478 Branch: refs/heads/master Commit: 85e9a478055cb35f917e5cfa8e19fa52a7290fe0 Parents: b69bd42 Author: reta <drr...@gmail.com> Authored: Thu Aug 10 19:56:37 2017 -0400 Committer: reta <drr...@gmail.com> Committed: Thu Aug 10 19:56:37 2017 -0400 ---------------------------------------------------------------------- .../sse/client/InboundSseEventProcessor.java | 5 ++- .../jaxrs/sse/client/SseEventSourceImpl.java | 33 ++++++++++++++------ .../AtmosphereSseServletDestination.java | 4 +++ .../cxf/systest/jaxrs/sse/AbstractSseTest.java | 17 +++++++++- .../apache/cxf/systest/jaxrs/sse/BookStore.java | 7 +++++ .../cxf/systest/jaxrs/sse/BookStore2.java | 7 +++++ 6 files changed, 62 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java index 80a34cd..d105963 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java @@ -118,11 +118,14 @@ public class InboundSseEventProcessor { response.close(); } - closed = true; return null; }; } + boolean isClosed() { + return closed; + } + boolean close(long timeout, TimeUnit unit) { try { closed = true; 
http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java index ad002c8..d8b22d4 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import java.util.logging.Logger; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Configuration; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedHashMap; @@ -52,6 +53,7 @@ public class SseEventSourceImpl implements SseEventSource { // It may happen that open() and close() could be called on separate threads private volatile ScheduledExecutorService executor; + private volatile boolean managedExecutor = true; private volatile InboundSseEventProcessor processor; private volatile TimeUnit unit; private volatile long delay; @@ -160,14 +162,18 @@ public class SseEventSourceImpl implements SseEventSource { } // Create the executor for scheduling the reconnect tasks - executor = - (ScheduledExecutorService)target.getConfiguration().getProperty("scheduledExecutorService"); + final Configuration configuration = target.getConfiguration(); if (executor == null) { - executor = Executors.newSingleThreadScheduledExecutor(); + executor = (ScheduledExecutorService)configuration + .getProperty("scheduledExecutorService"); + + if (executor == null) { + executor = Executors.newSingleThreadScheduledExecutor(); + managedExecutor = false; /* not externally managed: created here, so we must shut it down ourselves */ + } } - - final Object
lastEventId = configuration.getProperty(HttpHeaders.LAST_EVENT_ID_HEADER); connect(lastEventId != null ? lastEventId.toString() : null); } @@ -187,7 +193,7 @@ public class SseEventSourceImpl implements SseEventSource { .get(); // A client can be told to stop reconnecting using the HTTP 204 No Content - // response code. In this case, we should stop here. + // response code. In this case, we should give up. if (response.getStatus() == 204) { LOG.fine("SSE endpoint " + target.getUri() + " returns no data, disconnecting"); state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED); @@ -196,11 +202,18 @@ public class SseEventSourceImpl implements SseEventSource { } final Endpoint endpoint = WebClient.getConfig(target).getEndpoint(); - processor = new InboundSseEventProcessor(endpoint, delegate); + // Create new processor if this is the first time or the old one has been closed + if (processor == null || processor.isClosed()) { + LOG.fine("Creating new instance of SSE event processor ..."); + processor = new InboundSseEventProcessor(endpoint, delegate); + } + + // Start consuming events processor.run(response); + LOG.fine("SSE event processor has been started ..."); state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN); - LOG.fine("Opened SSE connection to " + target.getUri()); + LOG.fine("Successfuly opened SSE connection to " + target.getUri()); } catch (final Exception ex) { if (processor != null) { processor.close(1, TimeUnit.SECONDS); @@ -234,8 +247,10 @@ public class SseEventSourceImpl implements SseEventSource { throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() + " state"); } - if (executor != null) { + if (executor != null && !managedExecutor) { executor.shutdown(); + executor = null; + managedExecutor = true; } // Should never happen http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java 
---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java index 8407021..92d6115 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java @@ -67,6 +67,10 @@ public class AtmosphereSseServletDestination extends ServletDestination { framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true"); framework.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true"); + // Atmosphere does not limit the number of threads, so the application can crash in no time + // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance + framework.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "20"); + framework.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "20"); framework.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName()); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java index 0232d45..5c8bf99 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java +++
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -88,7 +88,22 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest { ) ); } - + + @Test + public void testNoDataIsReturnedFromInboundSseEvents() throws InterruptedException { + final WebTarget target = createWebTarget("/rest/api/bookstore/nodata"); + final Collection<Book> books = new ArrayList<>(); + + try (SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collect(books), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + Thread.sleep(1000); + } + // Easing the test verification here, it does not work well for Atm + Jetty + assertTrue(books.isEmpty()); + } + @Test public void testBooksStreamIsReconnectedFromInboundSseEvents() throws InterruptedException { final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0"); http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java index c096713..a0c3fd8 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java @@ -94,6 +94,13 @@ public class BookStore { } @GET + @Path("nodata") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void nodata(@Context SseEventSink sink) { + sink.close(); + } + + @GET @Path("broadcast/sse") @Produces(MediaType.SERVER_SENT_EVENTS) public void broadcast(@Context SseEventSink sink) { 
http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java index a5eeb8e..f97906c 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java @@ -102,6 +102,13 @@ public class BookStore2 { latch.countDown(); } } + + @GET + @Path("nodata") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void nodata(@Context SseEventSink sink) { + sink.close(); + } @POST @Path("broadcast/close")