http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/distribution/src/main/release/samples/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml index ca874f3..156413c 100644 --- a/distribution/src/main/release/samples/pom.xml +++ b/distribution/src/main/release/samples/pom.xml @@ -115,6 +115,9 @@ <module>jax_rs/tracing_htrace</module> <module>clustering/failover_jaxws_osgi</module> <module>clustering/failover_server</module> + <module>jax_rs/sse_cdi</module> + <module>jax_rs/sse_tomcat</module> + <module>jax_rs/sse_spring</module> <!-- These are removed from the build as they currently don't inherit the parent from
http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java ---------------------------------------------------------------------- diff --git a/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java b/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java index e674f45..fa81160 100644 --- a/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java +++ b/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java @@ -45,6 +45,7 @@ import javax.ws.rs.ext.Provider; import org.apache.cxf.Bus; import org.apache.cxf.bus.extension.ExtensionManagerBus; +import org.apache.cxf.cdi.extension.JAXRSServerFactoryCustomizationExtension; import org.apache.cxf.feature.Feature; import org.apache.cxf.helpers.CastUtils; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; @@ -99,11 +100,13 @@ public class JAXRSCdiResourceExtension implements Extension { loadServices(beanManager, Collections.<Class<?>>emptySet()), loadProviders(beanManager, Collections.<Class<?>>emptySet()), loadFeatures(beanManager, Collections.<Class<?>>emptySet())); + customize(beanManager, factory); factory.init(); } else { // If there is an application with any singletons or classes defined, we will // create a server factory bean with only application singletons and classes. final JAXRSServerFactoryBean factory = createFactoryInstance(instance, beanManager); + customize(beanManager, factory); factory.init(); } } @@ -273,4 +276,24 @@ public class JAXRSCdiResourceExtension implements Extension { return services; } + + /** + * Look and apply the available JAXRSServerFactoryBean extensions to customize its + * creation (f.e. add features, providers, assign transport, ...) + * @param beanManager bean manager + * @param bean JAX-RS server factory bean about to be created + */ + private void customize(final BeanManager beanManager, final JAXRSServerFactoryBean bean) { + final Collection<Bean<?>> extensionBeans = beanManager.getBeans(JAXRSServerFactoryCustomizationExtension.class); + + for (final Bean<?> extensionBean: extensionBeans) { + final JAXRSServerFactoryCustomizationExtension extension = + (JAXRSServerFactoryCustomizationExtension)beanManager.getReference( + extensionBean, + extensionBean.getBeanClass(), + beanManager.createCreationalContext(extensionBean) + ); + extension.customize(bean); + } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java ---------------------------------------------------------------------- diff --git a/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java b/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java new file mode 100644 index 0000000..3e311b9 --- /dev/null +++ b/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.cdi.extension; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; + +/** + * Serves as an extension point in order to allow to customize JAX-RS server + * factory bean creation (f.e. add features, providers, assign transport, ...) + * during the CDI beans discovery and initialization. + */ +public interface JAXRSServerFactoryCustomizationExtension { + /** + * Customize JAX-RS server factory bean before it is being initialized + * @param bean JAX-RS server factory bean + */ + void customize(final JAXRSServerFactoryBean bean); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index afd3a59..c19e9f5 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -110,7 +110,7 @@ <cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version> <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version> <cxf.javassist.version>3.19.0-GA</cxf.javassist.version> - <cxf.javax.ws.rs.version>2.0.1</cxf.javax.ws.rs.version> + <cxf.javax.ws.rs.version>2.1-m01</cxf.javax.ws.rs.version> <cxf.jaxb.version>2.2.11</cxf.jaxb.version> <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version> <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version> @@ -2185,4 +2185,19 @@ </build> </profile> </profiles> + + <!-- Temporarily only till JAX-RS 2.1 artifacts become available --> + <repositories> + <repository> + <id>maven.java.net</id> + <name>java.net snapshots</name> + <url>https://maven.java.net/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java index d4e53ac..0f481a3 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java @@ -193,6 +193,7 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean { checkPrivateEndpoint(ep); factory.applyDynamicFeatures(getServiceFactory().getClassResourceInfo()); + applyBusFeatures(getBus()); applyFeatures(); getServiceFactory().sendEvent(FactoryBeanListener.Event.SERVER_CREATED, @@ -246,6 +247,14 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean { } + protected void applyBusFeatures(final Bus bus) { + if (bus.getFeatures() != null) { + for (Feature feature : bus.getFeatures()) { + feature.initialize(server, bus); + } + } + } + protected void applyFeatures() { if (getFeatures() != null) { for (Feature feature : getFeatures()) { http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java index 0914854..e57f3e0 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java @@ -32,6 +32,9 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.EntityTag; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.NioCompletionHandler; +import javax.ws.rs.core.NioErrorHandler; +import javax.ws.rs.core.NioReaderHandler; import javax.ws.rs.core.Request; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; @@ -382,4 +385,24 @@ public class RequestImpl implements Request { return 0; } } + + @Override + public void entity(NioReaderHandler arg0) { + // TODO: Implementation required (JAX-RS 2.1) + } + + @Override + public void entity(NioReaderHandler arg0, NioCompletionHandler arg1) { + // TODO: Implementation required (JAX-RS 2.1) + } + + @Override + public void entity(NioReaderHandler arg0, NioErrorHandler arg1) { + // TODO: Implementation required (JAX-RS 2.1) + } + + @Override + public void entity(NioReaderHandler arg0, NioCompletionHandler arg1, NioErrorHandler arg2) { + // TODO: Implementation required (JAX-RS 2.1) + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java index cee5d36..29c5c42 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java @@ -34,6 +34,8 @@ import javax.ws.rs.core.Link; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.NewCookie; +import javax.ws.rs.core.NioErrorHandler; +import javax.ws.rs.core.NioWriterHandler; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.UriInfo; @@ -314,4 +316,16 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable { } return variants(Arrays.asList(variants)); } + + @Override + public ResponseBuilder entity(NioWriterHandler arg0) { + // TODO: Not Implemented + return this; + } + + @Override + public ResponseBuilder entity(NioWriterHandler arg0, NioErrorHandler arg1) { + // TODO: Not Implemented + return this; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java index 8bcdbb6..44a26f9 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java @@ -23,6 +23,9 @@ import java.util.Date; import java.util.List; import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.NioCompletionHandler; +import javax.ws.rs.core.NioErrorHandler; +import javax.ws.rs.core.NioReaderHandler; import javax.ws.rs.core.Request; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Variant; @@ -54,4 +57,24 @@ public class ThreadLocalRequest extends AbstractThreadLocalProxy<Request> return get().evaluatePreconditions(); } + @Override + public void entity(NioReaderHandler reader) { + get().entity(reader); + } + + @Override + public void entity(NioReaderHandler reader, NioCompletionHandler completion) { + get().entity(reader, completion); + } + + @Override + public void entity(NioReaderHandler reader, NioErrorHandler error) { + get().entity(reader, error); + } + + @Override + public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) { + get().entity(reader, completion, error); + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java index c5d4318..10f5d9b 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java @@ -22,14 +22,18 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.ws.rs.HttpMethod; import javax.ws.rs.client.AsyncInvoker; +import javax.ws.rs.client.CompletionStageRxInvoker; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.NioInvoker; +import javax.ws.rs.client.RxInvoker; import javax.ws.rs.client.SyncInvoker; import javax.ws.rs.core.CacheControl; import javax.ws.rs.core.Cookie; @@ -369,6 +373,35 @@ public class InvocationBuilderImpl implements Invocation.Builder { public <T> Future<T> submit(InvocationCallback<T> callback) { return invBuilder.async().method(httpMethod, entity, callback); } + } + + @Override + public CompletionStageRxInvoker rx() { + // TODO: Implementation required (JAX-RS 2.1) + return null; + } + + @Override + public CompletionStageRxInvoker rx(ExecutorService executorService) { + // TODO: Implementation required (JAX-RS 2.1) + return null; + } + @Override + public <T extends RxInvoker> T rx(Class<T> clazz) { + // TODO: Implementation required (JAX-RS 2.1) + return null; + } + + @Override + public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorService) { + // TODO: Implementation required (JAX-RS 2.1) + return null; + } + + @Override + public NioInvoker nio() { + // TODO: Implementation required (JAX-RS 2.1) + return null; } } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml index 0348e4b..cae173e 100644 --- a/rt/rs/pom.xml +++ b/rt/rs/pom.xml @@ -39,5 +39,6 @@ <module>extensions/search</module> <module>extensions/rx</module> <module>security</module> + <module>sse</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml new file mode 100644 index 0000000..d4c9f17 --- /dev/null +++ b/rt/rs/sse/pom.xml @@ -0,0 +1,76 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>cxf-rt-rs-sse</artifactId> + <packaging>bundle</packaging> + <name>Apache CXF JAX-RS Server-Side Events Support</name> + <description>Apache CXF JAX-RS Server-Side Events Support</description> + <url>http://cxf.apache.org</url> + <parent> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-parent</artifactId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../../../parent/pom.xml</relativePath> + </parent> + <properties> + <cxf.osgi.import> + javax.servlet*;version="${cxf.osgi.javax.servlet.version}", + </cxf.osgi.import> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-integration-cdi</artifactId> + <version>${project.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${cxf.servlet-api.group}</groupId> + <artifactId>${cxf.servlet-api.artifact}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.atmosphere</groupId> + <artifactId>atmosphere-runtime</artifactId> + <version>${cxf.atmosphere.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java new file mode 100644 index 0000000..4a9b3aa --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.OutboundSseEvent; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; + +@Provider +public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> { + public static final String SERVER_SENT_EVENTS = "text/event-stream"; + public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS); + + private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8); + private static final byte[] EVENT = " ".getBytes(StandardCharsets.UTF_8); + private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8); + + private ServerProviderFactory factory; + private Message message; + + protected OutboundSseEventBodyWriter() { + } + + public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) { + this.factory = factory; + this.message = new MessageImpl(); + this.message.setExchange(exchange); + } + + + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt); + } + + @Override + public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns, + MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) + throws IOException, WebApplicationException { + + if (p.getName() != null) { + os.write(EVENT); + os.write(p.getName().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getId() != null) { + os.write(ID); + os.write(p.getId().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getComment() != null) { + os.write(COMMENT); + os.write(p.getComment().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getReconnectDelay() > 0) { + os.write(RETRY); + os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getData() != null) { + Class<?> payloadClass = p.getType(); + Type payloadType = p.getGenericType(); + if (payloadType == null) { + payloadType = payloadClass; + } + + if (payloadType == null && payloadClass == null) { + payloadType = Object.class; + payloadClass = Object.class; + } + + os.write(DATA); + writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os); + os.write(NEW_LINE); + } + } + + @SuppressWarnings("unchecked") + private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt, + MultivaluedMap<String, Object> headers, Object data, OutputStream os) + throws IOException, WebApplicationException { + + MessageBodyWriter<T> writer = null; + if (message != null && factory != null) { + writer = factory.createMessageBodyWriter(cls, type, anns, mt, message); + } + + if (writer == null) { + throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName()); + } + + writer.writeTo((T)data, cls, type, anns, mt, headers, os); + } + + @Override + public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations, + MediaType mediaType) { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java new file mode 100644 index 0000000..f852637 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.lang.reflect.Type; + +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; + +public class OutboundSseEventImpl implements OutboundSseEvent { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class<?> type; + private Type genericType; + private MediaType mediaType; + private Object data; + + public static class BuilderImpl implements Builder { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class<?> type; + private Type genericType; + private MediaType mediaType; + private Object data; + + @Override + public Builder id(String id) { + this.id = id; + return this; + } + + @Override + public Builder name(String name) { + this.name = name; + return this; + } + + @Override + public Builder reconnectDelay(long milliseconds) { + this.reconnectDelay = milliseconds; + return this; + } + + @Override + public Builder mediaType(MediaType mediaType) { + this.mediaType = mediaType; + return this; + } + + @Override + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + @Override + @SuppressWarnings("rawtypes") + public Builder data(Class type, Object data) { + this.type = type; + this.data= data; + return this; + } + + @Override + @SuppressWarnings("rawtypes") + public Builder data(GenericType type, Object data) { + this.genericType = type.getType(); + this.data= data; + return this; + } + + @Override + public Builder data(Object data) { + this.data = data; + return this; + } + + @Override + public OutboundSseEvent build() { + return new OutboundSseEventImpl( + id, + name, + comment, + reconnectDelay, + type, + genericType, + mediaType, + data + ); + } + + } + + OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay, + Class<?> type, Type genericType, MediaType mediaType, Object data) { + this.id = id; + this.name = name; + this.comment = comment; + this.reconnectDelay = reconnectDelay; + this.type = type; + this.genericType = genericType; + this.mediaType = mediaType; + this.data = data; + } + + @Override + public String getId() { + return id; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getComment() { + return comment; + } + + @Override + public long getReconnectDelay() { + return reconnectDelay; + } + + @Override + public boolean isReconnectDelaySet() { + return reconnectDelay != -1; + } + + @Override + public Class<?> getType() { + return type; + } + + @Override + public Type getGenericType() { + return genericType; + } + + @Override + public MediaType getMediaType() { + return mediaType; + } + + @Override + public Object getData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java new file mode 100644 index 0000000..977a6b2 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventOutput; + +public class SseBroadcasterImpl implements SseBroadcaster { + private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>(); + private final Set<Listener> listeners = new CopyOnWriteArraySet<>(); + + @Override + public boolean register(Listener listener) { + return listeners.add(listener); + } + + @Override + public boolean register(SseEventOutput output) { + return outputs.add(output); + } + + @Override + public void broadcast(OutboundSseEvent event) { + for (final SseEventOutput output: outputs) { + try { + output.write(event); + } catch (final IOException ex) { + listeners.forEach(listener -> listener.onException(output, ex)); + } + } + } + + @Override + public void close() { + for (final SseEventOutput output: outputs) { + try { + output.close(); + listeners.forEach(listener -> listener.onClose(output)); + } catch (final IOException ex) { + listeners.forEach(listener -> listener.onException(output, ex)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java new file mode 100644 index 0000000..7f7963f --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseEventOutput; + +@Provider +public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> { + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return SseEventOutput.class.isAssignableFrom(cls); + } + + @Override + public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType) { + return -1; + } + + @Override + public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType, + final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream) + throws IOException, WebApplicationException { + // do nothing. + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java new file mode 100644 index 0000000..da682a0 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cxf.Bus; +import org.apache.cxf.endpoint.Server; +import org.apache.cxf.feature.AbstractFeature; +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider; + +public class SseFeature extends AbstractFeature { + @Override + public void initialize(Server server, Bus bus) { + final List<Object> providers = new ArrayList<>(); + + providers.add(new SseAtmosphereContextProvider()); + providers.add(new SseEventOutputProvider()); + + ((ServerProviderFactory) server.getEndpoint().get( + ServerProviderFactory.class.getName())).setUserProviders(providers); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java new file mode 100644 index 0000000..de2c3a9 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseContext; + +import org.apache.cxf.jaxrs.ext.ContextProvider; +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Message; +import org.apache.cxf.transport.http.AbstractHTTPDestination; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; + +@Provider +public class SseAtmosphereContextProvider implements ContextProvider<SseContext> { + @Override + public SseContext createContext(Message message) { + final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST); + if (request == null) { + throw new IllegalStateException("Unable to retrieve HTTP request from the context"); + } + + final AtmosphereResource resource = (AtmosphereResource)request + .getAttribute(AtmosphereResource.class.getName()); + if (resource == null) { + throw new IllegalStateException("AtmosphereResource is not present, " + + "is AtmosphereServlet configured properly?"); + } + + final Broadcaster broadcaster = resource.getAtmosphereConfig() + .getBroadcasterFactory() + .lookup(resource.uuid(), true); + + resource.removeFromAllBroadcasters(); + resource.setBroadcaster(broadcaster); + + return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java new file mode 100644 index 0000000..cbf1a26 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.SseEventOutput; + +import org.apache.cxf.common.logging.LogUtils; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; + +public class SseAtmosphereEventOutputImpl implements SseEventOutput { + private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventOutputImpl.class); + + private final AtmosphereResource resource; + private final MessageBodyWriter<OutboundSseEvent> writer; + private volatile boolean closed = false; + + public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer, + final AtmosphereResource resource) { + this.writer = writer; + this.resource = resource; + + if (!resource.isSuspended()) { + LOG.fine("Atmosphere resource is not suspended, suspending"); + resource.suspend(); + } + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + + LOG.fine("Closing Atmosphere SSE event output"); + if (resource.isSuspended()) { + LOG.fine("Atmosphere resource is suspended, resuming"); + resource.resume(); + } + + final Broadcaster broadcaster = resource.getBroadcaster(); + resource.removeFromAllBroadcasters(); + + try { + if (!resource.getResponse().isCommitted()) { + LOG.fine("Response is not committed, flushing buffer"); + resource.getResponse().flushBuffer(); + } + } finally { + resource.close(); + broadcaster.destroy(); + LOG.fine("Atmosphere SSE event output is closed"); + } + } + } + + @Override + public void write(OutboundSseEvent event) throws IOException { + if (!closed && writer != null) { + try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) { + writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os); + + // Atmosphere broadcasts asynchronously which is acceptable in most cases. + // Unfortunately, calling close() may lead to response stream being closed + // while there are still some SSE delivery scheduled. + final Future<Object> future = resource + .getBroadcaster() + .broadcast(os.toString(StandardCharsets.UTF_8.name())); + + try { + if (!future.isDone()) { + // Let us wait at least 200 milliseconds before returning to ensure + // that SSE had the opportunity to be delivered. + LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered"); + future.get(200, TimeUnit.MILLISECONDS); + } + } catch (final ExecutionException | InterruptedException ex) { + throw new IOException(ex); + } catch (final TimeoutException ex) { + LOG.warning("SSE Atmosphere response was not delivered within default timeout"); + } + } + } + } + + @Override + public boolean isClosed() { + return closed; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java new file mode 100644 index 0000000..d8bbabc --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.cxf.common.logging.LogUtils; +import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.AsyncIOInterceptorAdapter; +import org.atmosphere.cpr.AsyncIOWriter; +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.interceptor.AllowInterceptor; +import org.atmosphere.interceptor.SSEAtmosphereInterceptor; +import org.atmosphere.util.Utils; + +import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS; +import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM; +import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL; +import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE; + +/** + * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original + * implementation does two things which do not fit well into SSE support: + * - closes the response stream (overridden by SseAtmosphereInterceptorWriter) + * - wraps the whatever object is being written to SSE payload (overridden using + * the complete SSE protocol) + */ +public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor { + private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class); + + private static final byte[] PADDING; + private static final String PADDING_TEXT; + private static final byte[] END = "\r\n\r\n".getBytes(); + + static { + StringBuffer whitespace = new StringBuffer(); + for (int i = 0; i < 2000; i++) { + whitespace.append(" "); + } + whitespace.append("\n"); + PADDING_TEXT = whitespace.toString(); + PADDING = PADDING_TEXT.getBytes(); + } + + private boolean writePadding(AtmosphereResponse response) { + if (response.request() != null && response.request().getAttribute("paddingWritten") != null) { + return false; + } + + response.setContentType(SERVER_SENT_EVENTS); + response.setCharacterEncoding("utf-8"); + boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM); + if (isUsingStream) { + try { + OutputStream stream = response.getResponse().getOutputStream(); + try { + stream.write(PADDING); + stream.flush(); + } catch (IOException ex) { + LOG.log(Level.WARNING, "SSE may not work", ex); + } + } catch (IOException e) { + LOG.log(Level.FINEST, "", e); + } + } else { + try { + PrintWriter w = response.getResponse().getWriter(); + w.println(PADDING_TEXT); + w.flush(); + } catch (IOException e) { + LOG.log(Level.FINEST, "", e); + } + } + response.resource().getRequest().setAttribute("paddingWritten", "true"); + return true; + } + + @Override + public Action inspect(final AtmosphereResource r) { + if (Utils.webSocketMessage(r)) { + return Action.CONTINUE; + } + + final AtmosphereRequest request = r.getRequest(); + final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim(); + + if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) { + final AtmosphereResponse response = r.getResponse(); + if (response.getAsyncIOWriter() == null) { + response.asyncIOWriter(new SseAtmosphereInterceptorWriter()); + } + + r.addEventListener(new P(response)); + + AsyncIOWriter writer = response.getAsyncIOWriter(); + if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) { + AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() { + private boolean padding() { + if (!r.isSuspended()) { + return writePadding(response); + } + return false; + } + + @Override + public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) { + padding(); + } + + @Override + public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) { + // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere + // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource + if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null + || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) { + response.write(END, true); + } + + /** + * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we + * resume after every message. + */ + String ua = r.getRequest().getHeader("User-Agent"); + if (ua != null && ua.contains("MSIE")) { + try { + response.flushBuffer(); + } catch (IOException e) { + LOG.log(Level.FINEST, "", e); + } + r.resume(); + } + } + }); + } else { + LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s", + getClass().getName(), AtmosphereInterceptorWriter.class.getName())); + } + } + + return Action.CONTINUE; + } + + private final class P extends OnPreSuspend implements AllowInterceptor { + + private final AtmosphereResponse response; + + private P(AtmosphereResponse response) { + this.response = response; + } + + @Override + public void onPreSuspend(AtmosphereResourceEvent event) { + writePadding(response); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java new file mode 100644 index 0000000..cfafe87 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; + +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereResponse; + +public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter { + @Override + public void close(AtmosphereResponse response) throws IOException { + // Do not close the response, keep output stream open + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java new file mode 100644 index 0000000..c330d6c --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.OutboundSseEvent.Builder; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseContext; +import javax.ws.rs.sse.SseEventOutput; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter; +import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl; +import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl; +import org.apache.cxf.jaxrs.utils.JAXRSUtils; +import org.atmosphere.cpr.AtmosphereResource; + +public class SseAtmosphereResourceContext implements SseContext { + private final AtmosphereResource resource; + private final ServerProviderFactory factory; + + SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) { + this.factory = factory; + this.resource = resource; + } + + @Override + public SseEventOutput newOutput() { + final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory, + JAXRSUtils.getCurrentMessage().getExchange()); + return new SseAtmosphereEventOutputImpl(writer, resource); + } + + @Override + public Builder newEvent() { + return new OutboundSseEventImpl.BuilderImpl(); + } + + @Override + public SseBroadcaster newBroadcaster() { + return new SseBroadcasterImpl(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java new file mode 100644 index 0000000..f4ea862 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.cdi; + +import org.apache.cxf.cdi.extension.JAXRSServerFactoryCustomizationExtension; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.transport.sse.SseHttpTransportFactory; + +public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension{ + @Override + public void customize(final JAXRSServerFactoryBean bean) { + bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java new file mode 100644 index 0000000..af59327 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.sse; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.cxf.Bus; +import org.apache.cxf.common.injection.NoJSR250Annotations; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.ConduitInitiator; +import org.apache.cxf.transport.Destination; +import org.apache.cxf.transport.DestinationFactory; +import org.apache.cxf.transport.http.DestinationRegistry; +import org.apache.cxf.transport.http.HTTPTransportFactory; +import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination; + +@NoJSR250Annotations +public class SseHttpTransportFactory extends HTTPTransportFactory + implements ConduitInitiator, DestinationFactory { + + public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse"; + public static final List<String> DEFAULT_NAMESPACES = Arrays.asList( + TRANSPORT_ID, + "http://cxf.apache.org/transports/http/sse/configuration" + ); + + public SseHttpTransportFactory() { + this(null); + } + + public SseHttpTransportFactory(DestinationRegistry registry) { + super(DEFAULT_NAMESPACES, registry); + } + + @Override + public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException { + return new AtmosphereSseServletDestination(bus, getRegistry(), endpointInfo, endpointInfo.getAddress()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java new file mode 100644 index 0000000..15e5c9f --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.sse.atmosphere; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.Bus; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.jaxrs.sse.SseFeature; +import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.DestinationRegistry; +import org.apache.cxf.transport.servlet.ServletDestination; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AtmosphereFramework; +import org.atmosphere.cpr.AtmosphereRequestImpl; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResponseImpl; +import org.atmosphere.handler.AbstractReflectorAtmosphereHandler; + +public class AtmosphereSseServletDestination extends ServletDestination { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class); + + private AtmosphereFramework framework; + + public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry, + EndpointInfo ei, String path) throws IOException { + super(bus, registry, ei, path); + + framework = new AtmosphereFramework(true, false); + framework.interceptor(new SseAtmosphereInterceptor()); + framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true"); + framework.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true"); + framework.addAtmosphereHandler("/", new DestinationHandler()); + framework.init(); + + bus.getFeatures().add(new SseFeature()); + } + + @Override + public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req, + HttpServletResponse resp) throws IOException { + try { + framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp)); + } catch (ServletException e) { + throw new IOException(e); + } + } + + @Override + public void shutdown() { + try { + framework.destroy(); + } catch (Exception ex) { + LOG.warning("Graceful shutdown was not successful: " + ex.getMessage()); + } finally { + super.shutdown(); + } + } + + private class DestinationHandler extends AbstractReflectorAtmosphereHandler { + @Override + public void onRequest(final AtmosphereResource resource) throws IOException { + LOG.fine("onRequest"); + try { + AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(), + resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/resources/META-INF/beans.xml ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/resources/META-INF/beans.xml b/rt/rs/sse/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000..aff28d9 --- /dev/null +++ b/rt/rs/sse/src/main/resources/META-INF/beans.xml @@ -0,0 +1,5 @@ +<?xml version="1.0"?> +<beans xmlns="http://java.sun.com/xml/ns/javaee" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://jboss.org/schema/cdi/beans_1_1.xsd"> +</beans> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt new file mode 100644 index 0000000..643b51c --- /dev/null +++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt @@ -0,0 +1 @@ +org.apache.cxf.transport.sse.SseHttpTransportFactory::true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java index 706c8c1..e09d549 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java @@ -87,13 +87,19 @@ public class HTTPTransportFactory public HTTPTransportFactory() { this(new DestinationRegistryImpl()); } + public HTTPTransportFactory(DestinationRegistry registry) { - super(DEFAULT_NAMESPACES); + this(DEFAULT_NAMESPACES, registry); + } + + protected HTTPTransportFactory(List<String> transportIds, DestinationRegistry registry) { + super(transportIds); if (registry == null) { registry = new DestinationRegistryImpl(); } this.registry = registry; } + public DestinationRegistry getRegistry() { return registry; } http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java ---------------------------------------------------------------------- diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java index fe46f4f..37d56b8 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java @@ -38,6 +38,7 @@ import org.apache.cxf.BusException; import org.apache.cxf.BusFactory; import org.apache.cxf.common.classloader.ClassLoaderUtils; import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder; +import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.helpers.CastUtils; import org.apache.cxf.resource.ResourceManager; import org.apache.cxf.transport.DestinationFactory; @@ -48,6 +49,8 @@ import org.apache.cxf.transport.http.HTTPTransportFactory; import org.apache.cxf.transport.servlet.servicelist.ServiceListGeneratorServlet; public class CXFNonSpringServlet extends AbstractHTTPServlet { + public static final String TRANSPORT_ID = "transportId"; + private static final long serialVersionUID = -2437897227486327166L; private static final String IGNORE_SERVLET_CONTEXT_RESOLVER = "ignore.servlet.context.resolver"; @@ -80,7 +83,7 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet { loader = initClassLoader(); registerServletContextResolver(sc); if (destinationRegistry == null) { - this.destinationRegistry = getDestinationRegistryFromBus(); + this.destinationRegistry = getDestinationRegistryFromBusOrDefault(sc.getInitParameter(TRANSPORT_ID)); } } @@ -101,11 +104,12 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet { return bus.getExtension(ClassLoader.class); } - protected DestinationRegistry getDestinationRegistryFromBus() { + protected DestinationRegistry getDestinationRegistryFromBusOrDefault(final String transportId) { DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class); try { - DestinationFactory df = dfm - .getDestinationFactory("http://cxf.apache.org/transports/http/configuration"); + DestinationFactory df = StringUtils.isEmpty(transportId) + ? dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration") + : dfm.getDestinationFactory(transportId); if (df instanceof HTTPTransportFactory) { HTTPTransportFactory transportFactory = (HTTPTransportFactory)df; return transportFactory.getRegistry(); http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/systests/pom.xml ---------------------------------------------------------------------- diff --git a/systests/pom.xml b/systests/pom.xml index 3a5ba50..66163b2 100644 --- a/systests/pom.xml +++ b/systests/pom.xml @@ -52,5 +52,6 @@ <module>tracing</module> <module>jibx</module> <module>ws-transfer</module> + <module>rs-sse</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/systests/rs-sse/pom.xml ---------------------------------------------------------------------- diff --git a/systests/rs-sse/pom.xml b/systests/rs-sse/pom.xml new file mode 100644 index 0000000..dc3da1a --- /dev/null +++ b/systests/rs-sse/pom.xml @@ -0,0 +1,166 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>cxf-parent</artifactId> + <groupId>org.apache.cxf</groupId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.cxf.systests</groupId> + <artifactId>cxf-systests-rs-sse</artifactId> + <name>Apache CXF SSE Integration System Tests</name> + <description>Apache CXF SSE Integration System Tests</description> + <url>http://cxf.apache.org</url> + <properties> + <cxf.surefire.fork.vmargs>-XX:MaxPermSize=192m</cxf.surefire.fork.vmargs> + <cxf.server.launcher.vmargs>-XX:MaxPermSize=192m</cxf.server.launcher.vmargs> + <cxf.jetty.version>${cxf.jetty9.version}</cxf.jetty.version> + <cxf.tomcat.version>8.0.32</cxf.tomcat.version> + </properties> + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-annotations</artifactId> + <version>${cxf.jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-plus</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.websocket</groupId> + <artifactId>websocket-server</artifactId> + <version>${cxf.jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-webapp</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-http-jetty</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-sse</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat.embed</groupId> + <artifactId>tomcat-embed-core</artifactId> + <version>${cxf.tomcat.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat.embed</groupId> + <artifactId>tomcat-embed-logging-juli</artifactId> + <version>${cxf.tomcat.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>tomcat-jasper</artifactId> + <version>${cxf.tomcat.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-testutils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>jsr250-api</artifactId> + <version>1.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.ning</groupId> + <artifactId>async-http-client</artifactId> + <version>${cxf.ahc.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/cxf/blob/bd8aff66/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java new file mode 100644 index 0000000..0cf3087 --- /dev/null +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.systest.jaxrs.sse; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItems; + +public abstract class AbstractSseTest extends AbstractBusClientServerTestBase { + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testBooksStreamIsReturned() throws JsonProcessingException { + Response r = createWebClient("/rest/api/bookstore/sse/100").get(); + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + final String response = r.readEntity(String.class); + assertThat(response, containsString("id: 1")); + assertThat(response, containsString("data: " + toJson("New Book #1", 1))); + + assertThat(response, containsString("id: 2")); + assertThat(response, containsString("data: " + toJson("New Book #2", 2))); + + assertThat(response, containsString("id: 3")); + assertThat(response, containsString("data: " + toJson("New Book #3", 3))); + + assertThat(response, containsString("id: 4")); + assertThat(response, containsString("data: " + toJson("New Book #4", 4))); + } + + @Test + public void testBooksStreamIsReturnedFromLastEventId() throws JsonProcessingException { + Response r = createWebClient("/rest/api/bookstore/sse/100") + .header(HttpHeaders.LAST_EVENT_ID_HEADER, 150) + .get(); + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + final String response = r.readEntity(String.class); + assertThat(response, containsString("id: 151")); + assertThat(response, containsString("data: " + toJson("New Book #151", 151))); + + assertThat(response, containsString("id: 152")); + assertThat(response, containsString("data: " + toJson("New Book #152", 152))); + + assertThat(response, containsString("id: 152")); + assertThat(response, containsString("data: " + toJson("New Book #153", 153))); + + assertThat(response, containsString("id: 152")); + assertThat(response, containsString("data: " + toJson("New Book #154", 154))); + } + + @Test + public void testBooksAreReturned() throws JsonProcessingException { + Response r = createWebClient("/rest/api/bookstore", MediaType.APPLICATION_JSON).get(); + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + final Book[] books = r.readEntity(Book[].class); + assertThat(Arrays.asList(books), hasItems(new Book("New Book #1", 1), new Book("New Book #2", 2))); + } + + @Test + public void testBooksStreamIsBroadcasted() throws Exception { + final Collection<Future<Response>> results = new ArrayList<>(); + + for (int i = 0; i < 2; ++i) { + results.add( + createWebClient("/rest/api/bookstore/broadcast/sse").async().get() + ); + } + + createWebClient("/rest/api/bookstore/broadcast/close") + .async() + .post(null) + .get(4, TimeUnit.SECONDS) + .close(); + + for (final Future<Response> result: results) { + final Response r = result.get(1, TimeUnit.SECONDS); + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + final String response = r.readEntity(String.class); + assertThat(response, containsString("id: 1000")); + assertThat(response, containsString("data: " + toJson("New Book #1000", 1000))); + + assertThat(response, containsString("id: 2000")); + assertThat(response, containsString("data: " + toJson("New Book #2000", 2000))); + + r.close(); + } + } + + private String toJson(final String name, final Integer id) throws JsonProcessingException { + return mapper.writeValueAsString(new Book(name, id)); + } + + protected WebClient createWebClient(final String url, final String media) { + final List< ? > providers = Arrays.asList(new JacksonJsonProvider()); + + final WebClient wc = WebClient + .create("http://localhost:" + getPort() + url, providers) + .accept(media); + + WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(5000L); + return wc; + } + + protected WebClient createWebClient(final String url) { + return createWebClient(url, MediaType.SERVER_SENT_EVENTS); + } + + protected abstract int getPort(); +}