This is an automated email from the ASF dual-hosted git repository.

rubys pushed a commit to branch master
in repository https://git-dual.apache.org/repos/asf/whimsy.git

The following commit(s) were added to refs/heads/master by this push:
       new  117473a   board/agenda: s/EventSource/WebSocket/
117473a is described below

commit 117473a0f00b6f3ef60f573c381e7d2c20ace353
Author: Sam Ruby <ru...@intertwingly.net>
AuthorDate: Wed Oct 12 10:33:24 2016 -0400

    board/agenda: s/EventSource/WebSocket/
---
 www/board/agenda/daemon/session.rb                |  23 ++--
 www/board/agenda/daemon/wss.rb                    |  49 +++++---
 www/board/agenda/main.rb                          |   4 +-
 www/board/agenda/models/events.rb                 | 124 --------------------
 www/board/agenda/models/ipc.rb                    | 134 +++++-----------------
 www/board/agenda/public/assets/eventsource.min.js |   6 -
 www/board/agenda/routes.rb                        |  24 ----
 www/board/agenda/views/models/events.js.rb        |  53 +++++----
 www/board/agenda/views/sw.js.rb                   |  29 +----
 9 files changed, 113 insertions(+), 333 deletions(-)

diff --git a/www/board/agenda/daemon/session.rb 
b/www/board/agenda/daemon/session.rb
index 29d70b0..7e99dfd 100644
--- a/www/board/agenda/daemon/session.rb
+++ b/www/board/agenda/daemon/session.rb
@@ -3,6 +3,8 @@ require 'thread'
 require 'securerandom'
 require 'concurrent'
 
+require 'whimsy/asf/config'
+
 #
 # Low-tech, file based session manager.  Each session is stored as a separate
 # file on disk, and expires after two days.  Each request for a new session
@@ -18,6 +20,7 @@ require 'concurrent'
 #
 
 class Session
+  AGENDA_WORK = ASF::Config.get(:agenda_work).untaint || '/srv/agenda'
   WORKDIR = File.expand_path('sessions', AGENDA_WORK)
   DAY = 24*60*60 # seconds
 
@@ -70,16 +73,19 @@ class Session
         secret = File.basename(file)
         session = @@sessions[secret]
 
-        File.delete file if session and session[:mtime] < Time.now - 2 * DAY
 
         if File.exist? file
-          # update class variables if the file changed
-          mtime = File.mtime(file)
-          next if session and session[:mtime] == mtime
+          if File.mtime(file) < Time.now - 2 * DAY
+            File.delete file 
+          else
+            # update class variables if the file changed
+            mtime = File.mtime(file)
+            next if session and session[:mtime] == mtime
 
-          session = {id: File.read(file), secret: secret, mtime: mtime}
-          @@sessions[secret] == session
-          @@users[session[:id]] << session
+            session = {id: File.read(file), secret: secret, mtime: mtime}
+            @@sessions[secret] = session
+            @@users[session[:id]] << session
+          end
         else
           # remove session if the file no longer exists
           @@users[session[:id]].delete(session) if session
@@ -91,4 +97,7 @@ class Session
 
   # ensure the working directory exists
   FileUtils.mkdir_p WORKDIR
+
+  # load initial data from disk
+  self.load
 end
diff --git a/www/board/agenda/daemon/wss.rb b/www/board/agenda/daemon/wss.rb
index ba3c205..2878736 100755
--- a/www/board/agenda/daemon/wss.rb
+++ b/www/board/agenda/daemon/wss.rb
@@ -7,7 +7,10 @@ require 'optparse'
 require 'yaml'
 require 'rbconfig'
 
-clients = []
+require_relative './session'
+
+users = {}
+sessions = {}
 
 ########################################################################
 #                         Parse argument list                          #
@@ -85,7 +88,7 @@ listener.start
 ########################################################################
 
 at_exit do
-  clients.each do |client|
+  sessions.keys.each do |client|
     client.close
   end
 end
@@ -106,13 +109,9 @@ end
 
 EM.run do
   WebSocket::EventMachine::Server.start(server_options) do |ws|
-    ws.onopen do |handshake|
-      ws.comm_inactivity_timeout = options.timeout
-      clients << ws
-    end
-
     ws.onclose do 
-     clients.delete ws
+      id = sessions.delete(ws)
+      users[id].delete ws if id
     end
 
     ws.onmessage do |msg|
@@ -120,13 +119,33 @@ EM.run do
       headers = msg.slice!(/\A(\w+:\s*.*\r?\n)\s*(\n|\Z)/).to_s
       headers = YAML.safe_load(headers) || {} rescue {}
 
-      # echo message to all of the clients
-      clients.each do |client|
-        EM.defer(
-          ->() {client.send msg},
-          ->(response) {},
-          ->(error) {client.close rescue nil}
-        )
+      if headers['session']
+        STDERR.puts headers.inspect
+        session = Session[headers['session']]
+        STDERR.puts session.inspect
+        if session
+          users[session[:id]] = ws
+          sessions[ws] = session[:id]
+        end
+      end
+
+      # forward message
+      unless msg.empty?
+        if headers['private']
+          # send only to a specific user
+          clients = users[headers['private']] || []
+        else
+          # send to all users
+          clients = sessions.keys
+        end
+
+        clients.each do |client|
+          EM.defer(
+            ->() {client.send msg},
+            ->(response) {},
+            ->(error) {client.close rescue nil}
+          )
+        end
       end
     end
   end
diff --git a/www/board/agenda/main.rb b/www/board/agenda/main.rb
index 2cf62c3..018a914 100755
--- a/www/board/agenda/main.rb
+++ b/www/board/agenda/main.rb
@@ -8,7 +8,6 @@ require 'whimsy/asf/agenda'
 
 require 'wunderbar/sinatra'
 require 'wunderbar/react'
-require 'wunderbar/eventsource'
 require 'wunderbar/bootstrap/theme'
 require 'ruby2js/filter/functions'
 require 'ruby2js/filter/require'
@@ -36,13 +35,14 @@ FileUtils.mkdir_p AGENDA_WORK if not Dir.exist? AGENDA_WORK
 require_relative './routes'
 require_relative './models/ipc'
 require_relative './models/pending'
-require_relative './models/events'
 require_relative './models/agenda'
 require_relative './models/minutes'
 require_relative './models/comments'
 require_relative './helpers/string'
 require_relative './daemon/session'
 
+require 'websocket-client-simple'
+
 # if AGENDA_WORK doesn't exist yet, make it
 if not Dir.exist? AGENDA_WORK
   require 'fileutils'
diff --git a/www/board/agenda/models/events.rb 
b/www/board/agenda/models/events.rb
deleted file mode 100644
index 412fd29..0000000
--- a/www/board/agenda/models/events.rb
+++ /dev/null
@@ -1,124 +0,0 @@
-#
-# A centralized subscription service for server side events
-#  * Sends a heartbeat every 25 seconds
-#  * Closes all sockets when restart is detected
-#
-
-class EventService
-  attr_accessor :user
-  attr_accessor :token
-  attr_accessor :queue
-
-  @@subscriptions = {}
-  @@restart = false
-  @@next_token = 1
-  @@cache = Hash.new(mtime: 0)
-
-  # key/value store (for agenda purposes)
-  def self.[](file)
-    @@cache[file]
-  end
-
-  def self.[]=(file, data)
-    @@cache[file] = data
-  end
-
-  # create a new subscription
-  def self.subscribe(user)
-    self.hook_restart
-    present = EventService.present
-    subscriber = EventService.new(user)
-    @@subscriptions[subscriber.token] = subscriber
-    if not present.include? user
-      EventService.post type: :arrive, user: user, 
-        present: EventService.present, timestamp: Time.now.to_f*1000
-    end
-    subscriber.token
-  end
-
-  # post an event to all subscribers
-  def self.post(event)
-    return unless event
-
-    @@subscriptions.each do |token, subscriber|
-      if
-        not Hash === event or not event[:private] or # broadcast
-        event[:private] == subscriber.user           # narrowcast
-      then
-        subscriber.queue << event
-      end
-    end
-    event
-  end
-
-  # list of users present
-  def self.present
-    @@subscriptions.map {|token, subscriber| subscriber.user}.uniq.sort
-  end
-
-  # capture user information associated with this queue
-  def initialize(user)
-    @user = user
-    @token = @@next_token
-    @queue = Queue.new
-    @@next_token += 1
-    super()
-  end
-
-  def self.pop(token)
-    subscription = @@subscriptions[token]
-    subscription.queue.pop if subscription
-  end
-
-  # remove a subscription
-  def self.unsubscribe(token)
-    event = @@subscriptions.delete token
-    present = EventService.present
-    if event and not present.include? event.user
-      EventService.post type: :depart, user: event.user, present: present,
-        timestamp: Time.now.to_f*1000
-    end
-  end
-
-  # When restart signal is detected, close all open connections
-  def self.hook_restart
-    # puma uses SIGUSR2
-    restart_usr2 ||= trap 'SIGUSR2' do
-      restart_usr2.call if Proc === restart_usr2
-      begin
-        EventService.post(:exit)
-      rescue ThreadError
-        # some versions of Ruby don't allow queue operations in traps
-        @restart = true
-      end
-    end
-
-    # thin uses SIGHUP
-    restart_hup ||= trap 'SIGHUP' do
-      restart_hup.call if Proc === restart_hup
-      begin
-        EventService.post(:exit)
-      rescue ThreadError
-        # some versions of Ruby don't allow queue operations in traps
-        @restart = true
-      end
-    end
-  end
-
-  # As some TCP/IP implementations will close idle sockets after as little
-  # as 30 seconds, sent out a heartbeat every 25 seconds.  Due to limitations
-  # of some versions of Ruby (2.0, 2.1), this is lowered to every 5 seconds
-  # in development mode to allow for quicker restarting after a trap/signal.
-  Thread.new do
-    loop do
-      sleep(ENV['RACK_ENV'] == 'development' ? 5 : 25)
-
-      if @restart
-        EventService.post(:exit)
-        @restart = false
-      else
-        EventService.post(:heartbeat)
-      end
-    end
-  end
-end
diff --git a/www/board/agenda/models/ipc.rb b/www/board/agenda/models/ipc.rb
index ecc6f62..a87da8c 100644
--- a/www/board/agenda/models/ipc.rb
+++ b/www/board/agenda/models/ipc.rb
@@ -1,117 +1,37 @@
-#
-# IPC server based on Ruby's DRB.  Manages publish and subscribe queues as
-# well as distributed hash.
-#
-# Key principles:
-#   * Reserves a 9146 as a socket
-#
-#   * All IPC server operations are provided by a single class, and all server
-#     operations are class methods that return primitive (round-trippable to
-#     JSON) objects.  This provides the IPC_Server logic the ability to
-#     consistently capture and recover from DRBConnErrors.
-#
-#   * No I/O or computational intensive processing is provided by the IPC
-#     server.  
-#
+module IPC
 
-require 'drb'
-require 'thread'
-
-class IPC_Server
-  SOCKET = 'druby://:9146'
-
-  attr_accessor :object
-
-  def initialize(object)
-    IPC_Server.start_server
-    @object = object
-  end
-
-  def method_missing(method, *args, &block)
-    loop do
-      begin
-        return @object.send method, *args, &block
-      rescue DRb::DRbConnError => e
-        IPC_Server.start_server
-        sleep 0.1
-      end
-    end
-  end
-
-  def self.start_server
-    pid = fork do
-      # run server code in a separate process
-      ruby = File.join(
-        RbConfig::CONFIG["bindir"],
-        RbConfig::CONFIG["ruby_install_name"] + RbConfig::CONFIG["EXEEXT"]
-      )
-
-      exec(ruby, __FILE__.dup.untaint, '--server-only')
-    end
-
-    Process.detach pid
-
-    at_exit {Process.kill 'INT', pid rescue nil}
-  end
-
-  ENV['RACK_ENV'] ||= 'development'
-end
-
-# launch server or client
-if ENV['RACK_ENV'] == 'test'
-
-  require_relative 'events'
-  IPC = EventService
-
-elsif ARGV[0] == '--server-only'
-
-  if  __FILE__ == $0
-    Signal.trap('INT') {sleep 1; exit}
-    Signal.trap('TERM') {exit}
-    Signal.trap('USR2') {exit}
-
-    require_relative 'events'
-
-    begin
-      DRb.start_service(IPC_Server::SOCKET, EventService)
-      DRb.thread.join
-    rescue Errno::EADDRINUSE => e
-      exit
-    end
+  if Dir.exist? '/etc/letsencrypt'
+    @@url ="wss://127.0.0.1:34234"
+  else
+    @@url ="ws://127.0.0.1:34234"
   end
 
-else
+  def self.post(data)
+    thread = Thread.new do
+      # post to web socket server
+      ws = WebSocket::Client::Simple.connect @@url
 
-  # IPC client
-  IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::SOCKET))
-
-end
-
-# For demonstration / debugging purposes
-if __FILE__ == $0
-  require 'etc'
-  user = ARGV.pop || Etc.getlogin
-
-  queue = IPC.subscribe(user)
-
-  at_exit do
-    IPC.unsubscribe(queue)
-  end
-
-  Thread.new do
-    loop do
-      event = IPC.pop(queue)
-      if event
-        puts '>> ' + event.inspect
-      else
-        queue = IPC.subscribe(user)
+      begin
+        done = false
+        ws.on :open do
+          if data[:private]
+            headers = "session: #{data[:private]}\n\n"
+          else
+            headers = ''
+          end
+
+          ws.send headers + JSON.dump(data)
+          done = true
+        end
+
+        sleep 0.1 until done
+      ensure
+        ws.close
       end
     end
   end
 
-  loop do
-    data = gets.strip
-    exit if data.empty?
-    IPC.post data
+  def self.present
+    [] # TBD
   end
 end
diff --git a/www/board/agenda/public/assets/eventsource.min.js 
b/www/board/agenda/public/assets/eventsource.min.js
deleted file mode 100644
index 5cacdc6..0000000
--- a/www/board/agenda/public/assets/eventsource.min.js
+++ /dev/null
@@ -1,6 +0,0 @@
-/** @license
- * eventsource.js
- * Available under MIT License (MIT)
- * https://github.com/Yaffle/EventSource/
- */
-!function(a){"use strict";function b(){this.data={}}function 
c(){this.listeners=new b}function d(a){setTimeout(function(){throw 
a},0)}function e(a){this.type=a,this.target=void 0}function 
f(a,b){e.call(this,a),this.data=b.data,this.lastEventId=b.lastEventId}function 
x(a,b){var c=a;return c!==c&&(c=b),v>c?v:c>w?w:c}function 
y(a,b,c){try{"function"==typeof b&&b.call(a,c)}catch(e){d(e)}}function 
z(b,d){function P(){H=o,void 0!==D&&(D.abort(),D=void 
0),0!==E&&(clearTimeout(E),E=0),0!==F&&(cl [...]
diff --git a/www/board/agenda/routes.rb b/www/board/agenda/routes.rb
index 5ab1e82..e77d738 100755
--- a/www/board/agenda/routes.rb
+++ b/www/board/agenda/routes.rb
@@ -234,30 +234,6 @@ get '/json/historical-comments' do
   _json HistoricalComments.comments
 end
 
-# event stream for server sent events (a.k.a EventSource)
-get '/events', provides: 'text/event-stream' do
-  stream :keep_open do |out|
-    subscription = IPC.subscribe(env.user)
-    out.callback {IPC.unsubscribe(subscription)}
-
-    loop do
-      event = IPC.pop(subscription)
-      if Hash === event or Array === event
-        out << "data: #{JSON.dump(event)}\n\n"
-      elsif event == :heartbeat
-        out << ":\n"
-      elsif event == :exit
-        out.close
-        break
-      elsif event == nil
-        subscription = IPC.subscribe(env.user)
-      else
-        out << "data: #{event.inspect}\n\n"
-      end
-    end
-  end
-end
-
 # draft minutes
 get '/text/draft/:file' do |file|
   agenda = "board_agenda_#{file.gsub('-','_')}.txt".untaint
diff --git a/www/board/agenda/views/models/events.js.rb 
b/www/board/agenda/views/models/events.js.rb
index 351b8ae..cbad860 100644
--- a/www/board/agenda/views/models/events.js.rb
+++ b/www/board/agenda/views/models/events.js.rb
@@ -1,7 +1,7 @@
 #
-# Motivation: browsers limit the number of open HTTP connections to any
-# one host to somewhere between 4-10.  "Long polling" keeps an HTTP
-# connection open making it impractical to have one EventSource per tab.
+# Motivation: browsers limit the number of open web socket connections to any
+# one host to somewhere between 6 and 250, making it impractical to have one
+# Web Socket per tab.
 #
 # The solution below uses localStorage to communicate between tabs, with
 # the majority of logic involved with the "election" of a master.  This
@@ -9,18 +9,10 @@
 #
 # Alternatives include: 
 #
-# * Replacing Server Side Events with Web Sockets which will require more open
-#   connections (and therefore server load) as well as require special proxy
-#   configuration (mod_proxy_wstunnel) and server coding (faye-websocket or
-#   equivalent).
-#
 # * Replacing localStorage with Service Workers.  This would be much cleaner,
 #   unfortunately Service Workers aren't widely deployed yet.  Sadly, the
 #   state isn't much better for Shared Web Workers.
 #
-# Notable downside to Server Side Events is lack of native support by IE.
-# This is readily addressed by a polyfill.
-#
 ###
 #
 # Class variables:
@@ -33,6 +25,7 @@
 
 class Events
   @@subscriptions = {}
+  @@socket = nil
 
   def self.subscribe event, &block
     @@subscriptions[event] ||= []
@@ -109,23 +102,43 @@ class Events
 
   # master logic
   def self.master()
-    events = EventSource.new('../events')
+    self.connectToServer()
 
-    # dispatch events received to all windows
-    events.addEventListener :message do |event|
-      localStorage.setItem("#{@@prefix}-event", event.data)
-      self.dispatch event.data
-    end
-
-    # proof of life
+    # proof of life; maintain connection to the server
     setTimeout 25_000 do
       localStorage.setItem("#{@@prefix}-timestamp", Date.new().getTime())
+      self.connectToServer()
     end
 
     # close connection on exit
     window.addEventListener :unload do |event|
-      events.close()
+      @@socket.close() if @@socket
+    end
+  end
+
+  # establish a connection to the server
+  def self.connectToServer()
+    return if @@socket
+
+    socket_url = window.location.protocol.sub('http', 'ws') + "//" + 
+        window.location.hostname + ':34234/'
+
+    @@socket = WebSocket.new(socket_url)
+
+    def @@socket.onopen(event)
+      @@socket.send "session: #{Server.session}\n\n"
+    end
+
+    def @@socket.onmessage(event)
+      localStorage.setItem("#{@@prefix}-event", event.data)
+      self.dispatch event.data
+    end
+
+    def @@socket.onclose(event)
+      @@socket = nil
     end
+  rescue => e
+    console.log e
   end
 
   # dispatch logic (common to all tabs)
diff --git a/www/board/agenda/views/sw.js.rb b/www/board/agenda/views/sw.js.rb
index d8876c7..7100d65 100644
--- a/www/board/agenda/views/sw.js.rb
+++ b/www/board/agenda/views/sw.js.rb
@@ -1,38 +1,11 @@
 #
 # A very simple service worker
 #
-#   1) Create an event source and pass all events received to all clients
-#
-#   2) Return back cached bootstrap page instead of fetching agenda pages
+#   *) Return back cached bootstrap page instead of fetching agenda pages
 #      from the network.  Bootstrap will construct page from cached
 #      agenda.json, as well as update the cache.
 # 
 
-events = nil
-
-self.addEventListener :activate do |event|
-  # close any pre-existing event socket
-  if events
-    begin
-      events.close()
-    rescue => e
-      events = nil
-    end
-  end
- 
-  # create a new event source
-  events = EventSource.new('events')
- 
-  # dispatch any events received to clients
-  events.addEventListener :message do |event|
-    clients.matchAll().then do |list|
-      list.each do |client|
-        client.postMessage(event.data)
-      end
-    end
-  end
-end
-
 self.addEventListener :fetch do |event|
   scope = self.registration.scope
   url = event.request.url

-- 
To stop receiving notification emails like this one, please contact
['"commits@whimsical.apache.org" <commits@whimsical.apache.org>'].

Reply via email to