Author: titmuss
Date: Tue Jan 15 14:26:46 2008
New Revision: 1450

URL: http://svn.slimdevices.com?rev=1450&root=Jive&view=rev
Log:
Bug: N/A
Description:
When an HTTP connection is idle, keep monitoring it in the select loop: the
server may close the connection, and we need to detect this.

Tune how HttpPool manages its connections. That class is now used only for
artwork, so we want all connections to close on a timeout. It was also
closing connections too quickly, causing multiple connections to be opened
while browsing artists.

Modified:
    trunk/jive/src/pkg/jive/share/jive/net/HttpPool.lua
    trunk/jive/src/pkg/jive/share/jive/net/RequestHttp.lua
    trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
    trunk/jive/src/pkg/jive/share/jive/net/SocketHttpQueue.lua
    trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua

Modified: trunk/jive/src/pkg/jive/share/jive/net/HttpPool.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/HttpPool.lua?rev=1450&root=Jive&r1=1449&r2=1450&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/HttpPool.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/HttpPool.lua Tue Jan 15 14:26:46 2008
@@ -43,7 +43,7 @@
 
 local log             = require("jive.utils.log").logger("net.http")
 
-local KEEPALIVE_TIMEOUT = 300000 -- timeout idle connections after 300 seconds
+local KEEPALIVE_TIMEOUT = 60000 -- timeout idle connections after 60 seconds
 
 -- jive.net.HttpPool is a base class
 module(..., oo.class)
@@ -191,7 +191,9 @@
                        KEEPALIVE_TIMEOUT,
                        function()
                                log:debug(self, ": closing idle connection")
-                               self.pool.jshq[1]:close('keep-alive timeout')
+                               for i = 1, self.pool.active do
+                                       self.pool.jshq[1]:close('keep-alive 
timeout')
+                               end
                        end,
                        true -- run once
                )
@@ -199,7 +201,7 @@
        end                     
 
        -- close all but the first one (active = 1)
-       return nil, (socket != self.pool.jshq[1])
+       return nil, false
 end
 
 

Modified: trunk/jive/src/pkg/jive/share/jive/net/RequestHttp.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/RequestHttp.lua?rev=1450&root=Jive&r1=1449&r2=1450&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/RequestHttp.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/RequestHttp.lua Tue Jan 15 14:26:46 
2008
@@ -352,14 +352,6 @@
 end
 
 
--- t_canDequeue
--- called by the HTTP layer to determine if the next request in the queue
--- can be sent
-function t_canDequeue(self)
-       return false
-end
-
-
 --[[
 
 =head2 tostring(aRequest)

Modified: trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1450&root=Jive&r1=1449&r2=1450&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Tue Jan 15 14:26:46 
2008
@@ -104,6 +104,7 @@
 --]]
 function fetch(self, request)
        _assert(oo.instanceof(request, RequestHttp), tostring(self) .. 
":fetch() parameter must be RequestHttp - " .. type(request) .. " - ".. 
debug.traceback())
+
        -- push the request
        table.insert(self.t_httpSendRequests, request)
 
@@ -118,7 +119,7 @@
 -- manages the http state machine for sending stuff to the server
 function t_sendNext(self, go, newState)
 --     log:debug(self, ":t_sendNext(", go, ", ", newState, ")")
-       
+
        if newState then
                _assert(self[newState] and type(self[newState]) == 'function')
                self.t_httpSendState = newState
@@ -132,14 +133,25 @@
 end
 
 
+-- t_dequeueRequest
+-- removes a request from the queue, can be overridden by sub-classes
+function _dequeueRequest(self)
+       if #self.t_httpSendRequests > 0 then
+               return table.remove(self.t_httpSendRequests, 1)
+       end
+
+       return nil
+end
+
+
 -- t_sendDequeue
 -- removes a request from the queue
 function t_sendDequeue(self)
 --     log:debug(self, ":t_sendDequeue()")
        
-       if #self.t_httpSendRequests > 0 then
-       
-               self.t_httpSending = table.remove(self.t_httpSendRequests, 1)
+       self.t_httpSending = self:_dequeueRequest()
+
+       if self.t_httpSending then
 --             log:info(self, " processing ", self.t_httpSending)
                self:t_sendNext(true, 't_sendConnect')
                return
@@ -147,6 +159,14 @@
        
        -- back to idle
 --     log:info(self, ": no request in queue")
+
+       if self:connected() then
+               local pump = function(NetworkThreadErr)
+                                    self:close("connect closed")
+                            end
+
+               self:t_addRead(pump, 0) -- No timeout
+       end
 end
 
 
@@ -165,11 +185,9 @@
 --     log:debug(self, ":t_sendConnect()")
        
        if not self:connected() then
-       
                local err = socket.skip(1, self:t_connect())
        
                if err then
-       
                        log:error(self, ":t_sendConnect: ", err)
                        self:close(err)
                        return
@@ -245,7 +263,7 @@
        local source = function()
        
                local line1 = string.format("%s HTTP/%s", 
self.t_httpSending:t_getRequestString(), self.t_httpProtocol)
-               
+
                local t = {}
                
                table.insert(t, line1)
@@ -403,8 +421,6 @@
                        else
                                return false, "cannot parse: " .. chunk
                        end
-
-                       log:warn(self," CODE: line1=", line1)
                end
 
                -- read headers
@@ -425,130 +441,12 @@
 
                                headers[name] = value
                        else
-                               log:warn(self," HDRS: line1=", line1)
-
                                -- we're done
                                
self.t_httpReceiving:t_setResponseHeaders(statusCode, statusLine, headers)
 
                                -- release send queue
                                self:t_sendNext(false, 't_sendDequeue')
                
-                               -- we've received response headers, check with 
request if OK to send next query now
-                               if self.t_httpReceiving:t_canDequeue() then
-                                       self:t_sendDequeueIfIdle()
-                               end
-                       
-                               -- move on to our future...
-                               self:t_rcvNext(true, 't_rcvResponse')
-                               return
-                       end
-               end
-       end
-       
-       self:t_addRead(pump, SOCKET_TIMEOUT)
-
-
-       
-       local first = true
-       local partial
-       
-       local source = function()
-       
-               local line, err = self.t_sock:receive('*l', partial)
-               
-               if err then
---                     log:debug(self, ":t_rcvHeaders.source:", err)
-
-                       return nil, err
-               end
-               
-               if line == "" then
-                       -- we're done
-                       return nil
-               end
-               
-               return line
-       end
-
-       local headers = {}
-       local statusCode = false
-       local statusLine = false
-       local sink = function(chunk, err)
---             log:debug(self, ":t_rcvHeaders.sink: ", chunk)
-               
-               if chunk then
-                       
-                       -- first line is status line
-                       if not statusCode then
-                       
-                               local data = socket.skip(2, string.find(chunk, 
"HTTP/%d*%.%d* (%d%d%d)"))
-                               
-                               if data then
-                                       statusCode = tonumber(data)
-                                       statusLine = chunk
-                               else
-                                       return false, "cannot parse: " .. chunk
-                               end
-                               
-                       else
-                               
-                               local name, value = socket.skip(2, 
string.find(chunk, "^(.-):%s*(.*)"))
-                               
-                               if not (name and value) then 
-                                       return false, "malformed reponse 
headers"
-                               else
-                                       headers[name] = value
---                                     log:debug(self, ":t_rcvHeaders.sink 
header: ", name, ":", value)
-                               end
-                       end
-               end
-               return 1
-       end
-
-       local pump = function (NetworkThreadErr)
---             log:debug(self, ":t_rcvHeaders.pump()")
-               if first then
-                       first = false
-               end
-               
-               if NetworkThreadErr then
-                       log:error(self, ":t_rcvHeaders.pump:", err)
-                       --self:t_removeRead()
-                       self:close(err)
-                       return
-               end
-               
-               while true do
-                       local ret, err = ltn12.pump.step(source, sink)
-       
-                       if err then
-               
-                               if err == 'timeout' then
---                                     log:debug(self, ":t_rcvHeaders.pump - 
timeout")
-                                       -- more next time
-                                       return
-                               end
-               
-                               log:error(self, ":t_rcvHeaders.pump:", err)
-                               --self:t_removeRead()
-                               self:close(err)
-                               return
-                       
-                       elseif not ret then
-               
-                               -- we're done
---                             self:t_removeRead()
-                       
-                               
self.t_httpReceiving:t_setResponseHeaders(statusCode, statusLine, headers)
-               
-                               -- release send queue
-                               self:t_sendNext(false, 't_sendDequeue')
-               
-                               -- we've received response headers, check with 
request if OK to send next query now
-                               if self.t_httpReceiving:t_canDequeue() then
-                                       self:t_sendDequeueIfIdle()
-                               end
-                       
                                -- move on to our future...
                                self:t_rcvNext(true, 't_rcvResponse')
                                return
@@ -841,12 +739,6 @@
 
                        -- move on to our future
                        self:t_rcvNext(true, 't_rcvSend')
-                       
-               -- else let the request decide if we can send another request, 
if any   
-               else
-                       if self.t_httpReceiving:t_canDequeue() then
-                               self:t_sendDequeueIfIdle()
-                       end
                end
        end
        

Modified: trunk/jive/src/pkg/jive/share/jive/net/SocketHttpQueue.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/SocketHttpQueue.lua?rev=1450&root=Jive&r1=1449&r2=1450&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/SocketHttpQueue.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/SocketHttpQueue.lua Tue Jan 15 
14:26:46 2008
@@ -62,22 +62,22 @@
 end
 
 
--- t_sendDequeue
+-- _dequeueRequest
 --
-function t_sendDequeue(self)
---     log:debug(self, ":t_sendDequeue()")
+function _dequeueRequest(self)
+--     log:debug(self, ":_dequeueRequest()")
        
        local request, close = self.httpqueue:t_dequeue(self)
        
        if request then
-               self.t_httpSending = request
---             log:info(self, " processing ", self.t_httpSending)
-               self:t_sendNext(true, 't_sendConnect')
+               return request
        end
        
        if close then
                self:close()
        end
+
+       return nil
 end
 
 

Modified: trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua?rev=1450&root=Jive&r1=1449&r2=1450&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/slim/SlimServer.lua Tue Jan 15 14:26:46 
2008
@@ -204,7 +204,7 @@
                players = {},
 
                -- artwork http pool
-               artworkPool = HttpPool(jnt, name, ip, port, 2, 2, 
Task.PRIORITY_LOW),
+               artworkPool = HttpPool(jnt, name, ip, port, 2, 1, 
Task.PRIORITY_LOW),
 
                -- artwork cache: Weak table storing a surface by iconId
                artworkThumbCache = setmetatable({}, { __mode="k" }),

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins

Reply via email to