Repository: cxf
Updated Branches:
  refs/heads/master b69bd4273 -> 85e9a4780


CXF-7085: Introduce support for Server Sent Events (Client). Multiple bugfixes 
and improvements


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/85e9a478
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/85e9a478
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/85e9a478

Branch: refs/heads/master
Commit: 85e9a478055cb35f917e5cfa8e19fa52a7290fe0
Parents: b69bd42
Author: reta <drr...@gmail.com>
Authored: Thu Aug 10 19:56:37 2017 -0400
Committer: reta <drr...@gmail.com>
Committed: Thu Aug 10 19:56:37 2017 -0400

----------------------------------------------------------------------
 .../sse/client/InboundSseEventProcessor.java    |  5 ++-
 .../jaxrs/sse/client/SseEventSourceImpl.java    | 33 ++++++++++++++------
 .../AtmosphereSseServletDestination.java        |  4 +++
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java  | 17 +++++++++-
 .../apache/cxf/systest/jaxrs/sse/BookStore.java |  7 +++++
 .../cxf/systest/jaxrs/sse/BookStore2.java       |  7 +++++
 6 files changed, 62 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 80a34cd..d105963 100644
--- 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -118,11 +118,14 @@ public class InboundSseEventProcessor {
                 response.close();
             }
 
-            closed = true;
             return null;
         };
     }
     
+    boolean isClosed() {
+        return closed;
+    }
+    
     boolean close(long timeout, TimeUnit unit) {
         try {
             closed = true;

http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index ad002c8..d8b22d4 100644
--- 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
 import java.util.logging.Logger;
 
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Configuration;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedHashMap;
@@ -52,6 +53,7 @@ public class SseEventSourceImpl implements SseEventSource {
     
     // It may happen that open() and close() could be called on separate 
threads
     private volatile ScheduledExecutorService executor;
+    private volatile boolean managedExecutor = true;
     private volatile InboundSseEventProcessor processor; 
     private volatile TimeUnit unit;
     private volatile long delay;
@@ -160,14 +162,18 @@ public class SseEventSourceImpl implements SseEventSource 
{
         }
 
         // Create the executor for scheduling the reconnect tasks 
-        executor = 
-            
(ScheduledExecutorService)target.getConfiguration().getProperty("scheduledExecutorService");
+        final Configuration configuration = target.getConfiguration();
         if (executor == null) {
-            executor = Executors.newSingleThreadScheduledExecutor();    
+            executor = (ScheduledExecutorService)configuration
+                .getProperty("scheduledExecutorService");
+            
+            if (executor == null) {
+                executor = Executors.newSingleThreadScheduledExecutor();
+                managedExecutor = false; /* we manage lifecycle */
+            }
         }
         
-        
-        final Object lastEventId = 
target.getConfiguration().getProperty(HttpHeaders.LAST_EVENT_ID_HEADER);
+        final Object lastEventId = 
configuration.getProperty(HttpHeaders.LAST_EVENT_ID_HEADER);
         connect(lastEventId != null ? lastEventId.toString() : null);
     }
 
@@ -187,7 +193,7 @@ public class SseEventSourceImpl implements SseEventSource {
                 .get();
             
             // A client can be told to stop reconnecting using the HTTP 204 No 
Content 
-            // response code. In this case, we should stop here.
+            // response code. In this case, we should give up.
             if (response.getStatus() == 204) {
                 LOG.fine("SSE endpoint " + target.getUri() + " returns no 
data, disconnecting");
                 state.compareAndSet(SseSourceState.CONNECTING, 
SseSourceState.CLOSED);
@@ -196,11 +202,18 @@ public class SseEventSourceImpl implements SseEventSource 
{
             }
 
             final Endpoint endpoint = 
WebClient.getConfig(target).getEndpoint();
-            processor = new InboundSseEventProcessor(endpoint, delegate);
+            // Create new processor if this is the first time or the old one 
has been closed 
+            if (processor == null || processor.isClosed()) {
+                LOG.fine("Creating new instance of SSE event processor ...");
+                processor = new InboundSseEventProcessor(endpoint, delegate);
+            }
+            
+            // Start consuming events
             processor.run(response);
+            LOG.fine("SSE event processor has been started ...");
             
             state.compareAndSet(SseSourceState.CONNECTING, 
SseSourceState.OPEN);
-            LOG.fine("Opened SSE connection to " + target.getUri());
+            LOG.fine("Successfuly opened SSE connection to " + 
target.getUri());
         } catch (final Exception ex) {
             if (processor != null) {
                 processor.close(1, TimeUnit.SECONDS);
@@ -234,8 +247,10 @@ public class SseEventSourceImpl implements SseEventSource {
             throw new IllegalStateException("The SseEventSource is not opened, 
but in " + state.get() + " state");
         }
         
-        if (executor != null) {
+        if (executor != null && !managedExecutor) {
             executor.shutdown();
+            executor = null;
+            managedExecutor = true;
         }
 
         // Should never happen

http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
 
b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
index 8407021..92d6115 100644
--- 
a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ 
b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
@@ -67,6 +67,10 @@ public class AtmosphereSseServletDestination extends 
ServletDestination {
         framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, 
"true");
         
framework.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, 
"true");
         framework.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, 
"true");
+        // Atmosphere does not limit amount of threads and application can 
crash in no time
+        // 
https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance
+        
framework.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE,
 "20");
+        
framework.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE,
 "20");
         
framework.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName());
         framework.addAtmosphereHandler("/", new DestinationHandler());
         framework.init();

http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
----------------------------------------------------------------------
diff --git 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 0232d45..5c8bf99 100644
--- 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -88,7 +88,22 @@ public abstract class AbstractSseTest extends 
AbstractSseBaseTest {
             )
         );
     }
-    
+
+    @Test
+    public void testNoDataIsReturnedFromInboundSseEvents() throws 
InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/nodata");
+        final Collection<Book> books = new ArrayList<>();
+        
+        try (SseEventSource eventSource = 
SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            Thread.sleep(1000);
+        }
+        // Easing the test verification here, it does not work well for Atm + 
Jetty
+        assertTrue(books.isEmpty());
+    }
+
     @Test
     public void testBooksStreamIsReconnectedFromInboundSseEvents() throws 
InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");

http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
----------------------------------------------------------------------
diff --git 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index c096713..a0c3fd8 100644
--- 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -94,6 +94,13 @@ public class BookStore {
     }
 
     @GET
+    @Path("nodata")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void nodata(@Context SseEventSink sink) {
+        sink.close();
+    }
+
+    @GET
     @Path("broadcast/sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void broadcast(@Context SseEventSink sink) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/85e9a478/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
----------------------------------------------------------------------
diff --git 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index a5eeb8e..f97906c 100644
--- 
a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ 
b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -102,6 +102,13 @@ public class BookStore2 {
             latch.countDown();
         }
     }
+    
+    @GET
+    @Path("nodata")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void nodata(@Context SseEventSink sink) {
+        sink.close();
+    }
 
     @POST
     @Path("broadcast/close")

Reply via email to