This is an automated email from the ASF dual-hosted git repository. rubys pushed a commit to branch master in repository https://git-dual.apache.org/repos/asf/whimsy.git
The following commit(s) were added to refs/heads/master by this push: new 117473a board/agenda: s/EventSource/WebSocket/ 117473a is described below commit 117473a0f00b6f3ef60f573c381e7d2c20ace353 Author: Sam Ruby <ru...@intertwingly.net> AuthorDate: Wed Oct 12 10:33:24 2016 -0400 board/agenda: s/EventSource/WebSocket/ --- www/board/agenda/daemon/session.rb | 23 ++-- www/board/agenda/daemon/wss.rb | 49 +++++--- www/board/agenda/main.rb | 4 +- www/board/agenda/models/events.rb | 124 -------------------- www/board/agenda/models/ipc.rb | 134 +++++----------------- www/board/agenda/public/assets/eventsource.min.js | 6 - www/board/agenda/routes.rb | 24 ---- www/board/agenda/views/models/events.js.rb | 53 +++++---- www/board/agenda/views/sw.js.rb | 29 +---- 9 files changed, 113 insertions(+), 333 deletions(-) diff --git a/www/board/agenda/daemon/session.rb b/www/board/agenda/daemon/session.rb index 29d70b0..7e99dfd 100644 --- a/www/board/agenda/daemon/session.rb +++ b/www/board/agenda/daemon/session.rb @@ -3,6 +3,8 @@ require 'thread' require 'securerandom' require 'concurrent' +require 'whimsy/asf/config' + # # Low-tech, file based session manager. Each session is stored as a separate # file on disk, and expires after two days. Each request for a new session @@ -18,6 +20,7 @@ require 'concurrent' # class Session + AGENDA_WORK = ASF::Config.get(:agenda_work).untaint || '/srv/agenda' WORKDIR = File.expand_path('sessions', AGENDA_WORK) DAY = 24*60*60 # seconds @@ -70,16 +73,19 @@ class Session secret = File.basename(file) session = @@sessions[secret] - File.delete file if session and session[:mtime] < Time.now - 2 * DAY if File.exist? file - # update class variables if the file changed - mtime = File.mtime(file) - next if session and session[:mtime] == mtime + if File.mtime(file) < Time.now - 2 * DAY + File.delete file + else + # update class variables if the file changed + mtime = File.mtime(file) + next if session and session[:mtime] == mtime - session = {id: File.read(file), secret: secret, mtime: mtime} - @@sessions[secret] == session - @@users[session[:id]] << session + session = {id: File.read(file), secret: secret, mtime: mtime} + @@sessions[secret] = session + @@users[session[:id]] << session + end else # remove session if the file no longer exists @@users[session[:id]].delete(session) if session @@ -91,4 +97,7 @@ class Session # ensure the working directory exists FileUtils.mkdir_p WORKDIR + + # load initial data from disk + self.load end diff --git a/www/board/agenda/daemon/wss.rb b/www/board/agenda/daemon/wss.rb index ba3c205..2878736 100755 --- a/www/board/agenda/daemon/wss.rb +++ b/www/board/agenda/daemon/wss.rb @@ -7,7 +7,10 @@ require 'optparse' require 'yaml' require 'rbconfig' -clients = [] +require_relative './session' + +users = {} +sessions = {} ######################################################################## # Parse argument list # @@ -85,7 +88,7 @@ listener.start ######################################################################## at_exit do - clients.each do |client| + sessions.keys.each do |client| client.close end end @@ -106,13 +109,9 @@ end EM.run do WebSocket::EventMachine::Server.start(server_options) do |ws| - ws.onopen do |handshake| - ws.comm_inactivity_timeout = options.timeout - clients << ws - end - ws.onclose do - clients.delete ws + id = sessions.delete(ws) + users[id].delete ws if id end ws.onmessage do |msg| @@ -120,13 +119,33 @@ EM.run do headers = msg.slice!(/\A(\w+:\s*.*\r?\n)\s*(\n|\Z)/).to_s headers = YAML.safe_load(headers) || {} rescue {} - # echo message to all of the clients - clients.each do |client| - EM.defer( - ->() {client.send msg}, - ->(response) {}, - ->(error) {client.close rescue nil} - ) + if headers['session'] + STDERR.puts headers.inspect + session = Session[headers['session']] + STDERR.puts session.inspect + if session + users[session[:id]] = ws + sessions[ws] = session[:id] + end + end + + # forward message + unless msg.empty? + if headers['private'] + # send only to a specific user + clients = users[headers['private']] || [] + else + # send to all users + clients = sessions.keys + end + + clients.each do |client| + EM.defer( + ->() {client.send msg}, + ->(response) {}, + ->(error) {client.close rescue nil} + ) + end end end end diff --git a/www/board/agenda/main.rb b/www/board/agenda/main.rb index 2cf62c3..018a914 100755 --- a/www/board/agenda/main.rb +++ b/www/board/agenda/main.rb @@ -8,7 +8,6 @@ require 'whimsy/asf/agenda' require 'wunderbar/sinatra' require 'wunderbar/react' -require 'wunderbar/eventsource' require 'wunderbar/bootstrap/theme' require 'ruby2js/filter/functions' require 'ruby2js/filter/require' @@ -36,13 +35,14 @@ FileUtils.mkdir_p AGENDA_WORK if not Dir.exist? AGENDA_WORK require_relative './routes' require_relative './models/ipc' require_relative './models/pending' -require_relative './models/events' require_relative './models/agenda' require_relative './models/minutes' require_relative './models/comments' require_relative './helpers/string' require_relative './daemon/session' +require 'websocket-client-simple' + # if AGENDA_WORK doesn't exist yet, make it if not Dir.exist? AGENDA_WORK require 'fileutils' diff --git a/www/board/agenda/models/events.rb b/www/board/agenda/models/events.rb deleted file mode 100644 index 412fd29..0000000 --- a/www/board/agenda/models/events.rb +++ /dev/null @@ -1,124 +0,0 @@ -# -# A centralized subscription service for server side events -# * Sends a heartbeat every 25 seconds -# * Closes all sockets when restart is detected -# - -class EventService - attr_accessor :user - attr_accessor :token - attr_accessor :queue - - @@subscriptions = {} - @@restart = false - @@next_token = 1 - @@cache = Hash.new(mtime: 0) - - # key/value store (for agenda purposes) - def self.[](file) - @@cache[file] - end - - def self.[]=(file, data) - @@cache[file] = data - end - - # create a new subscription - def self.subscribe(user) - self.hook_restart - present = EventService.present - subscriber = EventService.new(user) - @@subscriptions[subscriber.token] = subscriber - if not present.include? user - EventService.post type: :arrive, user: user, - present: EventService.present, timestamp: Time.now.to_f*1000 - end - subscriber.token - end - - # post an event to all subscribers - def self.post(event) - return unless event - - @@subscriptions.each do |token, subscriber| - if - not Hash === event or not event[:private] or # broadcast - event[:private] == subscriber.user # narrowcast - then - subscriber.queue << event - end - end - event - end - - # list of users present - def self.present - @@subscriptions.map {|token, subscriber| subscriber.user}.uniq.sort - end - - # capture user information associated with this queue - def initialize(user) - @user = user - @token = @@next_token - @queue = Queue.new - @@next_token += 1 - super() - end - - def self.pop(token) - subscription = @@subscriptions[token] - subscription.queue.pop if subscription - end - - # remove a subscription - def self.unsubscribe(token) - event = @@subscriptions.delete token - present = EventService.present - if event and not present.include? event.user - EventService.post type: :depart, user: event.user, present: present, - timestamp: Time.now.to_f*1000 - end - end - - # When restart signal is detected, close all open connections - def self.hook_restart - # puma uses SIGUSR2 - restart_usr2 ||= trap 'SIGUSR2' do - restart_usr2.call if Proc === restart_usr2 - begin - EventService.post(:exit) - rescue ThreadError - # some versions of Ruby don't allow queue operations in traps - @restart = true - end - end - - # thin uses SIGHUP - restart_hup ||= trap 'SIGHUP' do - restart_hup.call if Proc === restart_hup - begin - EventService.post(:exit) - rescue ThreadError - # some versions of Ruby don't allow queue operations in traps - @restart = true - end - end - end - - # As some TCP/IP implementations will close idle sockets after as little - # as 30 seconds, sent out a heartbeat every 25 seconds. Due to limitations - # of some versions of Ruby (2.0, 2.1), this is lowered to every 5 seconds - # in development mode to allow for quicker restarting after a trap/signal. - Thread.new do - loop do - sleep(ENV['RACK_ENV'] == 'development' ? 5 : 25) - - if @restart - EventService.post(:exit) - @restart = false - else - EventService.post(:heartbeat) - end - end - end -end diff --git a/www/board/agenda/models/ipc.rb b/www/board/agenda/models/ipc.rb index ecc6f62..a87da8c 100644 --- a/www/board/agenda/models/ipc.rb +++ b/www/board/agenda/models/ipc.rb @@ -1,117 +1,37 @@ -# -# IPC server based on Ruby's DRB. Manages publish and subscribe queues as -# well as distributed hash. -# -# Key principles: -# * Reserves a 9146 as a socket -# -# * All IPC server operations are provided by a single class, and all server -# operations are class methods that return primitive (round-trippable to -# JSON) objects. This provides the IPC_Server logic the ability to -# consistently capture and recover from DRBConnErrors. -# -# * No I/O or computational intensive processing is provided by the IPC -# server. -# +module IPC -require 'drb' -require 'thread' - -class IPC_Server - SOCKET = 'druby://:9146' - - attr_accessor :object - - def initialize(object) - IPC_Server.start_server - @object = object - end - - def method_missing(method, *args, &block) - loop do - begin - return @object.send method, *args, &block - rescue DRb::DRbConnError => e - IPC_Server.start_server - sleep 0.1 - end - end - end - - def self.start_server - pid = fork do - # run server code in a separate process - ruby = File.join( - RbConfig::CONFIG["bindir"], - RbConfig::CONFIG["ruby_install_name"] + RbConfig::CONFIG["EXEEXT"] - ) - - exec(ruby, __FILE__.dup.untaint, '--server-only') - end - - Process.detach pid - - at_exit {Process.kill 'INT', pid rescue nil} - end - - ENV['RACK_ENV'] ||= 'development' -end - -# launch server or client -if ENV['RACK_ENV'] == 'test' - - require_relative 'events' - IPC = EventService - -elsif ARGV[0] == '--server-only' - - if __FILE__ == $0 - Signal.trap('INT') {sleep 1; exit} - Signal.trap('TERM') {exit} - Signal.trap('USR2') {exit} - - require_relative 'events' - - begin - DRb.start_service(IPC_Server::SOCKET, EventService) - DRb.thread.join - rescue Errno::EADDRINUSE => e - exit - end + if Dir.exist? '/etc/letsencrypt' + @@url ="wss://127.0.0.1:34234" + else + @@url ="ws://127.0.0.1:34234" end -else + def self.post(data) + thread = Thread.new do + # post to web socket server + ws = WebSocket::Client::Simple.connect @@url - # IPC client - IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::SOCKET)) - -end - -# For demonstration / debugging purposes -if __FILE__ == $0 - require 'etc' - user = ARGV.pop || Etc.getlogin - - queue = IPC.subscribe(user) - - at_exit do - IPC.unsubscribe(queue) - end - - Thread.new do - loop do - event = IPC.pop(queue) - if event - puts '>> ' + event.inspect - else - queue = IPC.subscribe(user) + begin + done = false + ws.on :open do + if data[:private] + headers = "session: #{data[:private]}\n\n" + else + headers = '' + end + + ws.send headers + JSON.dump(data) + done = true + end + + sleep 0.1 until done + ensure + ws.close end end end - loop do - data = gets.strip - exit if data.empty? - IPC.post data + def self.present + [] # TBD end end diff --git a/www/board/agenda/public/assets/eventsource.min.js b/www/board/agenda/public/assets/eventsource.min.js deleted file mode 100644 index 5cacdc6..0000000 --- a/www/board/agenda/public/assets/eventsource.min.js +++ /dev/null @@ -1,6 +0,0 @@ -/** @license - * eventsource.js - * Available under MIT License (MIT) - * https://github.com/Yaffle/EventSource/ - */ -!function(a){"use strict";function b(){this.data={}}function c(){this.listeners=new b}function d(a){setTimeout(function(){throw a},0)}function e(a){this.type=a,this.target=void 0}function f(a,b){e.call(this,a),this.data=b.data,this.lastEventId=b.lastEventId}function x(a,b){var c=a;return c!==c&&(c=b),v>c?v:c>w?w:c}function y(a,b,c){try{"function"==typeof b&&b.call(a,c)}catch(e){d(e)}}function z(b,d){function P(){H=o,void 0!==D&&(D.abort(),D=void 0),0!==E&&(clearTimeout(E),E=0),0!==F&&(cl [...] diff --git a/www/board/agenda/routes.rb b/www/board/agenda/routes.rb index 5ab1e82..e77d738 100755 --- a/www/board/agenda/routes.rb +++ b/www/board/agenda/routes.rb @@ -234,30 +234,6 @@ get '/json/historical-comments' do _json HistoricalComments.comments end -# event stream for server sent events (a.k.a EventSource) -get '/events', provides: 'text/event-stream' do - stream :keep_open do |out| - subscription = IPC.subscribe(env.user) - out.callback {IPC.unsubscribe(subscription)} - - loop do - event = IPC.pop(subscription) - if Hash === event or Array === event - out << "data: #{JSON.dump(event)}\n\n" - elsif event == :heartbeat - out << ":\n" - elsif event == :exit - out.close - break - elsif event == nil - subscription = IPC.subscribe(env.user) - else - out << "data: #{event.inspect}\n\n" - end - end - end -end - # draft minutes get '/text/draft/:file' do |file| agenda = "board_agenda_#{file.gsub('-','_')}.txt".untaint diff --git a/www/board/agenda/views/models/events.js.rb b/www/board/agenda/views/models/events.js.rb index 351b8ae..cbad860 100644 --- a/www/board/agenda/views/models/events.js.rb +++ b/www/board/agenda/views/models/events.js.rb @@ -1,7 +1,7 @@ # -# Motivation: browsers limit the number of open HTTP connections to any -# one host to somewhere between 4-10. "Long polling" keeps an HTTP -# connection open making it impractical to have one EventSource per tab. +# Motivation: browsers limit the number of open web socket connections to any +# one host to somewhere between 6 and 250, making it impractical to have one +# Web Socket per tab. # # The solution below uses localStorage to communicate between tabs, with # the majority of logic involved with the "election" of a master. This @@ -9,18 +9,10 @@ # # Alternatives include: # -# * Replacing Server Side Events with Web Sockets which will require more open -# connections (and therefore server load) as well as require special proxy -# configuration (mod_proxy_wstunnel) and server coding (faye-websocket or -# equivalent). -# # * Replacing localStorage with Service Workers. This would be much cleaner, # unfortunately Service Workers aren't widely deployed yet. Sadly, the # state isn't much better for Shared Web Workers. # -# Notable downside to Server Side Events is lack of native support by IE. -# This is readily addressed by a polyfill. -# ### # # Class variables: @@ -33,6 +25,7 @@ class Events @@subscriptions = {} + @@socket = nil def self.subscribe event, &block @@subscriptions[event] ||= [] @@ -109,23 +102,43 @@ class Events # master logic def self.master() - events = EventSource.new('../events') + self.connectToServer() - # dispatch events received to all windows - events.addEventListener :message do |event| - localStorage.setItem("#{@@prefix}-event", event.data) - self.dispatch event.data - end - - # proof of life + # proof of life; maintain connection to the server setTimeout 25_000 do localStorage.setItem("#{@@prefix}-timestamp", Date.new().getTime()) + self.connectToServer() end # close connection on exit window.addEventListener :unload do |event| - events.close() + @@socket.close() if @@socket + end + end + + # establish a connection to the server + def self.connectToServer() + return if @@socket + + socket_url = window.location.protocol.sub('http', 'ws') + "//" + + window.location.hostname + ':34234/' + + @@socket = WebSocket.new(socket_url) + + def @@socket.onopen(event) + @@socket.send "session: #{Server.session}\n\n" + end + + def @@socket.onmessage(event) + localStorage.setItem("#{@@prefix}-event", event.data) + self.dispatch event.data + end + + def @@socket.onclose(event) + @@socket = nil end + rescue => e + console.log e end # dispatch logic (common to all tabs) diff --git a/www/board/agenda/views/sw.js.rb b/www/board/agenda/views/sw.js.rb index d8876c7..7100d65 100644 --- a/www/board/agenda/views/sw.js.rb +++ b/www/board/agenda/views/sw.js.rb @@ -1,38 +1,11 @@ # # A very simple service worker # -# 1) Create an event source and pass all events received to all clients -# -# 2) Return back cached bootstrap page instead of fetching agenda pages +# *) Return back cached bootstrap page instead of fetching agenda pages # from the network. Bootstrap will construct page from cached # agenda.json, as well as update the cache. # -events = nil - -self.addEventListener :activate do |event| - # close any pre-existing event socket - if events - begin - events.close() - rescue => e - events = nil - end - end - - # create a new event source - events = EventSource.new('events') - - # dispatch any events received to clients - events.addEventListener :message do |event| - clients.matchAll().then do |list| - list.each do |client| - client.postMessage(event.data) - end - end - end -end - self.addEventListener :fetch do |event| scope = self.registration.scope url = event.request.url -- To stop receiving notification emails like this one, please contact ['"commits@whimsical.apache.org" <commits@whimsical.apache.org>'].