Author: pauls
Date: Mon May 22 22:01:23 2017
New Revision: 1795866

URL: http://svn.apache.org/viewvc?rev=1795866&view=rev
Log:
Get a basic proof of concept (POC) working. It still needs a lot of clean-up
and improvements to the SSE protocol handling.

Modified:
    sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java

Modified: sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java
URL: http://svn.apache.org/viewvc/sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java?rev=1795866&r1=1795865&r2=1795866&view=diff
==============================================================================
--- 
sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java
 (original)
+++ 
sling/whiteboard/pauls/gush/servlet/src/main/java/org/apache/sling/whiteboard/pauls/gush/servlet/GushServlet.java
 Mon May 22 22:01:23 2017
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.UncheckedIOException;
+import java.io.Writer;
+import java.util.Base64;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -42,7 +44,12 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.HttpServletResponseWrapper;
 
+import org.apache.felix.utils.json.JSONWriter;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.NonExistingResource;
+import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import 
org.apache.sling.api.resource.observation.ExternalResourceChangeListener;
 import org.apache.sling.api.resource.observation.ResourceChange;
 import org.apache.sling.api.resource.observation.ResourceChangeListener;
@@ -76,9 +83,12 @@ public class GushServlet extends HttpSer
     @Reference
     SlingRequestProcessor m_requestProcessor;
     
+    @Reference
+    ResourceResolverFactory m_ResourceResolverFactory;
+    
     volatile BundleContext m_context;
     
-    private final Map<Listener, ServiceRegistration<Listener>> m_listeners = 
new HashMap<>();
+    private final Map<Listener, ServiceRegistration<ResourceChangeListener>> 
m_listeners = new HashMap<>();
     
     @Activate
     synchronized void activate(BundleContext context) {
@@ -96,14 +106,15 @@ public class GushServlet extends HttpSer
         if (m_context != null)
         {
             Hashtable<String, Object> props = new Hashtable<>();
-            props.put(ResourceChangeListener.PATHS, pattern);
-            ServiceRegistration<Listener> reg = 
m_context.registerService(Listener.class, listener, props);
+            props.put(ResourceChangeListener.PATHS, new String[] {pattern});
+            props.put(ResourceChangeListener.CHANGES, new String[] 
{ResourceChange.ChangeType.ADDED.toString(), 
ResourceChange.ChangeType.CHANGED.toString(), 
ResourceChange.ChangeType.REMOVED.toString()});
+            ServiceRegistration<ResourceChangeListener> reg = 
m_context.registerService(ResourceChangeListener.class, listener, props);
             m_listeners.put(listener, reg);
         }
     }
 
     synchronized void unregisterListener(Listener listener) {
-        ServiceRegistration<Listener> reg = m_listeners.remove(listener);
+        ServiceRegistration<ResourceChangeListener> reg = 
m_listeners.remove(listener);
         if (reg != null)
         {
             reg.unregister();
@@ -112,15 +123,19 @@ public class GushServlet extends HttpSer
 
     private final class Listener implements AsyncListener, 
ResourceChangeListener, ExternalResourceChangeListener{
         AsyncContext m_ac;
-        Listener(AsyncContext ac) {
+        ResourceResolver m_resolver;
+        Listener(AsyncContext ac, ResourceResolver resolver) {
             m_ac = ac;
+            m_resolver = resolver;
         }
         @Override
         public void onChange(List<ResourceChange> changes) {
             try {
-                dispatch((HttpServletRequest) m_ac.getRequest(), 
(HttpServletResponse) m_ac.getResponse());
-            } catch (ServletException | IOException e) {
-                e.printStackTrace();
+                m_resolver.refresh();
+                dispatch((HttpServletRequest) m_ac.getRequest(), 
(HttpServletResponse) m_ac.getResponse(), m_resolver);
+            } catch (Exception e) {
+                unregisterListener(this);
+                m_ac.complete();
             }
         }
         
@@ -138,19 +153,53 @@ public class GushServlet extends HttpSer
         }
         @Override
         public void onStartAsync(AsyncEvent event) throws IOException {
-            
+            //m_ac = event.getAsyncContext();
         }
 
     }
     
-    private void dispatch(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+    private void dispatch(HttpServletRequest req, HttpServletResponse resp, 
ResourceResolver resolver) throws ServletException, IOException {
+        PrintWriter writer = resp.getWriter(); 
+        
+        if (writer.checkError())
+        {
+            throw new IOException("Closed");
+        }
+        
         final ByteArrayOutputStream output = new ByteArrayOutputStream();
-        output.write(("id: " + (new Date()).getTime() + "\ndata: 
").getBytes("UTF-8"));
         HttpServletResponseWrapper respWrapper = new 
HttpServletResponseWrapper(resp) {
+            private String contentType;
+            private String charset;
+            private boolean commited;
+            @Override
+            public void setContentType(String type) {
+                this.contentType = type;
+            }
+            
+            @Override
+            public String getContentType() {
+                return contentType;
+            }
+            
+            @Override
+            public boolean isCommitted() {
+                return commited;
+            }
+            
+            @Override
+            public void setCharacterEncoding(String charset) {
+                this.charset = charset;
+            }
+            
+            @Override
+            public String getCharacterEncoding() {
+                return this.charset;
+            }
             
             @Override
             public ServletOutputStream getOutputStream() throws IOException
             {
+                commited = true;
                 return new ServletOutputStream()
                 {
                     @Override
@@ -201,34 +250,85 @@ public class GushServlet extends HttpSer
             public void flushBuffer() throws IOException
             {
             }
+            
+            @Override
+            public void sendError(int sc, String msg) throws IOException {
+                System.out.println(sc + " - " + msg);
+            }
+            
+            @Override
+            public void sendError(int sc) throws IOException {
+                System.out.println(sc);
+            }
         };
         
-        m_requestProcessor.processRequest(req, resp, (ResourceResolver) 
req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER));
+        resolver.refresh();
         
-        output.write("\n\n".getBytes("UTF-8"));
-        resp.getOutputStream().write(output.toByteArray());
+        m_requestProcessor.processRequest(req, respWrapper, resolver);
         
-        resp.flushBuffer();
+        try
+        {
+            writer.write("id: " + (new Date()).getTime() + "\ndata: ");
+            
+            JSONWriter jsonWriter = new JSONWriter(writer);
+            
+            jsonWriter.object();
+            jsonWriter.key("contenttype");
+            jsonWriter.value(respWrapper.getContentType());
+            jsonWriter.key("body");
+            
jsonWriter.value(Base64.getEncoder().encodeToString(output.toByteArray()));
+            jsonWriter.endObject();
+            
+            jsonWriter.flush();
+            
+            writer.write("\n\n");
+            
+            writer.flush();
+            
+            resp.flushBuffer();
+        }
+        catch (Exception ex) {
+            throw new IOException(ex);
+        }
     }
 
     @Override
     protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException
     {
+        if ("/index.html".equals(req.getPathInfo()))
+        {
+            resp.setContentType("text/html;charset=utf-8");
+            Writer writer = resp.getWriter();
+            
+            writer.write(
+                    "<html><head>"
+                    + "<script>"
+                    + "function b64DecodeUnicode(str) {\n" + 
+                    "    // Going backwards: from bytestream, to 
percent-encoding, to original string.\n" + 
+                    "    return 
decodeURIComponent(atob(str).split('').map(function(c) {\n" + 
+                    "        return '%' + ('00' + 
c.charCodeAt(0).toString(16)).slice(-2);\n" + 
+                    "    }).join(''));\n" + 
+                    "}"
+                    + "var source = new 
EventSource('/gush/content/test.json');\n" + 
+                    "    source.onmessage = function(e) {\n"
+                    + "var foo = JSON.parse(e.data);" + 
+                    "      document.body.innerHTML += foo.contenttype + ' ' + 
b64DecodeUnicode(foo.body)  + '<br>';\n" + 
+                    "    };</script>"
+                    + "</head><body></body></html>"
+            );
+            
+            writer.flush();
+            return;
+        }
         HttpServletRequestWrapper reqWrapper = new 
HttpServletRequestWrapper(req) {
             @Override
             public String getServletPath()
             {
                 return "";
             }
-            
-            @Override
-            public String getPathInfo()
-            {
-                String path = super.getPathInfo();
-                return path != null && path.startsWith("/next/") ? 
path.substring("/next".length()) : path;
-            }
         };
         
+        
         if (req.getHeader("Accept") == null || 
!req.getHeader("Accept").contains("text/event-stream"))
         {
             m_requestProcessor.processRequest(reqWrapper, resp, 
(ResourceResolver) 
req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER));
@@ -238,27 +338,35 @@ public class GushServlet extends HttpSer
         resp.setContentType("text/event-stream");
         resp.setCharacterEncoding("UTF-8");
         
-        dispatch(reqWrapper, resp);
+       
+        dispatch(reqWrapper, resp, (ResourceResolver) 
req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER));
         
         AsyncContext ac = req.startAsync(reqWrapper, resp);
         ac.setTimeout(60 * 60 * 1000);
         
-        Listener listener = new Listener(ac);
-       
-        String pattern = reqWrapper.getPathInfo();
-        if (pattern == null) {
-            pattern = "/content/";
-        }
-        else if (!pattern.endsWith("/")){
-           pattern = pattern + "/";
+        Listener listener;
+        try {
+            listener = new Listener(ac, ((ResourceResolver) 
req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)).clone(null));
+        } catch (LoginException e) {
+            throw new IOException(e);
         }
         
-        pattern = "=glob:" + pattern + "**";
-        
-        registerListener(pattern, listener);
+        String pattern = reqWrapper.getPathInfo();
+        if (pattern != null) {
+            
+            Resource resource = ((ResourceResolver) 
req.getAttribute(AuthenticationSupport.REQUEST_ATTRIBUTE_RESOLVER)).resolve(req,
 pattern);
+            if (!(resource instanceof NonExistingResource))
+            {
+                pattern = resource.getPath();
+                if (pattern.startsWith("/content/"))
+                {
+                    registerListener(pattern, listener);
+                    
+                    ac.addListener(listener);
+                }
+            }
+        }
         
-        ac.addListener(listener);
         
-        dispatch(reqWrapper, resp);
     }
 }


Reply via email to