Author: pauls Date: Mon May 22 22:01:23 2017 New Revision: 1795866 URL: http://svn.apache.org/viewvc?rev=1795866&view=rev Log: Get a basic POC to work - still needs a lot of clean-up and SSE protocol improvements
Modified: sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java Modified: sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java URL: http://svn.apache.org/viewvc/sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java?rev=1795866&r1=1795865&r2=1795866&view=diff ============================================================================== --- sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java (original) +++ sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java Mon May 22 22:01:23 2017 @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.UncheckedIOException; +import java.io.Writer; +import java.util.Base64; import java.util.Date; import java.util.HashMap; import java.util.Hashtable; @@ -42,7 +44,12 @@ import javax.servlet.http.HttpServletReq import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponseWrapper; +import org.apache.felix.utils.json.JSONWriter; +import org.apache.sling.api.resource.LoginException; +import org.apache.sling.api.resource.NonExistingResource; +import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.api.resource.observation.ExternalResourceChangeListener; import org.apache.sling.api.resource.observation.ResourceChange; import org.apache.sling.api.resource.observation.ResourceChangeListener; @@ -76,9 +83,12 @@ public class GushServlet extends HttpSer @Reference SlingRequestProcessor m_requestProcessor; + @Reference + ResourceResolverFactory m_ResourceResolverFactory; + volatile BundleContext m_context; - private final Map<Listener, ServiceRegistration<Listener>> m_listeners = new HashMap<>(); + private final Map<Listener, ServiceRegistration<ResourceChangeListener>> m_listeners = new HashMap<>(); @Activate synchronized void activate(BundleContext context) { @@ -96,14 +106,15 @@ public class GushServlet extends HttpSer if (m_context != null) { Hashtable<String, Object> props = new Hashtable<>(); - props.put(ResourceChangeListener.PATHS, pattern); - ServiceRegistration<Listener> reg = m_context.registerService(Listener.class, listener, props); + props.put(ResourceChangeListener.PATHS, new String[] {pattern}); + props.put(ResourceChangeListener.CHANGES, new String[] {ResourceChange.ChangeType.ADDED.toString(), ResourceChange.ChangeType.CHANGED.toString(), ResourceChange.ChangeType.REMOVED.toString()}); + ServiceRegistration<ResourceChangeListener> reg = m_context.registerService(ResourceChangeListener.class, listener, props); m_listeners.put(listener, reg); } } synchronized void unregisterListener(Listener listener) { - ServiceRegistration<Listener> reg = m_listeners.remove(listener); + ServiceRegistration<ResourceChangeListener> reg = m_listeners.remove(listener); if (reg != null) { reg.unregister(); @@ -112,15 +123,19 @@ public class GushServlet extends HttpSer private final class Listener implements AsyncListener, ResourceChangeListener, ExternalResourceChangeListener{ AsyncContext m_ac; - Listener(AsyncContext ac) { + ResourceResolver m_resolver; + Listener(AsyncContext ac, ResourceResolver resolver) { m_ac = ac; + m_resolver = resolver; } @Override public void onChange(List<ResourceChange> changes) { try { - dispatch((HttpServletRequest) m_ac.getRequest(), (HttpServletResponse) m_ac.getResponse()); - } catch (ServletException | IOException e) { - e.printStackTrace(); + m_resolver.refresh(); + dispatch((HttpServletRequest) m_ac.getRequest(), (HttpServletResponse) m_ac.getResponse(), m_resolver); + } catch (Exception e) { + unregisterListener(this); + m_ac.complete(); } } @@ -138,19 +153,53 @@ public class GushServlet extends HttpSer } @Override public void onStartAsync(AsyncEvent event) throws IOException { - + //m_ac = event.getAsyncContext(); } } - private void dispatch(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + private void dispatch(HttpServletRequest req, HttpServletResponse resp, ResourceResolver resolver) throws ServletException, IOException { + PrintWriter writer = resp.getWriter(); + + if (writer.checkError()) + { + throw new IOException("Closed"); + } + final ByteArrayOutputStream output = new ByteArrayOutputStream(); - output.write(("id: " + (new Date()).getTime() + "\ndata: ").getBytes("UTF-8")); HttpServletResponseWrapper respWrapper = new HttpServletResponseWrapper(resp) { + private String contentType; + private String charset; + private boolean commited; + @Override + public void setContentType(String type) { + this.contentType = type; + } + + @Override + public String getContentType() { + return contentType; + } + + @Override + public boolean isCommitted() { + return commited; + } + + @Override + public void setCharacterEncoding(String charset) { + this.charset = charset; + } + + @Override + public String getCharacterEncoding() { + return this.charset; + } @Override public ServletOutputStream getOutputStream() throws IOException { + commited = true; return new ServletOutputStream() { @Override @@ -201,34 +250,85 @@ public class GushServlet extends HttpSer public void flushBuffer() throws IOException { } + + @Override + public void sendError(int sc, String msg) throws IOException { + System.out.println(sc + " - " + msg); + } + + @Override + public void sendError(int sc) throws IOException { + System.out.println(sc); + } }; - m_requestProcessor.processRequest(req, resp, (ResourceResolver) req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)); + resolver.refresh(); - output.write("\n\n".getBytes("UTF-8")); - resp.getOutputStream().write(output.toByteArray()); + m_requestProcessor.processRequest(req, respWrapper, resolver); - resp.flushBuffer(); + try + { + writer.write("id: " + (new Date()).getTime() + "\ndata: "); + + JSONWriter jsonWriter = new JSONWriter(writer); + + jsonWriter.object(); + jsonWriter.key("contenttype"); + jsonWriter.value(respWrapper.getContentType()); + jsonWriter.key("body"); + jsonWriter.value(Base64.getEncoder().encodeToString(output.toByteArray())); + jsonWriter.endObject(); + + jsonWriter.flush(); + + writer.write("\n\n"); + + writer.flush(); + + resp.flushBuffer(); + } + catch (Exception ex) { + throw new IOException(ex); + } } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + if ("/index.html".equals(req.getPathInfo())) + { + resp.setContentType("text/html;charset=utf-8"); + Writer writer = resp.getWriter(); + + writer.write( + "<html><head>" + + "<script>" + + "function b64DecodeUnicode(str) {\n" + + " // Going backwards: from bytestream, to percent-encoding, to original string.\n" + + " return decodeURIComponent(atob(str).split('').map(function(c) {\n" + + " return '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2);\n" + + " }).join(''));\n" + + "}" + + "var source = new EventSource('/gush/content/test.json');\n" + + " source.onmessage = function(e) {\n" + + "var foo = JSON.parse(e.data);" + + " document.body.innerHTML += foo.contenttype + ' ' + b64DecodeUnicode(foo.body) + '<br>';\n" + + " };</script>" + + "</head><body></body></html>" + ); + + writer.flush(); + return; + } HttpServletRequestWrapper reqWrapper = new HttpServletRequestWrapper(req) { @Override public String getServletPath() { return ""; } - - @Override - public String getPathInfo() - { - String path = super.getPathInfo(); - return path != null && path.startsWith("/next/") ? path.substring("/next".length()) : path; - } }; + if (req.getHeader("Accept") == null || !req.getHeader("Accept").contains("text/event-stream")) { m_requestProcessor.processRequest(reqWrapper, resp, (ResourceResolver) req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)); @@ -238,27 +338,35 @@ public class GushServlet extends HttpSer resp.setContentType("text/event-stream"); resp.setCharacterEncoding("UTF-8"); - dispatch(reqWrapper, resp); + + dispatch(reqWrapper, resp, (ResourceResolver) req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)); AsyncContext ac = req.startAsync(reqWrapper, resp); ac.setTimeout(60 * 60 * 1000); - Listener listener = new Listener(ac); - - String pattern = reqWrapper.getPathInfo(); - if (pattern == null) { - pattern = "/content/"; - } - else if (!pattern.endsWith("/")){ - pattern = pattern + "/"; + Listener listener; + try { + listener = new Listener(ac, ((ResourceResolver) req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)).clone(null)); + } catch (LoginException e) { + throw new IOException(e); } - pattern = "=glob:" + pattern + "**"; - - registerListener(pattern, listener); + String pattern = reqWrapper.getPathInfo(); + if (pattern != null) { + + Resource resource = ((ResourceResolver) req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)).resolve(req, pattern); + if (!(resource instanceof NonExistingResource)) + { + pattern = resource.getPath(); + if (pattern.startsWith("/content/")) + { + registerListener(pattern, listener); + + ac.addListener(listener); + } + } + } - ac.addListener(listener); - dispatch(reqWrapper, resp); } }