Repository: cxf Updated Branches: refs/heads/master e1a5b307c -> 625f9fbde
CXF-7085: Introduce support for Server Sent Events (Client). Added LAST_EVENT_ID support and test cases for that. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/625f9fbd Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/625f9fbd Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/625f9fbd Branch: refs/heads/master Commit: 625f9fbde63a8a0052f9b3bc2a412033e367a139 Parents: e1a5b30 Author: reta <drr...@gmail.com> Authored: Fri Jun 23 22:37:05 2017 -0400 Committer: reta <drr...@gmail.com> Committed: Fri Jun 23 22:37:05 2017 -0400 ---------------------------------------------------------------------- .../jaxrs/sse/client/SseEventSourceImpl.java | 10 +++++++ .../cxf/systest/jaxrs/sse/AbstractSseTest.java | 29 ++++++++++++++++++-- .../jaxrs/sse/jetty/JettyBroadcasterTest.java | 12 ++++++-- .../jaxrs/sse/jetty/JettyEmbeddedTest.java | 12 ++++++-- .../systest/jaxrs/sse/jetty/JettyWarTest.java | 12 ++++++-- 5 files changed, 63 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java index e9e5c1d..a8385a9 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java @@ -26,7 +26,10 @@ import java.util.function.Consumer; import java.util.logging.Logger; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; @@ -122,8 +125,15 @@ public class SseEventSourceImpl implements SseEventSource { Response response = null; try { + final MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>(); + final Object lastEventId = target.getConfiguration().getProperty(HttpHeaders.LAST_EVENT_ID_HEADER); + if (lastEventId != null) { + headers.putSingle(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId); + } + response = target .request(MediaType.SERVER_SENT_EVENTS) + .headers(headers) .get(); final Endpoint endpoint = WebClient.getConfig(target).getEndpoint(); http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java index 8f41bd7..bf646b4 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -23,19 +23,42 @@ import java.util.Collection; import java.util.function.Consumer; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; -import com.fasterxml.jackson.core.JsonProcessingException; - import org.junit.Test; import static org.hamcrest.CoreMatchers.hasItems; public abstract class AbstractSseTest extends AbstractSseBaseTest { @Test - public void testBooksStreamIsReturnedFromInboundSseEvents() throws JsonProcessingException, InterruptedException { + public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException { + final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0") + .property(HttpHeaders.LAST_EVENT_ID_HEADER, 150); + final Collection<Book> books = new ArrayList<>(); + + try (final SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collect(books), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + awaitEvents(3000, books, 4); + } + + assertThat(books, + hasItems( + new Book("New Book #151", 151), + new Book("New Book #152", 152), + new Book("New Book #153", 153), + new Book("New Book #154", 154) + ) + ); + + } + + @Test + public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedException { final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0"); final Collection<Book> books = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java index 44c4a31..9631203 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyBroadcasterTest.java @@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.systest.jaxrs.sse.AbstractBroadcasterSseTest; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Ignore; public class JettyBroadcasterTest extends AbstractBroadcasterSseTest { @@ -34,13 +35,18 @@ public class JettyBroadcasterTest extends AbstractBroadcasterSseTest { } } - @BeforeClass - public static void startServers() throws Exception { + @Before + public void startServers() throws Exception { AbstractResourceInfo.clearAllMaps(); //keep out of process due to stack traces testing failures assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true)); createStaticBus(); } + + @After + public void stopServers() throws Exception { + stopAllServers(); + } @Override protected int getPort() { http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java index cafdeec..488dd45 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyEmbeddedTest.java @@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.systest.jaxrs.sse.AbstractSseTest; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Ignore; public class JettyEmbeddedTest extends AbstractSseTest { @@ -34,13 +35,18 @@ public class JettyEmbeddedTest extends AbstractSseTest { } } - @BeforeClass - public static void startServers() throws Exception { + @Before + public void startServers() throws Exception { AbstractResourceInfo.clearAllMaps(); //keep out of process due to stack traces testing failures assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true)); createStaticBus(); } + + @After + public void stopServers() throws Exception { + stopAllServers(); + } @Override protected int getPort() { http://git-wip-us.apache.org/repos/asf/cxf/blob/625f9fbd/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java index 10f435b..88142ae 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/JettyWarTest.java @@ -21,7 +21,8 @@ package org.apache.cxf.systest.jaxrs.sse.jetty; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.systest.jaxrs.sse.AbstractSseTest; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Ignore; public class JettyWarTest extends AbstractSseTest { @@ -34,13 +35,18 @@ public class JettyWarTest extends AbstractSseTest { } } - @BeforeClass - public static void startServers() throws Exception { + @Before + public void startServers() throws Exception { AbstractResourceInfo.clearAllMaps(); assertTrue("server did not launch correctly", launchServer(EmbeddedJettyServer.class, true)); createStaticBus(); } + @After + public void stopServers() throws Exception { + stopAllServers(); + } + @Override protected int getPort() { return EmbeddedJettyServer.PORT;