Author: titmuss
Date: Mon Jan 28 14:14:32 2008
New Revision: 1664
URL: http://svn.slimdevices.com?rev=1664&root=Jive&view=rev
Log:
[EMAIL PROTECTED] (orig r1661): titmuss | 2008-01-28 22:13:06 +0000
Bug: 6485, 6600, 6762, 6771, 6787
Description:
Fixes for various Comet connection problems, including:
- Fixed race condition on Comet connect and disconnect; subscriptions and
requests could get queued indefinitely.
- Fixed Comet reconnect, and made connect/disconnect more reliable for
different connection states.
- Added list of requests 'on-the-wire', these requests will be retried on a
connection error.
- Use the correct sinks for the request and chunked connections, error
conditions need to be handled differently.
- Added 'aggressive' reconnection to SC/SN using a random interval.
- Restart the http state machine after closing a socket.
- Batch comet requests when SlimBrowser connects to a player.
- Added traceon(), traceoff() debugging functions in debug.lua.
Added:
trunk/jive/src/pkg/jive/share/strict.lua
Modified:
trunk/ (props changed)
trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua
trunk/jive/src/pkg/jive/share/jive/net/Comet.lua
trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua
trunk/jive/src/pkg/jive/share/jive/utils/debug.lua
Propchange: trunk/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Mon Jan 28 14:14:32 2008
@@ -1,3 +1,3 @@
-bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1660
+bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1661
bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/SN:1083
bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/scrolling:1378
Modified:
trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua
(original)
+++ trunk/jive/src/pkg/jive/share/applets/SlimBrowser/SlimBrowserApplet.lua Mon
Jan 28 14:14:32 2008
@@ -1869,9 +1869,11 @@
_statusStep.actionModifier = "-status"
-- showtime for the player
+ _server.comet:startBatch()
_server:request(sink, _playerId, { 'menu', 0, 100 })
_player:onStage()
_requestStatus()
+ _server.comet:endBatch()
-- add a fullscreen popup that waits for the _menuSink to load
_menuReceived = false
Modified: trunk/jive/src/pkg/jive/share/jive/net/Comet.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/Comet.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/Comet.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/Comet.lua Mon Jan 28 14:14:32 2008
@@ -57,9 +57,10 @@
-- stuff we use
-local ipairs, table, pairs, string = ipairs, table, pairs, string
+local assert, ipairs, table, pairs, string = assert, ipairs, table, pairs,
string
local oo = require("loop.simple")
+local math = require("math")
local CometRequest = require("jive.net.CometRequest")
local HttpPool = require("jive.net.HttpPool")
@@ -79,6 +80,30 @@
-- jive.net.Comet is a base class
module(..., oo.class)
+
+-- forward declarations
+local _addPendingRequests
+local _sendPendingRequests
+local _state
+local _handshake
+local _getHandshakeSink
+local _connect
+local _reconnect
+local _getEventSink
+local _getRequestSink
+local _response
+local _disconnect
+local _handleAdvice
+local _handleTimer
+
+
+-- connection state
+local UNCONNECTED = "UNCONNECTED" -- not connected
+local CONNECTING = "CONNECTING" -- handshake request sent
+local CONNECTED = "CONNECTED" -- handshake completed
+local UNCONNECTING = "UNCONNECTING" -- disconnect request sent
+
+
--[[
=head2 jive.net.Comet(jnt, ip, port, path, name)
@@ -100,7 +125,7 @@
log:debug("Comet: __init(", name, ", ", ip, ", ", port, ", ", path, ")")
-- init superclass
- local obj = oo.rawnew( self, SocketHttp(jnt, ip, port, name) )
+ local obj = oo.rawnew( self, {} )
obj.uri = 'http://' .. ip .. ':' .. port .. path
@@ -113,17 +138,25 @@
obj.jnt = jnt
obj.name = name
- obj.active = "closed" -- whether or not we have an active
connection: [ closed, handshake, connected, disconnect ]
+ obj.aggressive = false -- agressive reconnects
+
+ obj.isactive = false -- is the connection active
+ obj.state = UNCONNECTED -- connection state:
+
obj.clientId = nil -- clientId provided by server
obj.reqid = 1 -- used to identify non-subscription
requests
obj.advice = {} -- advice from server on how to handle
reconnects
obj.failures = 0 -- count of connection failures
- obj.batch = false -- are we batching queries?
+ obj.batch = 0 -- are we batching queries?
obj.subs = {} -- all subscriptions
obj.pending_unsubs = {} -- pending unsubscribe requests
obj.pending_reqs = {} -- pending requests to send with connect
+ obj.sent_reqs = {} -- sent requests, awaiting a response
obj.notify = {} -- callbacks to notify
+
+ -- Reconnection timer
+ obj.reconnect_timer = Timer(0, function() _handleTimer(obj) end, true)
-- Subscribe to networkConnected events, which happen if we change
wireless networks
jnt:subscribe(obj)
@@ -131,85 +164,73 @@
return obj
end
--- forward declarations
-local _addPendingRequests
-local _connect
-local _getEventSink
-local _handleAdvice
-local _handshake
-local _getHandshakeSink
-local _getRequestSink
-local _reconnect
-local _active
-
-
--- FIXME add timer to make sure handshake and disconnect complete
-
-
-function start(self)
- if self.active ~= "connected" and self.active ~= "handshake" then
- -- Begin handshake
- _handshake(self)
- end
-end
+
+-- Enable aggressive reconnections
+function aggressiveReconnect(self, aggressive)
+ self.aggressive = aggressive
+end
+
+
+function connect(self)
+ log:debug(self, ": connect state=", self.state)
+
+ if self.state == CONNECTING or self.state == CONNECTED then
+ -- Already connecting/connected
+ return
+ end
+
+ if self.state == UNCONNECTING then
+ -- Force disconnection
+ _state(self, UNCONNECTED)
+ end
+
+ self.isactive = true
+ _handshake(self)
+end
+
function disconnect(self)
- if self.active == "connected" then
- log:debug('Comet:disconnect()')
-
- _active(self, "disconnect")
-
- -- Mark all subs as pending so they can be resubscribed later
- for i, v in ipairs( self.subs ) do
- log:debug("Will re-subscribe to ", v.subscription, " on
next connect")
- v.pending = true
- end
-
- local data = { {
- channel = '/meta/disconnect',
- clientId = self.clientId,
- } }
-
- local req = CometRequest(
- _getEventSink(self),
- self.uri,
- data
- )
-
- self.rhttp:fetch(req)
- else
- -- disable reconnections
- self.advice.reconnect = 'none'
-
- -- stop any existing reconnect timer
- if self.reconnect_timer then
- self.reconnect_timer:stop()
- self.reconnect_timer = nil
- end
- end
-end
+ log:debug(self, ": disconnect state=", self.state)
+
+ if self.state == UNCONNECTED or self.state == UNCONNECTING then
+ -- Already disconnecting/unconnected
+ return
+ end
+
+ if self.state == CONNECTING then
+ -- Force disconnection
+ _state(self, UNCONNECTED)
+ return
+ end
+
+ self.isactive = false
+ _disconnect(self)
+end
+
function notify_networkConnected(self)
if self.active == "connected" then
- log:warn(self, " Got networkConnected event, will try to
reconnect to ", self.uri)
- _handleAdvice(self)
+ log:info(self, ": Got networkConnected event, will try to
reconnect")
+ return _handleAdvice(self)
else
- log:warn(self, " Got networkConnected event, but not currently
connected")
- end
-end
-
+ log:debug(self, ": Got networkConnected event, but not
currently connected")
+ end
+end
+
+
+-- Add any pending requests to the request data
_addPendingRequests = function(self, data)
-- Add any pending subscription requests
for i, v in ipairs( self.subs ) do
if v.pending then
- -- really hating the lack of true arrays!
- local cmd = {}
- cmd[1] = v.playerid or ''
- cmd[2] = v.request
+ local cmd = {
+ v.playerid or '',
+ v.request
+ }
-- Prepend clientId to subscription name
local subscription = '/' .. self.clientId ..
v.subscription
-
+
local sub = {
channel = '/slim/subscribe',
id = v.reqid,
@@ -226,36 +247,38 @@
end
self.notify[v.subscription][v.func] = v.func
- -- remove pending status from this sub
+ -- Remove pending status from this sub
v.pending = nil
table.insert( data, sub )
+ table.insert( self.sent_reqs, sub )
end
end
-- Add pending unsubscribe requests
for i, v in ipairs( self.pending_unsubs ) do
- local subscription = '/' .. self.clientId .. v
-
local unsub = {
channel = '/slim/unsubscribe',
+ id = v.reqid,
data = {
- unsubscribe = subscription,
+ unsubscribe = '/' .. self.clientId ..
v.subscription,
},
}
- table.insert( data, unsub )
- end
-
- -- Clear pending unsubs
+ table.insert( data, sub )
+ table.insert( self.sent_reqs, sub )
+ end
+
+ -- Clear out pending requests
self.pending_unsubs = {}
- -- Add pending non-subscription requests
+
+ -- Add pending requests
for i, v in ipairs( self.pending_reqs ) do
- -- really hating the lack of true arrays!
- local cmd = {}
- cmd[1] = v.playerid or ''
- cmd[2] = v.request
+ local cmd = {
+ v.playerid or '',
+ v.request
+ }
local req = {
channel = '/slim/request',
@@ -266,29 +289,326 @@
},
}
- -- only ask for a response if we have a callback function
+ -- Only ask for a response if we have a callback function
if v.func then
- req["id"] = v.reqid
-
+ req.id = v.reqid
+
-- Store this request's callback
local subscription = '/slim/request|' .. v.reqid
if not self.notify[subscription] then
self.notify[subscription] = {}
end
self.notify[subscription][v.func] = v.func
- end
-
+
+ table.insert( self.sent_reqs, req )
+ end
+
table.insert( data, req )
end
-- Clear out pending requests
self.pending_reqs = {}
-
- return
-end
+end
+
+
+-- Send any pending subscriptions and requests
+_sendPendingRequests = function(self)
+
+ -- add all pending unsub requests, and any others we need to send
+ local data = {}
+ _addPendingRequests(self, data)
+
+ -- Only continue if we have some data to send
+ if data[1] then
+ if log:isDebug() then
+ log:debug("Sending pending request(s):")
+ debug.dump(data, 5)
+ end
+
+ local req = CometRequest(
+ _getRequestSink(self),
+ self.uri,
+ data
+ )
+
+ self.rhttp:fetch(req)
+ end
+end
+
+
+function subscribe(self, subscription, func, playerid, request, priority)
+ if log:isDebug() then
+ log:debug(self, ": subscribe(", subscription, " ", func, ", ",
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
+ end
+
+ -- Remember subs to send during connect now, or if we get
+ -- disconnected
+ table.insert( self.subs, {
+ reqid = self.reqid,
+ subscription = subscription,
+ playerid = playerid,
+ request = request,
+ func = func,
+ priority = priority,
+ pending = true, -- pending means we haven't sent this sub
request yet
+ } )
+
+ -- Bump reqid for the next request
+ self.reqid = self.reqid + 1
+
+ -- Send immediately unless we're batching queries
+ if self.active ~= CONNECTED or self.batch ~= 0 then
+ return
+ end
+
+ -- Send all pending requests and subscriptions
+ _sendPendingRequests(self)
+end
+
+
+function unsubscribe(self, subscription, func)
+ log:debug(self, ": unsubscribe(", subscription, ", ", func, ")")
+
+ -- Remove from notify list
+ if func then
+ -- Remove only the given callback
+ self.notify[subscription][func] = nil
+ else
+ -- Remove all callbacks
+ self.notify[subscription] = nil
+ end
+
+ -- If we unsubscribed the last one for this subscription, clear it out
+ if self.notify[subscription] then
+ return
+ end
+
+ log:debug("No more callbacks for ", subscription, " unsubscribing at
server")
+
+ -- Remove from subs list
+ for i, v in ipairs( self.subs ) do
+ if v.subscription == subscription then
+ table.remove( self.subs, i )
+ break
+ end
+ end
+
+ -- Add to pending unsubs
+ table.insert(self.pending_unsubs, {
+ reqid = self.reqid,
+ subscription = subscription,
+ } )
+
+ -- Bump reqid for the next request
+ self.reqid = self.reqid + 1
+
+ -- Send immediately unless we're batching queries
+ if self.state ~= CONNECTED or self.batch ~= 0 then
+ return
+ end
+
+ -- Send all pending requests
+ _sendPendingRequests(self)
+end
+
+
+function request(self, func, playerid, request, priority)
+ local id = self.reqid
+
+ if log:isDebug() then
+ log:debug(self, ": request(", func, ", reqid:", id, ", ",
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
+ end
+
+ -- Add to pending requests
+ table.insert(self.pending_reqs, {
+ reqid = id,
+ func = func,
+ playerid = playerid,
+ request = request,
+ priority = priority,
+ })
+
+ -- Bump reqid for the next request
+ self.reqid = id + 1
+
+ -- Send immediately unless we're batching queries
+ if self.state ~= CONNECTED or self.batch ~= 0 then
+ if self.state ~= CONNECTED then
+ self.jnt:notify('cometDisconnected', self,
#self.pending_reqs)
+ end
+
+ return id
+ end
+
+ -- Send all pending requests
+ _sendPendingRequests(self)
+
+ return id
+end
+
+
+function addCallback(self, subscription, func)
+ log:debug(self, ": addCallback(", subscription, ", ", func, ")")
+
+ if not self.notify[subscription] then
+ self.notify[subscription] = {}
+ end
+
+ self.notify[subscription][func] = func
+end
+
+
+function removeCallback(self, subscription, func)
+ log:debug(self, ": removeCallback(", subscription, ", ", func, ")")
+
+ self.notify[subscription][func] = nil
+end
+
+
+-- Begin a set of batched queries
+function startBatch(self)
+ log:debug(self, ": startBatch", self.batch)
+
+ self.batch = self.batch + 1
+end
+
+
+-- End batch mode, send all batched queries together
+function endBatch(self)
+ log:debug(self, ": endBatch ", self.batch)
+
+ self.batch = self.batch - 1
+ if self.batch ~= 0 then
+ return
+ end
+
+ -- Send all pending requests and subscriptions
+ _sendPendingRequests(self)
+end
+
+
+-- Notify changes in connection state
+_state = function(self, state)
+ if self.state == state then
+ return
+ end
+
+ log:debug(self, ": state is ", state)
+ self.state = state
+
+ -- Stop reconnect timer
+ self.reconnect_timer:stop()
+
+ if state == CONNECTED then
+ -- Reset error count
+ self.failures = 0
+
+ self.jnt:notify('cometConnected', self)
+
+ elseif state == UNCONNECTED then
+ -- Force connections closed
+ self.chttp:close()
+ self.rhttp:close()
+
+ self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
+ end
+end
+
+
+_handshake = function(self)
+ log:debug(self, ': _handshake(), calling: ', self.uri)
+
+ assert(self.state == UNCONNECTED)
+
+ if not self.isactive then
+ log:info(self, ': _handshake() connection not active')
+ return
+ end
+
+ -- Go through all existing subscriptions and reset the pending flag
+ -- so they are re-subscribed to during _connect()
+ for i, v in ipairs( self.subs ) do
+ log:debug("Will re-subscribe to ", v.subscription)
+ v.pending = true
+ end
+
+ -- Reset clientId
+ self.clientId = nil
+
+
+ local uuid, mac = self.jnt:getUUID()
+
+ local data = { {
+ channel = '/meta/handshake',
+ version = '1.0',
+ supportedConnectionTypes = { 'streaming' },
+ ext = {
+ rev = JIVE_VERSION,
+ mac = mac,
+ uuid = uuid
+ },
+ } }
+
+ -- XXX: according to the spec this should be sent as
application/x-www-form-urlencoded
+ -- with message=<url-encoded json> but it works as straight JSON
+
+ _state(self, CONNECTING)
+
+ local req = CometRequest(
+ _getHandshakeSink(self),
+ self.uri,
+ data
+ )
+
+ self.chttp:fetch(req)
+end
+
+
+_getHandshakeSink = function(self)
+ return function(chunk, err)
+ if self.state ~= CONNECTING then
+ return
+ end
+
+ -- On error, print something...
+ if err then
+ log:info(self, ": _handshake error: ", err)
+
+ -- Try to reconnect according to advice
+ return _handleAdvice(self)
+ end
+
+ -- If we have data
+ if not chunk then
+ return
+ end
+
+ local data = chunk[1]
+
+ -- Update advice if any
+ if data.advice then
+ self.advice = data.advice
+ log:debug(self, ": _handshake, advice updated from
server")
+ end
+
+ if data.successful then
+ self.clientId = data.clientId
+ self.advice = data.advice
+
+ log:debug(self, ": _handshake OK, clientId: ",
self.clientId)
+
+ -- Continue with connect phase, note we are still not
CONNECTED
+ _connect(self)
+ else
+ log:warn(self, ": _handshake error: ", data.error)
+ return _handleAdvice(self)
+ end
+ end
+end
+
_connect = function(self)
- log:debug('Comet:_connect()')
+ log:debug(self, ': _connect()')
-- Connect and subscribe to all events for this clientId
local data = { {
@@ -302,9 +622,19 @@
subscription = '/' .. self.clientId .. '/**',
} }
+ -- Add any un-acknowledged requests to the outgoing data
+ for i, v in ipairs(self.sent_reqs) do
+ table.insert(data, v)
+ end
+
-- Add any other pending requests to the outgoing data
- _addPendingRequests( self, data )
-
+ _addPendingRequests(self, data)
+
+ if log:isDebug() then
+ log:debug("Sending pending request(s):")
+ debug.dump(data, 5)
+ end
+
-- This will be our last request on this connection, it is now only
-- for listening for responses
@@ -317,473 +647,21 @@
self.chttp:fetch(req)
end
-_getEventSink = function(self)
- return function(chunk, err)
- -- on error, print something...
- if err then
- log:warn(self, "_getEventSink: error: ", err)
-
- -- try to reconnect according to advice
- _handleAdvice(self)
- end
-
- -- if we have data
- if chunk then
- -- Process each response event
- for i, event in ipairs(chunk) do
-
- -- update advice if any
- if event.advice then
- self.advice = event.advice
- log:debug(self, "_getEventSink, advice
updated from server")
- end
-
- if event.channel == '/meta/connect' then
- if event.successful then
- log:debug(self, "_getEventSink,
connect message acknowledged")
- _active(self, "connected")
- else
- log:warn(self, "_getEventSink,
connect failed: ", event.error)
- _handleAdvice(self)
- break
- end
- elseif event.channel == '/meta/disconnect' then
- if event.successful then
- log:debug(self, "_getEventSink,
disconnect OK")
- _active(self, "closed")
- else
- log:warn(self, "_getEventSink,
disconnect failed: ", event.error)
- end
- elseif event.channel == '/meta/reconnect' then
- if event.successful then
- log:debug(self, "_getEventSink,
reconnect OK")
- _active(self, "connected")
- else
- log:warn(self, "_getEventSink,
reconnect failed: ", event.error)
- _handleAdvice(self)
- break
- end
- elseif event.channel == '/meta/subscribe' then
- if event.successful then
- log:debug(self, "_getEventSink,
/meta/subscribe OK for ", event.subscription)
- else
- log:warn(self, "_getEventSink,
/meta/subscribe failed: ", event.error)
- end
- elseif event.channel == '/meta/unsubscribe' then
- if event.successful then
- log:debug(self, "_getEventSink,
/meta/unsubscribe OK for ", event.subscription)
- else
- log:warn(self, "_getEventSink,
/meta/unsubscribe error: ", event.error)
- end
- elseif event.channel == '/slim/subscribe' then
- if event.successful then
- log:debug(self, "_getEventSink,
/slim/subscribe OK for reqid ", event.id)
- else
- log:warn(self, "_getEventSink,
/slim/subscribe error for reqid ", event.id, ": ", event.error)
- end
- elseif event.channel == '/slim/unsubscribe' then
- if event.successful then
- log:debug(self, "_getEventSink,
/slim/unsubscribe OK for reqid ", event.id)
- else
- log:warn(self, "_getEventSink,
/slim/unsubscribe error for reqid ", event.id, ": ", event.error)
- end
- elseif event.channel == '/slim/request' and
event.successful then
- log:debug(self, "request id ",
event.id, " sent OK")
- elseif event.channel then
- local subscription = event.channel
- local onetime_request = false
-
- -- strip clientId from channel
- subscription =
string.gsub(subscription, "^/[0-9A-Za-z]+", "")
-
- if string.find(subscription,
'/slim/request') then
- -- an async notification from a
normal request
- subscription = subscription ..
'|' .. event.id
- onetime_request = true
- end
-
- if self.notify[subscription] then
- log:debug(self, "_getEventSink,
notifiying callbacks for ", subscription)
-
- for _, func in pairs(
self.notify[subscription] ) do
- log:debug(" callback
to: ", func)
- func(event)
- end
-
- if onetime_request then
- -- this was a one-time
request, so remove the callback
-
self.notify[subscription] = nil
- end
- else
- -- this is normal, since
unsub's are delayed by a few seconds, we may receive events
- -- after we unsubscribed but
before the server is notified about it
- log:debug(self, "_getEventSink,
got data for an event we aren't subscribed to, ignoring -> ", subscription)
- end
- else
- log:warn(self, "_getEventSink, unknown
error: ", event.error)
- if event.advice then
- _handleAdvice(self)
- break
- end
- end
- end
- end
- end
-end
-
-_handshake = function(self)
- log:debug('Comet:_handshake(), calling: ', self.uri)
-
- local data = { {
- channel = '/meta/handshake',
- version = '1.0',
- supportedConnectionTypes = { 'streaming' },
- ext = {
- rev = JIVE_VERSION,
- },
- } }
-
- local uuid, mac = self.jnt:getUUID()
-
- if mac then
- data[1].ext.mac = mac
- end
- if uuid then
- data[1].ext.uuid = uuid
- end
-
- -- XXX: according to the spec this should be sent as
application/x-www-form-urlencoded
- -- with message=<url-encoded json> but it works as straight JSON
-
- local req = CometRequest(
- _getHandshakeSink(self),
- self.uri,
- data
- )
-
- _active(self, "handshake")
-
- self.chttp:fetch(req)
-end
-
-_getHandshakeSink = function(self)
- return function(chunk, err)
- -- on error, print something...
- if err then
- log:warn(self, "_handshake with ", self.jsName, "
error: ", err)
-
- -- try to reconnect according to advice
- _handleAdvice(self)
- end
- -- if we have data
- if chunk then
- local data = chunk[1]
- if data.successful then
- _active(self, "connected")
- self.clientId = data.clientId
- self.advice = data.advice
-
- log:debug(self, "_handshake OK with ",
self.jsName, ", clientId: ", self.clientId)
-
-
- -- Continue with connect phase
- _connect(self)
- else
- log:warn(self, "_handshake error: ", data.error)
- if data.advice then
- self.advice = data.advice
- _handleAdvice(self)
- end
- end
- end
- end
-end
-
-function subscribe(self, subscription, func, playerid, request, priority)
- local id = self.reqid
-
- if log:isDebug() then
- log:debug(self, "subscribe(", subscription, ", reqid:", id, ",
", func, ", ", playerid, ", ", table.concat(request, ","), ", priority:",
priority, ")")
- end
-
- -- Remember subs to send during connect now, or if we get
- -- disconnected
- table.insert( self.subs, {
- subscription = subscription,
- playerid = playerid,
- request = request,
- reqid = id,
- func = func,
- priority = priority,
- pending = true, -- pending means we haven't send this sub
request yet
- } )
-
- self.reqid = self.reqid + 1
-
- -- Send immediately unless we're batching queries
- if self.active == "connected" and not self.batch then
- -- add all pending unsub requests, and any others we need to
send
- local data = {}
- _addPendingRequests(self, data)
-
- -- Only continue if we have some data to send
- if data[1] then
- if log:isDebug() then
- log:debug("Sending pending subscribe
request(s):")
- debug.dump(data, 5)
- end
-
- local req = CometRequest(
- _getEventSink(self),
- self.uri,
- data
- )
-
- self.rhttp:fetch(req)
- end
- end
-end
-
-function unsubscribe(self, subscription, func)
- log:debug(self, "unsubscribe(", subscription, ", ", func, ")")
-
- -- Remove from notify list
- if func then
- -- Remove only the given callback
- self.notify[subscription][func] = nil
- else
- -- Remove all callbacks
- self.notify[subscription] = nil
- end
-
- -- If we unsubscribed the last one for this subscription, clear it out
- if not self.notify[subscription] then
- log:debug("No more callbacks for ", subscription, "
unsubscribing at server")
-
- -- Remove from subs list
- for i, v in ipairs( self.subs ) do
- if v.subscription == subscription then
- table.remove( self.subs, i )
- break
- end
- end
-
- table.insert( self.pending_unsubs, subscription )
-
- if self.active == "connected" and not self.batch then
- -- add all pending unsub requests, and any others we
need to send
- local data = {}
- _addPendingRequests(self, data)
-
- -- Only continue if we have stuff to send
- if data[1] then
- if log:isDebug() then
- log:debug("Sending pending unsubscribe
request(s):")
- debug.dump(data, 5)
- end
-
- local req = CometRequest(
- _getEventSink(self),
- self.uri,
- data
- )
-
- -- unsubscribe doesn't need to be high priority
- self.rhttp:fetch(req)
- end
- end
- end
-end
-
-function request(self, func, playerid, request, priority)
- local id = self.reqid
-
- if log:isDebug() then
- log:debug(self, "request(", func, ", reqid:", id, ", ",
playerid, ", ", table.concat(request, ","), ", priority:", priority, ")")
- end
-
- if self.active ~= "connected" or self.batch then
- -- Add subscription to pending requests, to be sent during
connect/reconnect
- table.insert( self.pending_reqs, {
- reqid = id,
- func = func,
- playerid = playerid,
- request = request,
- priority = priority,
- } )
- self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
- else
- local cmd = {}
- cmd[1] = playerid or ''
- cmd[2] = request
-
- local data = { {
- channel = '/slim/request',
- data = {
- request = cmd,
- response = '/' .. self.clientId ..
'/slim/request',
- priority = priority,
- },
- } }
-
- -- only pass id if we have a callback function, this tells
- -- SlimServer we don't want a response
- if func then
- data[1]["id"] = id
- end
-
- local sink = nil
- if func then
- sink = _getRequestSink(self, func, id)
- end
-
- local req = CometRequest(
- sink,
- self.uri,
- data
- )
-
- self.rhttp:fetch(req)
-
- -- If we expect a response, we will get the response on the
persistent
- -- connection. Store our callback for later
- if func then
- local subscription = '/slim/request|' .. id
- if not self.notify[subscription] then
- self.notify[subscription] = {}
- end
- self.notify[subscription][func] = func
- else
- log:debug(' No sink defined for this request, no
response will be received')
- end
- end
-
- -- Bump reqid for the next request
- self.reqid = self.reqid + 1
-
- -- Return the request id to the caller
- return id
-end
-
-_getRequestSink = function(self, func, reqid)
- return function(chunk, err)
- -- on error, print something...
- if err then
- log:warn(self, "request error: ", err)
- end
- -- if we have data
- if chunk then
- for i, event in ipairs(chunk) do
- if event.advice then
- self.advice = event.advice
- end
-
- if event.error then
- log:warn(self, "request error: ",
event.error)
- elseif event.channel == '/slim/request' and
event.successful then
- log:debug(self, "request id ", reqid, "
sent OK")
-
- else
- log:warn(self, "request unknown
response")
- end
- end
- end
- end
-end
-
-function addCallback(self, subscription, func)
- log:debug(self, "addCallback(", subscription, ", ", func, ")")
-
- if not self.notify[subscription] then
- self.notify[subscription] = {}
- end
-
- self.notify[subscription][func] = func
-end
-
-function removeCallback(self, subscription, func)
- log:debug(self, "removeCallback(", subscription, ", ", func, ")")
-
- self.notify[subscription][func] = nil
-end
-
--- Decide what to do if we get disconnected or get an error while
handshaking/connecting
-_handleAdvice = function(self)
- -- make sure our connection is closed
- if self.active ~= "closed" then
- _active(self, "closed")
- end
-
- -- stop any existing reconnect timer
- if self.reconnect_timer then
- self.reconnect_timer:stop()
- self.reconnect_timer = nil
- end
-
- self.failures = self.failures + 1
-
- local advice = self.advice
-
- -- Keep retrying after multiple failures but backoff gracefully
- local retry_interval = ( advice.interval or RETRY_DEFAULT ) *
self.failures
-
- if retry_interval > MAX_BACKOFF then
- retry_interval = MAX_BACKOFF
- end
-
- if advice.reconnect == 'none' then
- self.clientId = nil
- log:warn(self, "_connect, server told us not to reconnect")
-
- elseif advice.reconnect == 'handshake' then
- log:warn(
- self, " advice is ", advice.reconnect, ",
re-handshaking in ",
- retry_interval / 1000, " seconds"
- )
-
- self.clientId = nil
-
- self.reconnect_timer = Timer(
- retry_interval,
- function()
- -- Go through all existing subscriptions and
reset the pending flag
- -- so they are re-subscribed to during
_connect()
- for i, v in ipairs( self.subs ) do
- log:debug("Will re-subscribe to ",
v.subscription)
- v.pending = true
- end
-
- _handshake(self)
- end,
- true -- run timer only once
- )
- self.reconnect_timer:start()
-
- else -- if advice.reconnect == 'retry' then
- log:warn(
- self, " advice is ", advice.reconnect, ", reconnecting
in ",
- retry_interval / 1000, " seconds"
- )
-
- self.reconnect_timer = Timer(
- retry_interval,
- function()
- _reconnect(self)
- end,
- true -- run timer only once
- )
- self.reconnect_timer:start()
- end
-end
-- Reconnect to the server, try to maintain our previous clientId
_reconnect = function(self)
- log:debug('Comet:_reconnect()')
+ log:debug(self, ': _reconnect(), calling: ', self.uri)
+
+ assert(self.state == UNCONNECTED)
+
+ if not self.isactive then
+ log:info(self, ': _reconnect() connection not active')
+ return
+ end
if not self.clientId then
- log:debug(self, "_reconnect error: cannot reconnect without
clientId, handshaking instead")
- _handshake(self)
- do return end
+ log:debug(self, ": _reconnect error: cannot reconnect without
clientId, handshaking instead")
+ return _handshake(self)
end
local data = { {
@@ -798,70 +676,256 @@
data
)
- self.chttp:fetch(req)
-end
-
--- Notify changes in connection state
-_active = function(self, active)
-
- if self.active == active then
- return
- end
-
- self.active = active
-
- if active == "connected" then
- -- Reset error count
- self.failures = 0
-
- self.jnt:notify('cometConnected', self)
- elseif active == "closed" then
- -- force connections closed
- self.chttp:close()
- self.rhttp:close()
-
- self.jnt:notify('cometDisconnected', self, #self.pending_reqs)
- end
-end
-
--- Begin a set of batched queries
-function startBatch(self)
- log:debug(self, "startBatch()")
-
- self.batch = true
-end
-
--- End batch mode, send all batched queries together
-function endBatch(self)
- log:debug(self, "endBatch()")
-
- self.batch = false
-
- -- add all pending requests
- local data = {}
- _addPendingRequests(self, data)
-
- -- Only continue if we have some data to send
- if data[1] then
- if log:isDebug() then
- log:debug("Sending pending queries:")
- debug.dump(data, 5)
- end
-
- local req = CometRequest(
- _getEventSink(self),
- self.uri,
- data
- )
-
- self.rhttp:fetch(req)
+ self.chttp:fetch(req)
+
+ _state(self, CONNECTING)
+end
+
+
+-- sink for chunked connection, handle advice on error
+_getEventSink = function(self)
+ return function(chunk, err)
+ -- On error, print something...
+ if err then
+ log:info(self, ": _getEventSink error: ", err)
+
+ -- Try to reconnect according to advice
+ return _handleAdvice(self)
+ end
+
+ _response(self, chunk)
+ end
+end
+
+
+-- sink for request connection, resend requests on error
+_getRequestSink = function(self)
+ return function(chunk, err)
+ -- On error, print something...
+ if err then
+ log:info(self, ": _getRequestSink error: ", err)
+
+ -- Resend any un-acknowledged requests
+ local data = {}
+ for i, v in ipairs(self.sent_reqs) do
+ table.insert(data, v)
+ end
+
+ -- Only continue if we have some data to send
+ if data[1] then
+ log:info(self, ": Resending requests: ",
#self.sent_reqs)
+
+ local req = CometRequest(
+ _getRequestSink(self),
+ self.uri,
+ data
+ )
+ self.rhttp:fetch(req)
+ end
+ end
+
+ _response(self, chunk)
+ end
+end
+
+
+-- handle responses for both request and chunked connections
+_response = function(self, chunk)
+ -- Nothing to do if we have no data
+ if not chunk then
+ return
+ end
+
+ -- Process each response event
+ for i, event in ipairs(chunk) do
+
+ -- Remove request from sent queue
+ for i, v in ipairs( self.sent_reqs ) do
+ if v.id == event.id then
+ table.remove(self.sent_reqs, i)
+ break
+ end
+ end
+
+ -- Log response
+ if event.error then
+ log:warn(self, ": _response, ", event.channel, "
failed: ", event.error)
+ else
+ log:debug(self, ": _response, ", event.channel, " OK")
+ end
+
+ -- Update advice if any
+ if event.advice then
+ self.advice = event.advice
+ log:debug(self, ": _response, advice updated from
server")
+ end
+
+ -- Handle response
+ if event.channel == '/meta/connect' then
+ if event.successful then
+ _state(self, CONNECTED)
+
+ -- send any requests queued during connect
+ _sendPendingRequests(self)
+ else
+ return _handleAdvice(self)
+ end
+ elseif event.channel == '/meta/disconnect' then
+ if event.successful then
+ self.clientId = nil
+ _state(self, UNCONNECTED)
+ else
+ return _handleAdvice(self)
+ end
+ elseif event.channel == '/meta/reconnect' then
+ if event.successful then
+ _state(self, CONNECTED)
+ else
+ return _handleAdvice(self)
+ end
+ elseif event.channel == '/meta/subscribe' then
+ -- no action
+ elseif event.channel == '/meta/unsubscribe' then
+ -- no action
+ elseif event.channel == '/slim/subscribe' then
+ -- no action
+ elseif event.channel == '/slim/unsubscribe' then
+ -- no action
+ elseif event.channel == '/slim/request' and event.successful
then
+ -- no action
+ elseif event.channel then
+ local subscription = event.channel
+ local onetime_request = false
+
+ -- strip clientId from channel
+ subscription = string.gsub(subscription,
"^/[0-9A-Za-z]+", "")
+
+ if string.find(subscription, '/slim/request') then
+ -- an async notification from a normal request
+ subscription = subscription .. '|' .. event.id
+ onetime_request = true
+ end
+
+ if self.notify[subscription] then
+ log:debug(self, ": _response, notifiying
callbacks for ", subscription)
+
+ for _, func in pairs( self.notify[subscription]
) do
+ log:debug(" callback to: ", func)
+ func(event)
+ end
+
+ if onetime_request then
+ -- this was a one-time request, so
remove the callback
+ self.notify[subscription] = nil
+ end
+ else
+ -- this is normal, since unsub's are delayed by a few seconds, we may receive events
+ -- after we unsubscribed but before the server is notified about it
+ log:debug(self, ": _response, got data for an
event we aren't subscribed to, ignoring -> ", subscription)
+ end
+ else
+ log:warn(self, ": _response, unknown error: ",
event.error)
+ return _handleAdvice(self)
+ end
+ end
+end
+
+
+_disconnect = function(self)
+ assert(self.state == CONNECTED)
+
+ log:debug(self, ': disconnect()')
+
+ -- Mark all subs as pending so they can be resubscribed later
+ for i, v in ipairs( self.subs ) do
+ log:debug("Will re-subscribe to ", v.subscription, " on next
connect")
+ v.pending = true
+ end
+
+ local data = { {
+ channel = '/meta/disconnect',
+ clientId = self.clientId,
+ } }
+
+ local req = CometRequest(
+ _getRequestSink(self),
+ self.uri,
+ data
+ )
+
+ self.rhttp:fetch(req)
+
+ _state(self, UNCONNECTING)
+end
+
+
+-- Decide what to do if we get disconnected or get an error while handshaking/connecting
+_handleAdvice = function(self)
+ log:warn(self, ": handleAdvice state=", self.state)
+
+ if self.state == UNCONNECTED then
+ -- do nothing
+ return
+ end
+
+ -- force connection closed
+ _state(self, UNCONNECTED)
+
+ self.failures = self.failures + 1
+ local reconnect = self.advice.reconnect or "retry"
+ local retry_interval = self.advice.interval or RETRY_DEFAULT
+
+ if retry_interval == 0 then
+ -- Retry immediately
+ elseif self.aggressive then
+ -- Retry using a random interval between 1 - advice.interval seconds
+ retry_interval = math.random(1000, retry_interval)
+ else
+ -- Keep retrying after multiple failures but backoff gracefully
+ retry_interval = retry_interval * self.failures
+
+ if retry_interval > MAX_BACKOFF then
+ retry_interval = MAX_BACKOFF
+ end
+ end
+
+ if reconnect == 'none' then
+ self.clientId = nil
+ log:info(self, ": advice is ", reconnect, " server told us not
to reconnect")
+
+ else
+ log:info(self, ": advice is ", reconnect, ", connect in ",
+ retry_interval / 1000, " seconds")
+
+ self.reconnect_timer:restart(retry_interval)
+ end
+end
+
+
+_handleTimer = function(self)
+ log:debug(self, ": handleTimer state=", self.state, " advice=",
self.advice)
+
+ if self.state ~= UNCONNECTED then
+ log:debug(self, ": ignoring timer while ", self.state)
+ return
+ end
+
+ local reconnect = self.advice.reconnect or "retry"
+
+ if reconnect == 'handshake' then
+ _handshake(self)
+
+ elseif reconnect == 'retry' then
+ _reconnect(self)
+
end
end
function __tostring(self)
- return "Comet {" .. self.name .. "}: "
-end
+ return "Comet {" .. self.name .. "}"
+end
+
--[[
Modified: trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Mon Jan 28 14:14:32 2008
@@ -808,6 +808,9 @@
-- FIXME: manage the queues
SocketTcp.close(self)
+
+ -- restart the state machine if it is idle
+ self:t_sendDequeueIfIdle()
end
Modified: trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua Mon Jan 28 14:14:32 2008
@@ -220,13 +220,14 @@
})
obj.id = obj:idFor(ip, port, name)
-
+
-- subscribe to comet events
jnt:subscribe(obj)
-- subscribe to server status, timeout at 60 seconds.
-- get 50 players
-- FIXME: what if the server has more than 50 players?
+ obj.comet:aggressiveReconnect(true)
obj.comet:subscribe('/slim/serverstatus',
_getSink(obj, '_serverstatusSink'),
nil,
@@ -293,7 +294,7 @@
log:info(self, ":connect()")
-- artwork pool connects on demand
- self.comet:start()
+ self.comet:connect()
end
Modified: trunk/jive/src/pkg/jive/share/jive/utils/debug.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/utils/debug.lua?rev=1664&root=Jive&r1=1663&r2=1664&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/utils/debug.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/utils/debug.lua Mon Jan 28 14:14:32 2008
@@ -62,16 +62,29 @@
--[[
-=head2 trace()
+=head2 traceon()
-Traces Lua calls, line by line. There is no way to stop tracing. This is very verbose,
+Traces Lua calls, line by line. Use traceoff() to turn off tracing. This is very verbose,
but can help trace performance or strange behavioral issues. It also gives a glimpse on
the inner working of the Lua engine.
=cut
--]]
-function trace ()
+function traceon ()
ldebug.sethook(_trace_line, "l")
+end
+
+
+--[[
+
+=head2 traceoff()
+
+Turns off tracing Lua calls, line by line.
+
+=cut
+--]]
+function traceoff ()
+ ldebug.sethook(nil, "l")
end
Added: trunk/jive/src/pkg/jive/share/strict.lua
URL:
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/strict.lua?rev=1664&root=Jive&view=auto
==============================================================================
--- trunk/jive/src/pkg/jive/share/strict.lua (added)
+++ trunk/jive/src/pkg/jive/share/strict.lua Mon Jan 28 14:14:32 2008
@@ -1,0 +1,34 @@
+--
+-- strict.lua
+-- checks uses of undeclared global variables
+-- All global variables must be 'declared' through a regular assignment
+-- (even assigning nil will do) in a main chunk before being used
+-- anywhere or assigned to inside a function.
+--
+
+local mt = getmetatable(_G)
+if mt == nil then
+ mt = {}
+ setmetatable(_G, mt)
+end
+
+mt.__declared = {}
+
+mt.__newindex = function (t, n, v)
+ if not mt.__declared[n] then
+ local w = debug.getinfo(2, "S").what
+ if w ~= "main" and w ~= "C" then
+ error("assign to undeclared variable '"..n.."'", 2)
+ end
+ mt.__declared[n] = true
+ end
+ rawset(t, n, v)
+end
+
+mt.__index = function (t, n)
+ if not mt.__declared[n] and debug.getinfo(2, "S").what ~= "C" then
+ error("variable '"..n.."' is not declared", 2)
+ end
+ return rawget(t, n)
+end
+
_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins