Author: titmuss
Date: Mon Jan 28 14:14:32 2008
New Revision: 1664

URL: http://svn.slimdevices.com?rev=1664&root=Jive&view=rev
Log:
 [EMAIL PROTECTED] (orig r1661):  titmuss | 2008-01-28 22:13:06 +0000
 Bug: 6485, 6600, 6762, 6771, 6787
 Description:
 Fixes for various Comet connection problems, including:
 - Fixed race condition on Comet connect and disconnect; subscriptions and 
requests could get queued indefinitely.
 - Fixed Comet reconnect, and made connect/disconnect more reliable for 
different connection states.
 - Added a list of requests 'on-the-wire'; these requests will be retried on a 
connection error.
 - Use the correct sinks for the request and chunked connections, error 
conditions need to be handled differently.
 - Added 'aggressive' reconnection to SC/SN using a random interval.
 - Restart the http state machine after closing a socket.
 - Batch comet requests when SlimBrowser connects to a player.
 - Added traceon() and traceoff() debugging functions in debug.lua.
 
 

Added:
    trunk/jive/src/pkg/jive/share/strict.lua
Modified:
    trunk/   (props changed)
    trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua
    trunk/jive/src/pkg/jive/share/jive/net/Comet.lua
    trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
    trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua
    trunk/jive/src/pkg/jive/share/jive/utils/debug.lua

Propchange: trunk/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Mon Jan 28 14:14:32 2008
@@ -1,3 +1,3 @@
-bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1660
+bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1661
 bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/SN:1083
 bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/scrolling:1378

Modified: 
trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua 
(original)
+++ trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua Mon 
Jan 28 14:14:32 2008
@@ -1869,9 +1869,11 @@
        _statusStep.actionModifier = "-status"
 
        -- showtime for the player
+       _server.comet:startBatch()
        _server:request(sink, _playerId, { 'menu', 0, 100 })
        _player:onStage()
        _requestStatus()
+       _server.comet:endBatch()
 
        -- add a fullscreen popup that waits for the _menuSink to load
        _menuReceived = false

Modified: trunk/jive/src/pkg/jive/share/jive/net/Comet.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/Comet.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/Comet.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/Comet.lua Mon Jan 28 14:14:32 2008
@@ -57,9 +57,10 @@
 
 
 -- stuff we use
-local ipairs, table, pairs, string = ipairs, table, pairs, string
+local assert, ipairs, table, pairs, string = assert, ipairs, table, pairs, 
string
 
 local oo            = require("loop.simple")
+local math          = require("math")
 
 local CometRequest  = require("jive.net.CometRequest")
 local HttpPool      = require("jive.net.HttpPool")
@@ -79,6 +80,30 @@
 -- jive.net.Comet is a base class
 module(..., oo.class)
 
+
+-- forward declarations
+local _addPendingRequests
+local _sendPendingRequests
+local _state
+local _handshake
+local _getHandshakeSink
+local _connect
+local _reconnect
+local _getEventSink
+local _getRequestSink
+local _response
+local _disconnect
+local _handleAdvice
+local _handleTimer
+
+
+-- connection state
+local UNCONNECTED    = "UNCONNECTED"    -- not connected
+local CONNECTING     = "CONNECTING"     -- handshake request sent
+local CONNECTED      = "CONNECTED"      -- handshake completed
+local UNCONNECTING   = "UNCONNECTING"   -- disconnect request sent
+
+
 --[[
 
 =head2 jive.net.Comet(jnt, ip, port, path, name)
@@ -100,7 +125,7 @@
        log:debug("Comet: __init(", name, ", ", ip, ", ", port, ", ", path, ")")
 
        -- init superclass
-       local obj = oo.rawnew( self, SocketHttp(jnt, ip, port, name) )
+       local obj = oo.rawnew( self, {} )
        
        obj.uri = 'http://' .. ip .. ':' .. port .. path
        
@@ -113,17 +138,25 @@
        
        obj.jnt            = jnt
        obj.name           = name
-       obj.active         = "closed" -- whether or not we have an active 
connection: [ closed, handshake, connected, disconnect ]
+       obj.aggressive     = false    -- agressive reconnects
+
+       obj.isactive       = false    -- is the connection active
+       obj.state          = UNCONNECTED -- connection state:
+
        obj.clientId       = nil      -- clientId provided by server
        obj.reqid          = 1        -- used to identify non-subscription 
requests
        obj.advice         = {}       -- advice from server on how to handle 
reconnects
        obj.failures       = 0        -- count of connection failures
-       obj.batch          = false    -- are we batching queries?
+       obj.batch          = 0        -- are we batching queries?
        
        obj.subs           = {}       -- all subscriptions
        obj.pending_unsubs = {}       -- pending unsubscribe requests
        obj.pending_reqs   = {}       -- pending requests to send with connect
+       obj.sent_reqs      = {}       -- sent requests, awaiting a response
        obj.notify         = {}       -- callbacks to notify
+
+       -- Reconnection timer
+       obj.reconnect_timer = Timer(0, function() _handleTimer(obj) end, true)
 
        -- Subscribe to networkConnected events, which happen if we change 
wireless networks
        jnt:subscribe(obj)
@@ -131,85 +164,73 @@
        return obj
 end
 
--- forward declarations
-local _addPendingRequests
-local _connect
-local _getEventSink
-local _handleAdvice
-local _handshake
-local _getHandshakeSink
-local _getRequestSink
-local _reconnect
-local _active
-
-
--- FIXME add timer to make sure handshake and disconnect complete
-
-
-function start(self)
-       if self.active ~= "connected" and self.active ~= "handshake" then
-               -- Begin handshake
-               _handshake(self)
-       end
-end
+
+-- Enable aggressive reconnections
+function aggressiveReconnect(self, aggressive)
+       self.aggressive = aggressive
+end
+
+
+function connect(self)
+       log:debug(self, ": connect state=", self.state)
+
+       if self.state == CONNECTING or self.state == CONNECTED then
+               -- Already connecting/connected
+               return
+       end
+
+       if self.state == UNCONNECTING then
+               -- Force disconnection
+               _state(self, UNCONNECTED)
+       end
+
+       self.isactive = true
+       _handshake(self)
+end
+
 
 function disconnect(self)
-       if self.active == "connected" then
-               log:debug('Comet:disconnect()')
-               
-               _active(self, "disconnect")
-
-               -- Mark all subs as pending so they can be resubscribed later
-               for i, v in ipairs( self.subs ) do
-                       log:debug("Will re-subscribe to ", v.subscription, " on 
next connect")
-                       v.pending = true
-               end
-
-               local data = { {
-                       channel  = '/meta/disconnect',
-                       clientId = self.clientId,
-               } }
-
-               local req = CometRequest(
-                               _getEventSink(self),
-                               self.uri,
-                               data
-                       )
-
-               self.rhttp:fetch(req)
-       else
-               -- disable reconnections
-               self.advice.reconnect = 'none'
-
-               -- stop any existing reconnect timer
-               if self.reconnect_timer then
-                       self.reconnect_timer:stop()
-                       self.reconnect_timer = nil
-               end
-       end
-end
+       log:debug(self, ": disconnect state=", self.state)
+
+       if self.state == UNCONNECTED or self.state == UNCONNECTING then
+               -- Already disconnecting/unconnected
+               return
+       end
+
+       if self.state == CONNECTING then
+               -- Force disconnection
+               _state(self, UNCONNECTED)
+               return
+       end
+
+       self.isactive = false
+       _disconnect(self)
+end
+
 
 function notify_networkConnected(self)
        if self.active == "connected" then
-               log:warn(self, " Got networkConnected event, will try to 
reconnect to ", self.uri)
-               _handleAdvice(self)
+               log:info(self, ": Got networkConnected event, will try to 
reconnect")
+               return _handleAdvice(self)
        else
-               log:warn(self, " Got networkConnected event, but not currently 
connected")
-       end
-end
-
+               log:debug(self, ": Got networkConnected event, but not 
currently connected")
+       end
+end
+
+
+-- Add any pending requests to the request data
 _addPendingRequests = function(self, data)
        -- Add any pending subscription requests
        for i, v in ipairs( self.subs ) do
                if v.pending then
-                       -- really hating the lack of true arrays!
-                       local cmd = {}
-                       cmd[1] = v.playerid or ''
-                       cmd[2] = v.request
+                       local cmd = {
+                               v.playerid or '',
+                               v.request
+                       }
                        
                        -- Prepend clientId to subscription name
                        local subscription = '/' .. self.clientId .. 
v.subscription
-       
+
                        local sub = {
                                channel = '/slim/subscribe',
                                id      = v.reqid,
@@ -226,36 +247,38 @@
                        end
                        self.notify[v.subscription][v.func] = v.func
                        
-                       -- remove pending status from this sub
+                       -- Remove pending status from this sub
                        v.pending = nil
        
                        table.insert( data, sub )
+                       table.insert( self.sent_reqs, sub )
                end
        end
 
        -- Add pending unsubscribe requests
        for i, v in ipairs( self.pending_unsubs ) do
-               local subscription = '/' .. self.clientId .. v
-               
                local unsub = {
                        channel = '/slim/unsubscribe',
+                       id      = v.reqid,
                        data    = {
-                               unsubscribe = subscription,
+                               unsubscribe = '/' .. self.clientId .. 
v.subscription,
                        },
                }
 
-               table.insert( data, unsub )
-       end
-
-       -- Clear pending unsubs
+               table.insert( data, sub )
+               table.insert( self.sent_reqs, sub )
+       end
+
+       -- Clear out pending requests
        self.pending_unsubs = {}
 
-       -- Add pending non-subscription requests
+
+       -- Add pending requests
        for i, v in ipairs( self.pending_reqs ) do
-               -- really hating the lack of true arrays!
-               local cmd = {}
-               cmd[1] = v.playerid or ''
-               cmd[2] = v.request
+               local cmd = {
+                       v.playerid or '',
+                       v.request
+               }
        
                local req = {
                        channel = '/slim/request',
@@ -266,29 +289,326 @@
                        },
                }
 
-               -- only ask for a response if we have a callback function
+               -- Only ask for a response if we have a callback function
                if v.func then
-                       req["id"] = v.reqid
-                       
+                       req.id = v.reqid
+                               
                        -- Store this request's callback
                        local subscription = '/slim/request|' .. v.reqid
                        if not self.notify[subscription] then
                                self.notify[subscription] = {}
                        end
                        self.notify[subscription][v.func] = v.func
-               end
-       
+
+                       table.insert( self.sent_reqs, req )
+               end
+
                table.insert( data, req )
        end
 
        -- Clear out pending requests
        self.pending_reqs = {}
-
-       return
-end
+end
+
+
+-- Send any pending subscriptions and requests
+_sendPendingRequests = function(self)
+
+       -- add all pending unsub requests, and any others we need to send
+       local data = {}
+       _addPendingRequests(self, data)
+       
+       -- Only continue if we have some data to send
+       if data[1] then
+               if log:isDebug() then
+                       log:debug("Sending pending request(s):")
+                       debug.dump(data, 5)
+               end
+
+               local req = CometRequest(
+                       _getRequestSink(self),
+                        self.uri,
+                        data
+               )
+
+               self.rhttp:fetch(req)
+       end
+end
+
+
+function subscribe(self, subscription, func, playerid, request, priority)
+       if log:isDebug() then
+               log:debug(self, ": subscribe(", subscription, " ", func, ", ", 
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
+       end
+       
+       -- Remember subs to send during connect now, or if we get
+       -- disconnected
+       table.insert( self.subs, {
+               reqid = self.reqid,
+               subscription = subscription,
+               playerid     = playerid,
+               request      = request,
+               func         = func,
+               priority     = priority,
+               pending      = true, -- pending means we haven't sent this sub 
request yet
+       } )
+
+       -- Bump reqid for the next request
+       self.reqid = self.reqid + 1
+
+       -- Send immediately unless we're batching queries
+       if self.active ~= CONNECTED or self.batch ~= 0 then
+               return
+       end
+
+       -- Send all pending requests and subscriptions
+       _sendPendingRequests(self)
+end
+
+
+function unsubscribe(self, subscription, func)
+       log:debug(self, ": unsubscribe(", subscription, ", ", func, ")")
+       
+       -- Remove from notify list
+       if func then
+               -- Remove only the given callback
+               self.notify[subscription][func] = nil
+       else
+               -- Remove all callbacks
+               self.notify[subscription] = nil
+       end
+       
+       -- If we unsubscribed the last one for this subscription, clear it out
+       if self.notify[subscription] then
+               return
+       end
+
+       log:debug("No more callbacks for ", subscription, " unsubscribing at 
server")
+               
+       -- Remove from subs list
+       for i, v in ipairs( self.subs ) do
+               if v.subscription == subscription then
+                       table.remove( self.subs, i )
+                       break
+               end
+       end
+
+       -- Add to pending unsubs
+       table.insert(self.pending_unsubs, {
+               reqid = self.reqid,
+               subscription = subscription,
+       } )
+
+       -- Bump reqid for the next request
+       self.reqid = self.reqid + 1
+
+       -- Send immediately unless we're batching queries
+       if self.state ~= CONNECTED or self.batch ~= 0 then
+               return
+       end
+
+       -- Send all pending requests
+       _sendPendingRequests(self)
+end
+
+
+function request(self, func, playerid, request, priority)
+       local id = self.reqid
+
+       if log:isDebug() then
+               log:debug(self, ": request(", func, ", reqid:", id, ", ", 
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
+       end
+
+       -- Add to pending requests
+       table.insert(self.pending_reqs, {
+               reqid = id,
+               func = func,
+               playerid = playerid,
+               request = request,
+               priority = priority,
+       })
+
+       -- Bump reqid for the next request
+       self.reqid = id + 1
+
+       -- Send immediately unless we're batching queries
+       if self.state ~= CONNECTED or self.batch ~= 0 then
+               if self.state ~= CONNECTED then
+                       self.jnt:notify('cometDisconnected', self, 
#self.pending_reqs)
+               end
+
+               return id
+       end
+
+       -- Send all pending requests
+       _sendPendingRequests(self)
+
+       return id
+end
+
+
+function addCallback(self, subscription, func)
+       log:debug(self, ": addCallback(", subscription, ", ", func, ")")
+
+       if not self.notify[subscription] then
+               self.notify[subscription] = {}
+       end
+       
+       self.notify[subscription][func] = func
+end
+
+
+function removeCallback(self, subscription, func)
+       log:debug(self, ": removeCallback(", subscription, ", ", func, ")")
+       
+       self.notify[subscription][func] = nil
+end
+
+
+-- Begin a set of batched queries
+function startBatch(self)
+       log:debug(self, ": startBatch", self.batch)
+
+       self.batch = self.batch + 1
+end
+
+
+-- End batch mode, send all batched queries together
+function endBatch(self)
+       log:debug(self, ": endBatch ", self.batch)
+       
+       self.batch = self.batch - 1
+       if self.batch ~= 0 then
+               return
+       end
+
+       -- Send all pending requests and subscriptions
+       _sendPendingRequests(self)
+end
+
+
+-- Notify changes in connection state
+_state = function(self, state)
+        if self.state == state then
+               return
+       end
+
+       log:debug(self, ": state is ", state)
+       self.state = state
+
+       -- Stop reconnect timer
+       self.reconnect_timer:stop()
+
+       if state == CONNECTED then
+               -- Reset error count
+               self.failures = 0
+
+               self.jnt:notify('cometConnected', self)
+
+       elseif state == UNCONNECTED then
+               -- Force connections closed
+               self.chttp:close()
+               self.rhttp:close()
+
+               self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
+       end
+end
+
+
+_handshake = function(self)
+       log:debug(self, ': _handshake(), calling: ', self.uri)
+
+       assert(self.state == UNCONNECTED)
+
+       if not self.isactive then
+               log:info(self, ': _handshake() connection not active')
+               return
+       end
+
+       -- Go through all existing subscriptions and reset the pending flag
+       -- so they are re-subscribed to during _connect()
+       for i, v in ipairs( self.subs ) do
+               log:debug("Will re-subscribe to ", v.subscription)
+               v.pending = true
+       end
+
+       -- Reset clientId
+       self.clientId  = nil
+
+
+       local uuid, mac = self.jnt:getUUID()
+       
+       local data = { {
+               channel                  = '/meta/handshake',
+               version                  = '1.0',
+               supportedConnectionTypes = { 'streaming' },
+               ext                      = {
+                       rev = JIVE_VERSION,
+                       mac = mac,
+                       uuid = uuid
+               },
+       } }
+
+       -- XXX: according to the spec this should be sent as 
application/x-www-form-urlencoded
+       -- with message=<url-encoded json> but it works as straight JSON
+
+       _state(self, CONNECTING)
+       
+       local req = CometRequest(
+                       _getHandshakeSink(self),
+                       self.uri,
+                       data
+               )
+
+       self.chttp:fetch(req)
+end
+
+
+_getHandshakeSink = function(self)
+       return function(chunk, err)
+               if self.state ~= CONNECTING then
+                       return
+               end
+
+               -- On error, print something...
+               if err then
+                       log:info(self, ": _handshake error: ", err)
+
+                       -- Try to reconnect according to advice
+                       return _handleAdvice(self)
+               end
+
+               -- If we have data
+               if not chunk then
+                       return
+               end
+
+               local data = chunk[1]
+
+               -- Update advice if any
+               if data.advice then
+                       self.advice = data.advice
+                       log:debug(self, ": _handshake, advice updated from 
server")
+               end
+
+               if data.successful then
+                       self.clientId  = data.clientId
+                       self.advice    = data.advice
+
+                       log:debug(self, ": _handshake OK, clientId: ", 
self.clientId)
+
+                       -- Continue with connect phase, note we are still not 
CONNECTED
+                       _connect(self)
+               else
+                       log:warn(self, ": _handshake error: ", data.error)
+                       return _handleAdvice(self)
+               end
+       end
+end
+
 
 _connect = function(self)
-       log:debug('Comet:_connect()')
+       log:debug(self, ': _connect()')
        
        -- Connect and subscribe to all events for this clientId
        local data = { {
@@ -302,9 +622,19 @@
                subscription = '/' .. self.clientId .. '/**',
        } }
 
+       -- Add any un-acknowledged requests to the outgoing data
+       for i, v in ipairs(self.sent_reqs) do
+               table.insert(data, v)
+       end
+
        -- Add any other pending requests to the outgoing data
-       _addPendingRequests( self, data )
-       
+       _addPendingRequests(self, data)
+
+       if log:isDebug() then
+               log:debug("Sending pending request(s):")
+               debug.dump(data, 5)
+       end
+
        -- This will be our last request on this connection, it is now only
        -- for listening for responses
 
@@ -317,473 +647,21 @@
        self.chttp:fetch(req)
 end
 
-_getEventSink = function(self)
-       return function(chunk, err)
-               -- on error, print something...
-               if err then
-                       log:warn(self, "_getEventSink: error: ", err)
-                       
-                       -- try to reconnect according to advice
-                       _handleAdvice(self)
-               end
-               
-               -- if we have data
-               if chunk then
-                       -- Process each response event
-                       for i, event in ipairs(chunk) do
-                       
-                               -- update advice if any
-                               if event.advice then
-                                       self.advice = event.advice
-                                       log:debug(self, "_getEventSink, advice 
updated from server")
-                               end
-                               
-                               if event.channel == '/meta/connect' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
connect message acknowledged")
-                                               _active(self, "connected")
-                                       else
-                                               log:warn(self, "_getEventSink, 
connect failed: ", event.error)
-                                               _handleAdvice(self)
-                                               break
-                                       end
-                               elseif event.channel == '/meta/disconnect' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
disconnect OK")
-                                               _active(self, "closed")
-                                       else
-                                               log:warn(self, "_getEventSink, 
disconnect failed: ", event.error)
-                                       end
-                               elseif event.channel == '/meta/reconnect' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
reconnect OK")
-                                               _active(self, "connected")
-                                       else
-                                               log:warn(self, "_getEventSink, 
reconnect failed: ", event.error)
-                                               _handleAdvice(self)
-                                               break
-                                       end
-                               elseif event.channel == '/meta/subscribe' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
/meta/subscribe OK for ", event.subscription)
-                                       else
-                                               log:warn(self, "_getEventSink, 
/meta/subscribe failed: ", event.error)
-                                       end
-                               elseif event.channel == '/meta/unsubscribe' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
/meta/unsubscribe OK for ", event.subscription)
-                                       else
-                                               log:warn(self, "_getEventSink, 
/meta/unsubscribe error: ", event.error)
-                                       end
-                               elseif event.channel == '/slim/subscribe' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
/slim/subscribe OK for reqid ", event.id)
-                                       else
-                                               log:warn(self, "_getEventSink, 
/slim/subscribe error for reqid ", event.id, ": ", event.error)
-                                       end
-                               elseif event.channel == '/slim/unsubscribe' then
-                                       if event.successful then
-                                               log:debug(self, "_getEventSink, 
/slim/unsubscribe OK for reqid ", event.id)
-                                       else
-                                               log:warn(self, "_getEventSink, 
/slim/unsubscribe error for reqid ", event.id, ": ", event.error)
-                                       end
-                               elseif event.channel == '/slim/request' and 
event.successful then
-                                       log:debug(self, "request id ", 
event.id, " sent OK")
-                               elseif event.channel then
-                                       local subscription    = event.channel
-                                       local onetime_request = false
-                                       
-                                       -- strip clientId from channel
-                                       subscription = 
string.gsub(subscription, "^/[0-9A-Za-z]+", "")
-                                       
-                                       if string.find(subscription, 
'/slim/request') then
-                                               -- an async notification from a 
normal request
-                                               subscription = subscription .. 
'|' .. event.id
-                                               onetime_request = true
-                                       end
-
-                                       if self.notify[subscription] then
-                                               log:debug(self, "_getEventSink, 
notifiying callbacks for ", subscription)
-                                       
-                                               for _, func in pairs( 
self.notify[subscription] ) do
-                                                       log:debug("  callback 
to: ", func)
-                                                       func(event)
-                                               end
-                                               
-                                               if onetime_request then
-                                                       -- this was a one-time 
request, so remove the callback
-                                                       
self.notify[subscription] = nil
-                                               end
-                                       else
-                                               -- this is normal, since 
unsub's are delayed by a few seconds, we may receive events
-                                               -- after we unsubscribed but 
before the server is notified about it
-                                               log:debug(self, "_getEventSink, 
got data for an event we aren't subscribed to, ignoring -> ", subscription)
-                                       end
-                               else
-                                       log:warn(self, "_getEventSink, unknown 
error: ", event.error)
-                                       if event.advice then
-                                               _handleAdvice(self)
-                                               break
-                                       end
-                               end
-                       end
-               end
-       end
-end
-
-_handshake = function(self)
-       log:debug('Comet:_handshake(), calling: ', self.uri)
-       
-       local data = { {
-               channel                  = '/meta/handshake',
-               version                  = '1.0',
-               supportedConnectionTypes = { 'streaming' },
-               ext                      = {
-                       rev = JIVE_VERSION,
-               },
-       } }
-       
-       local uuid, mac = self.jnt:getUUID()
-
-       if mac then
-               data[1].ext.mac = mac
-       end
-       if uuid then
-               data[1].ext.uuid = uuid
-       end
-
-       -- XXX: according to the spec this should be sent as 
application/x-www-form-urlencoded
-       -- with message=<url-encoded json> but it works as straight JSON
-       
-       local req = CometRequest(
-                       _getHandshakeSink(self),
-                       self.uri,
-                       data
-               )
-
-       _active(self, "handshake")
-       
-       self.chttp:fetch(req)
-end
-
-_getHandshakeSink = function(self)
-       return function(chunk, err)
-               -- on error, print something...
-               if err then
-                       log:warn(self, "_handshake with ", self.jsName, " 
error: ", err)
-
-                       -- try to reconnect according to advice
-                       _handleAdvice(self)
-               end
-               -- if we have data
-               if chunk then
-                       local data = chunk[1]
-                       if data.successful then
-                               _active(self, "connected")
-                               self.clientId  = data.clientId
-                               self.advice    = data.advice
-
-                               log:debug(self, "_handshake OK with ", 
self.jsName, ", clientId: ", self.clientId)
-                               
-                               
-                               -- Continue with connect phase
-                               _connect(self)
-                       else
-                               log:warn(self, "_handshake error: ", data.error)
-                               if data.advice then
-                                       self.advice = data.advice
-                                       _handleAdvice(self)
-                               end
-                       end
-               end
-       end
-end
-
-function subscribe(self, subscription, func, playerid, request, priority)
-       local id = self.reqid
-
-       if log:isDebug() then
-               log:debug(self, "subscribe(", subscription, ", reqid:", id, ", 
", func, ", ", playerid, ", ", table.concat(request, ","), ", priority:", 
priority, ")")
-       end
-       
-       -- Remember subs to send during connect now, or if we get
-       -- disconnected
-       table.insert( self.subs, {
-               subscription = subscription,
-               playerid     = playerid,
-               request      = request,
-               reqid        = id,
-               func         = func,
-               priority     = priority,
-               pending      = true, -- pending means we haven't send this sub 
request yet
-       } )
-       
-       self.reqid = self.reqid + 1
-
-       -- Send immediately unless we're batching queries
-       if self.active == "connected" and not self.batch then
-               -- add all pending unsub requests, and any others we need to 
send
-               local data = {}
-               _addPendingRequests(self, data)
-       
-               -- Only continue if we have some data to send
-               if data[1] then
-                       if log:isDebug() then
-                               log:debug("Sending pending subscribe 
request(s):")
-                               debug.dump(data, 5)
-                       end
-
-                       local req = CometRequest(
-                                       _getEventSink(self),
-                                       self.uri,
-                                       data
-                               )
-
-                       self.rhttp:fetch(req)
-               end
-       end
-end
-
-function unsubscribe(self, subscription, func)
-       log:debug(self, "unsubscribe(", subscription, ", ", func, ")")
-       
-       -- Remove from notify list
-       if func then
-               -- Remove only the given callback
-               self.notify[subscription][func] = nil
-       else
-               -- Remove all callbacks
-               self.notify[subscription] = nil
-       end
-       
-       -- If we unsubscribed the last one for this subscription, clear it out
-       if not self.notify[subscription] then
-               log:debug("No more callbacks for ", subscription, " 
unsubscribing at server")
-               
-               -- Remove from subs list
-               for i, v in ipairs( self.subs ) do
-                       if v.subscription == subscription then
-                               table.remove( self.subs, i )
-                               break
-                       end
-               end
-               
-               table.insert( self.pending_unsubs, subscription )
-               
-               if self.active == "connected" and not self.batch then
-                       -- add all pending unsub requests, and any others we 
need to send
-                       local data = {}
-                       _addPendingRequests(self, data)
-                       
-                       -- Only continue if we have stuff to send
-                       if data[1] then
-                               if log:isDebug() then
-                                       log:debug("Sending pending unsubscribe 
request(s):")
-                                       debug.dump(data, 5)
-                               end
-
-                               local req = CometRequest(
-                                               _getEventSink(self),
-                                               self.uri,
-                                               data
-                                       )
-
-                               -- unsubscribe doesn't need to be high priority 
                                        
-                               self.rhttp:fetch(req)
-                       end
-               end
-       end
-end
-
-function request(self, func, playerid, request, priority)
-       local id = self.reqid
-       
-       if log:isDebug() then
-               log:debug(self, "request(", func, ", reqid:", id, ", ", 
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
-       end
-       
-       if self.active ~= "connected" or self.batch then
-               -- Add subscription to pending requests, to be sent during 
connect/reconnect
-               table.insert( self.pending_reqs, {
-                        reqid    = id,
-                        func     = func,
-                        playerid = playerid,
-                        request  = request,
-                        priority = priority,
-               } )
-               self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
-       else    
-               local cmd = {}
-               cmd[1] = playerid or ''
-               cmd[2] = request
-               
-               local data = { {
-                       channel = '/slim/request',
-                       data    = {
-                               request  = cmd,
-                               response = '/' .. self.clientId .. 
'/slim/request',
-                               priority = priority,
-                       },
-               } }
-
-               -- only pass id if we have a callback function, this tells
-               -- SlimServer we don't want a response
-               if func then
-                       data[1]["id"] = id
-               end
-               
-               local sink = nil
-               if func then
-                       sink = _getRequestSink(self, func, id)
-               end
-       
-               local req = CometRequest(
-                               sink,
-                               self.uri,
-                               data
-                       )
-       
-               self.rhttp:fetch(req)
-       
-               -- If we expect a response, we will get the response on the 
persistent 
-               -- connection.  Store our callback for later
-               if func then
-                       local subscription = '/slim/request|' .. id
-                       if not self.notify[subscription] then
-                               self.notify[subscription] = {}
-                       end
-                       self.notify[subscription][func] = func
-               else
-                       log:debug('  No sink defined for this request, no 
response will be received')
-               end
-       end
-       
-       -- Bump reqid for the next request
-       self.reqid = self.reqid + 1
-       
-       -- Return the request id to the caller
-       return id
-end
-
-_getRequestSink = function(self, func, reqid)
-       return function(chunk, err)
-               -- on error, print something...
-               if err then
-                       log:warn(self, "request error: ", err)
-               end
-               -- if we have data
-               if chunk then
-                       for i, event in ipairs(chunk) do
-                               if event.advice then
-                                       self.advice = event.advice
-                               end
-                               
-                               if event.error then
-                                       log:warn(self, "request error: ", 
event.error)
-                               elseif event.channel == '/slim/request' and 
event.successful then
-                                       log:debug(self, "request id ", reqid, " 
sent OK")
-
-                               else
-                                       log:warn(self, "request unknown 
response")
-                               end
-                       end
-               end
-       end
-end
-
-function addCallback(self, subscription, func)
-       log:debug(self, "addCallback(", subscription, ", ", func, ")")
-
-       if not self.notify[subscription] then
-               self.notify[subscription] = {}
-       end
-       
-       self.notify[subscription][func] = func
-end
-
-function removeCallback(self, subscription, func)
-       log:debug(self, "removeCallback(", subscription, ", ", func, ")")
-       
-       self.notify[subscription][func] = nil
-end
-
--- Decide what to do if we get disconnected or get an error while 
handshaking/connecting
-_handleAdvice = function(self)
-       -- make sure our connection is closed
-       if self.active ~= "closed" then
-               _active(self, "closed")
-       end
-
-       -- stop any existing reconnect timer
-       if self.reconnect_timer then
-               self.reconnect_timer:stop()
-               self.reconnect_timer = nil
-       end
-
-       self.failures = self.failures + 1
-       
-       local advice = self.advice
-       
-       -- Keep retrying after multiple failures but backoff gracefully
-       local retry_interval = ( advice.interval or RETRY_DEFAULT ) * 
self.failures
-
-       if retry_interval > MAX_BACKOFF then
-               retry_interval = MAX_BACKOFF
-       end
-       
-       if advice.reconnect == 'none' then
-               self.clientId  = nil
-               log:warn(self, "_connect, server told us not to reconnect")
-
-       elseif advice.reconnect == 'handshake' then
-               log:warn(
-                       self, " advice is ", advice.reconnect, ", 
re-handshaking in ",
-                       retry_interval / 1000, " seconds"
-               )
-       
-               self.clientId  = nil
-
-               self.reconnect_timer = Timer(
-                       retry_interval,
-                       function()
-                               -- Go through all existing subscriptions and 
reset the pending flag
-                               -- so they are re-subscribed to during 
_connect()
-                               for i, v in ipairs( self.subs ) do
-                                       log:debug("Will re-subscribe to ", 
v.subscription)
-                                       v.pending = true
-                               end
-                               
-                               _handshake(self)
-                       end,
-                       true -- run timer only once
-               )
-               self.reconnect_timer:start()
-
-       else -- if advice.reconnect == 'retry' then
-               log:warn(
-                       self, " advice is ", advice.reconnect, ", reconnecting 
in ",
-                       retry_interval / 1000, " seconds"
-               )
-       
-               self.reconnect_timer = Timer(
-                       retry_interval,
-                       function()
-                               _reconnect(self)
-                       end,
-                       true -- run timer only once
-               )
-               self.reconnect_timer:start()
-       end
-end
 
 -- Reconnect to the server, try to maintain our previous clientId
 _reconnect = function(self)
-       log:debug('Comet:_reconnect()')
+       log:debug(self, ': _reconnect(), calling: ', self.uri)
+
+       assert(self.state == UNCONNECTED)
+
+       if not self.isactive then
+               log:info(self, ': _reconnect() connection not active')
+               return
+       end
        
        if not self.clientId then
-               log:debug(self, "_reconnect error: cannot reconnect without 
clientId, handshaking instead")
-               _handshake(self)
-               do return end
+               log:debug(self, ": _reconnect error: cannot reconnect without 
clientId, handshaking instead")
+               return _handshake(self)
        end
        
        local data = { {
@@ -798,70 +676,256 @@
                        data
                )
        
-       self.chttp:fetch(req)   
-end
-
--- Notify changes in connection state
-_active = function(self, active)
-
-        if self.active == active then
-               return
-       end
-
-       self.active = active
-
-       if active == "connected" then
-               -- Reset error count
-               self.failures = 0
-
-               self.jnt:notify('cometConnected', self)
-       elseif active == "closed" then
-               -- force connections closed
-               self.chttp:close()
-               self.rhttp:close()
-
-               self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
-       end
-end
-
--- Begin a set of batched queries
-function startBatch(self)
-       log:debug(self, "startBatch()")
-
-       self.batch = true
-end
-
--- End batch mode, send all batched queries together
-function endBatch(self)
-       log:debug(self, "endBatch()")
-       
-       self.batch = false
-       
-       -- add all pending requests
-       local data = {}
-       _addPendingRequests(self, data)
-
-       -- Only continue if we have some data to send
-       if data[1] then
-               if log:isDebug() then
-                       log:debug("Sending pending queries:")
-                       debug.dump(data, 5)
-               end
-
-               local req = CometRequest(
-                               _getEventSink(self),
-                               self.uri,
-                               data
-                       )
-
-               self.rhttp:fetch(req)
+       self.chttp:fetch(req)
+
+       _state(self, CONNECTING)
+end
+
+
+-- sink for chunked connection, handle advice on error
+_getEventSink = function(self)
+       return function(chunk, err)
+               -- On error, print something...
+               if err then
+                       log:info(self, ": _getEventSink error: ", err)
+                       
+                       -- Try to reconnect according to advice
+                       return _handleAdvice(self)
+               end
+
+               _response(self, chunk)
+       end
+end
+
+
+-- sink for request connection, resend requests on error
+_getRequestSink = function(self)
+       return function(chunk, err)
+               -- On error, print something...
+               if err then
+                       log:info(self, ": _getRequestSink error: ", err)
+
+                       -- Resend any un-acknowledged requests
+                       local data = {}
+                       for i, v in ipairs(self.sent_reqs) do
+                               table.insert(data, v)
+                       end
+
+                       -- Only continue if we have some data to send
+                       if data[1] then
+                               log:info(self, ": Resending requests: ", 
#self.sent_reqs)
+
+                               local req = CometRequest(
+                                                        _getRequestSink(self),
+                                                        self.uri,
+                                                        data
+                                                )
+                               self.rhttp:fetch(req)
+                       end
+               end
+
+               _response(self, chunk)
+       end
+end
+
+
+-- handle responses for both request and chunked connections
+_response = function(self, chunk)
+       -- If we have data
+       if not chunk then
+               return
+       end
+
+       -- Process each response event
+       for i, event in ipairs(chunk) do
+
+               -- Remove request from sent queue
+               for i, v in ipairs( self.sent_reqs ) do
+                       if v.id == event.id then
+                               table.remove(self.sent_reqs, i)
+                               break
+                       end
+               end
+
+               -- Log response
+               if event.error then
+                       log:warn(self, ": _response, ", event.channel, " 
failed: ", event.error)
+               else
+                       log:debug(self, ": _response, ", event.channel, " OK")
+               end
+
+               -- Update advice if any
+               if event.advice then
+                       self.advice = event.advice
+                       log:debug(self, ": _response, advice updated from 
server")
+               end
+
+               -- Handle response
+               if event.channel == '/meta/connect' then
+                       if event.successful then
+                               _state(self, CONNECTED)
+
+                               -- send any requests queued during connect
+                               _sendPendingRequests(self)
+                       else
+                               return _handleAdvice(self)
+                       end
+               elseif event.channel == '/meta/disconnect' then
+                       if event.successful then
+                               self.clientId = nil
+                               _state(self, UNCONNECTED)
+                       else
+                               return _handleAdvice(self)
+                       end
+               elseif event.channel == '/meta/reconnect' then
+                       if event.successful then
+                               _state(self, CONNECTED)
+                       else
+                               return _handleAdvice(self)
+                       end
+               elseif event.channel == '/meta/subscribe' then
+                       -- no action
+               elseif event.channel == '/meta/unsubscribe' then
+                       -- no action
+               elseif event.channel == '/slim/subscribe' then
+                       -- no action
+               elseif event.channel == '/slim/unsubscribe' then
+                       -- no action
+               elseif event.channel == '/slim/request' and event.successful 
then
+                       -- no action
+               elseif event.channel then
+                       local subscription    = event.channel
+                       local onetime_request = false
+                                       
+                       -- strip clientId from channel
+                       subscription = string.gsub(subscription, 
"^/[0-9A-Za-z]+", "")
+                               
+                       if string.find(subscription, '/slim/request') then
+                               -- an async notification from a normal request
+                               subscription = subscription .. '|' .. event.id
+                               onetime_request = true
+                       end
+
+                       if self.notify[subscription] then
+                               log:debug(self, ": _response, notifiying 
callbacks for ", subscription)
+                               
+                               for _, func in pairs( self.notify[subscription] 
) do
+                                       log:debug("  callback to: ", func)
+                                       func(event)
+                               end
+                                               
+                               if onetime_request then
+                                       -- this was a one-time request, so 
remove the callback
+                                       self.notify[subscription] = nil
+                               end
+                       else
+                               -- this is normal, since unsub's are delayed by 
a few seconds, we may receive events
+                               -- after we unsubscribed but before the server 
is notified about it
+                               log:debug(self, ": _response, got data for an 
event we aren't subscribed to, ignoring -> ", subscription)
+                       end
+               else
+                       log:warn(self, ": _response, unknown error: ", 
event.error)
+                       return _handleAdvice(self)
+               end
+       end
+end
+
+
+_disconnect = function(self)
+       assert(self.state == CONNECTED)
+
+       log:debug(self, ': disconnect()')
+               
+       -- Mark all subs as pending so they can be resubscribed later
+       for i, v in ipairs( self.subs ) do
+               log:debug("Will re-subscribe to ", v.subscription, " on next 
connect")
+               v.pending = true
+       end
+
+       local data = { {
+               channel  = '/meta/disconnect',
+               clientId = self.clientId,
+       } }
+
+       local req = CometRequest(
+               _getRequestSink(self),
+               self.uri,
+               data
+       )
+
+       self.rhttp:fetch(req)
+
+       _state(self, UNCONNECTING)
+end
+
+
+-- Decide what to do if we get disconnected or get an error while 
handshaking/connecting
+_handleAdvice = function(self)
+       log:warn(self, ": handleAdvice state=", self.state)
+
+       if self.state == UNCONNECTED then
+               -- do nothing 
+               return
+       end
+
+       -- force connection closed
+       _state(self, UNCONNECTED)
+
+       self.failures = self.failures + 1
+       local reconnect = self.advice.reconnect or "retry"
+       local retry_interval = self.advice.interval or RETRY_DEFAULT
+
+       if retry_interval == 0 then
+               -- Retry immediately
+       elseif self.aggressive then
+               -- Retry using a random interval between 1 - advice.interval 
seconds
+               retry_interval = math.random(1000, retry_interval)
+       else
+               -- Keep retrying after multiple failures but backoff gracefully
+               retry_interval = retry_interval * self.failures
+
+               if retry_interval > MAX_BACKOFF then
+                       retry_interval = MAX_BACKOFF
+               end
+       end
+       
+       if reconnect == 'none' then
+               self.clientId  = nil
+               log:info(self, ": advice is ", reconnect, " server told us not 
to reconnect")
+
+       else
+               log:info(self, ": advice is ", reconnect, ", connect in ",
+                        retry_interval / 1000, " seconds")
+       
+               self.reconnect_timer:restart(retry_interval)    
+       end
+end
+
+
+_handleTimer = function(self)
+       log:debug(self, ": handleTimer state=", self.state, " advice=", 
self.advice)
+
+       if self.state ~= UNCONNECTED then
+               log:debug(self, ": ignoring timer while ", self.state)
+               return
+       end
+
+       local reconnect = self.advice.reconnect or "retry"
+
+       if reconnect == 'handshake' then
+               _handshake(self)
+
+       elseif reconnect == 'retry' then
+               _reconnect(self)
+
        end
 end
 
 
 function __tostring(self)
-       return "Comet {" .. self.name .. "}: "
-end
+       return "Comet {" .. self.name .. "}"
+end
+
 
 --[[
 

Modified: trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Mon Jan 28 14:14:32 
2008
@@ -808,6 +808,9 @@
        -- FIXME: manage the queues
        
        SocketTcp.close(self)
+
+       -- restart the state machine if it is idle
+       self:t_sendDequeueIfIdle()
 end
 
 

Modified: trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua Mon Jan 28 14:14:32 
2008
@@ -220,13 +220,14 @@
        })
 
        obj.id = obj:idFor(ip, port, name)
-       
+
        -- subscribe to comet events
        jnt:subscribe(obj)
 
        -- subscribe to server status, timeout at 60 seconds.
        -- get 50 players
        -- FIXME: what if the server has more than 50 players?
+       obj.comet:aggressiveReconnect(true)
        obj.comet:subscribe('/slim/serverstatus',
                            _getSink(obj, '_serverstatusSink'),
                            nil,
@@ -293,7 +294,7 @@
        log:info(self, ":connect()")
 
        -- artwork pool connects on demand
-       self.comet:start()
+       self.comet:connect()
 end
 
 

Modified: trunk/jive/src/pkg/jive/share/jive/utils/debug.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/utils/debug.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/utils/debug.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/utils/debug.lua Mon Jan 28 14:14:32 2008
@@ -62,16 +62,29 @@
 
 --[[
 
-=head2 trace()
+=head2 traceon()
 
-Traces Lua calls, line by line. There is no way to stop tracing. This is very 
verbose,
+Traces Lua calls, line by line. Use traceoff() to turn off tracing. This is 
very verbose,
 but can help trace performance or strange behavioral issues. It also gives a 
glimpse on
 the inner working of the Lua engine.
 
 =cut
 --]]
-function trace ()
+function traceon ()
        ldebug.sethook(_trace_line, "l")
+end
+
+
+--[[
+
+=head2 traceoff()
+
+Turns off tracing Lua calls, line by line.
+
+=cut
+--]]
+function traceoff ()
+       ldebug.sethook(nil, "l")
 end
 
 

Added: trunk/jive/src/pkg/jive/share/strict.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/strict.lua?rev=1664&root=Jive&view=auto
==============================================================================
--- trunk/jive/src/pkg/jive/share/strict.lua (added)
+++ trunk/jive/src/pkg/jive/share/strict.lua Mon Jan 28 14:14:32 2008
@@ -1,0 +1,34 @@
+--
+-- strict.lua
+-- checks uses of undeclared global variables
+-- All global variables must be 'declared' through a regular assignment
+-- (even assigning nil will do) in a main chunk before being used
+-- anywhere or assigned to inside a function.
+--
+
+local mt = getmetatable(_G)
+if mt == nil then
+  mt = {}
+  setmetatable(_G, mt)
+end
+
+mt.__declared = {}
+
+mt.__newindex = function (t, n, v)
+  if not mt.__declared[n] then
+    local w = debug.getinfo(2, "S").what
+    if w ~= "main" and w ~= "C" then
+      error("assign to undeclared variable '"..n.."'", 2)
+    end
+    mt.__declared[n] = true
+  end
+  rawset(t, n, v)
+end
+  
+mt.__index = function (t, n)
+  if not mt.__declared[n] and debug.getinfo(2, "S").what ~= "C" then
+    error("variable '"..n.."' is not declared", 2)
+  end
+  return rawget(t, n)
+end
+

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins

Reply via email to