Repository: cxf
Updated Branches:
  refs/heads/master e1a5b307c -> 625f9fbde


CXF-7085: Introduce support for Server Sent Events (Client). Added 
LAST_EVENT_ID support and test cases for that.


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/625f9fbd
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/625f9fbd
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/625f9fbd

Branch: refs/heads/master
Commit: 625f9fbde63a8a0052f9b3bc2a412033e367a139
Parents: e1a5b30
Author: reta <drr...@gmail.com>
Authored: Fri Jun 23 22:37:05 2017 -0400
Committer: reta <drr...@gmail.com>
Committed: Fri Jun 23 22:37:05 2017 -0400

----------------------------------------------------------------------
 .../jaxrs/sse/client/SseEventSourceImpl.java    | 10 +++++++
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java  | 29 ++++++++++++++++++--
 .../jaxrs/sse/jetty/JettyBroadcasterTest.java   | 12 ++++++--
 .../jaxrs/sse/jetty/JettyEmbeddedTest.java      | 12 ++++++--
 .../systest/jaxrs/sse/jetty/JettyWarTest.java   | 12 ++++++--
 5 files changed, 63 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index e9e5c1d..a8385a9 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -26,7 +26,10 @@ import java.util.function.Consumer;
 import java.util.logging.Logger;
 
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.sse.InboundSseEvent;
 import javax.ws.rs.sse.SseEventSource;
@@ -122,8 +125,15 @@ public class SseEventSourceImpl implements SseEventSource {
 
         Response response = null; 
         try {
+            final MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
+            final Object lastEventId = target.getConfiguration().getProperty(HttpHeaders.LAST_EVENT_ID_HEADER);
+            if (lastEventId != null) {
+                headers.putSingle(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
+            }
+            
             response = target
                 .request(MediaType.SERVER_SENT_EVENTS)
+                .headers(headers)
                 .get();
 
             final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();

http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 8f41bd7..bf646b4 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -23,19 +23,42 @@ import java.util.Collection;
 import java.util.function.Consumer;
 
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.InboundSseEvent;
 import javax.ws.rs.sse.SseEventSource;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.hasItems;
 
 public abstract class AbstractSseTest extends AbstractSseBaseTest {
     @Test
-    public void testBooksStreamIsReturnedFromInboundSseEvents() throws JsonProcessingException, InterruptedException {
+    public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0")
+            .property(HttpHeaders.LAST_EVENT_ID_HEADER, 150);
+        final Collection<Book> books = new ArrayList<>();
+        
+        try (final SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(3000, books, 4);
+        }
+
+        assertThat(books, 
+            hasItems(
+                new Book("New Book #151", 151), 
+                new Book("New Book #152", 152), 
+                new Book("New Book #153", 153), 
+                new Book("New Book #154", 154)
+            )
+        );
+
+    }
+
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
         final Collection<Book> books = new ArrayList<>();
         

http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java
index 44c4a31..9631203 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java
@@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty;
 
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.systest.jaxrs.sse.AbstractBroadcasterSseTest;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 
 public class JettyBroadcasterTest extends AbstractBroadcasterSseTest {
@@ -34,13 +35,18 @@ public class JettyBroadcasterTest extends AbstractBroadcasterSseTest {
         }
     }
 
-    @BeforeClass
-    public static void startServers() throws Exception {
+    @Before
+    public void startServers() throws Exception {
         AbstractResourceInfo.clearAllMaps();
         //keep out of process due to stack traces testing failures
         assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true));
         createStaticBus();
     }
+    
+    @After
+    public void stopServers() throws Exception {
+        stopAllServers();
+    }
 
     @Override
     protected int getPort() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java
index cafdeec..488dd45 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java
@@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty;
 
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.systest.jaxrs.sse.AbstractSseTest;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 
 public class JettyEmbeddedTest extends AbstractSseTest {
@@ -34,13 +35,18 @@ public class JettyEmbeddedTest extends AbstractSseTest {
         }
     }
 
-    @BeforeClass
-    public static void startServers() throws Exception {
+    @Before
+    public void startServers() throws Exception {
         AbstractResourceInfo.clearAllMaps();
         //keep out of process due to stack traces testing failures
         assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true));
         createStaticBus();
     }
+    
+    @After
+    public void stopServers() throws Exception {
+        stopAllServers();
+    }
 
     @Override
     protected int getPort() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java
index 10f435b..88142ae 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java
@@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty;
 
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.systest.jaxrs.sse.AbstractSseTest;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 
 public class JettyWarTest extends AbstractSseTest {
@@ -34,13 +35,18 @@ public class JettyWarTest extends AbstractSseTest {
         }
     }
 
-    @BeforeClass
-    public static void startServers() throws Exception {
+    @Before
+    public void startServers() throws Exception {
         AbstractResourceInfo.clearAllMaps();
         assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true));
         createStaticBus();
     }
 
+    @After
+    public void stopServers() throws Exception {
+        stopAllServers();
+    }
+
     @Override
     protected int getPort() {
         return EmbeddedJettyServer.PORT;

Reply via email to