Author: titmuss
Date: Wed Jan 30 09:51:19 2008
New Revision: 1702

URL: http://svn.slimdevices.com?rev=1702&root=Jive&view=rev
Log:
Bug: 6763
Description:
The http code originally allowed for pipelining requests on the socket. This 
was never used, and the code contained a race 
condition that meant no more requests could be processed by that socket. To fix 
this a single state is now kept for both 
send and receive operations. Pipelining can be added back later if thought 
useful.


Modified:
    branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua
    branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua

Modified: branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua
URL: 
http://svn.slimdevices.com/branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua?rev=1702&root=Jive&r1=1701&r2=1702&view=diff
==============================================================================
--- branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua (original)
+++ branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua Wed Jan 30 
09:51:19 2008
@@ -145,12 +145,16 @@
        self.reqQueueCount = self.reqQueueCount + 1
        
        -- calculate threshold
+--[[
        local active = math.floor(self.reqQueueCount / self.pool.threshold) + 1
        if active > #self.pool.jshq then
                active = #self.pool.jshq
        end
        self.pool.active = active
-       
+--]]
+       self.pool.active = #self.pool.jshq
+
+
 --     log:debug(self, ":", self.reqQueueCount, " requests, ", 
self.pool.active, " connections")
 
        -- kick all active queues

Modified: branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL: 
http://svn.slimdevices.com/branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1702&root=Jive&r1=1701&r2=1702&view=diff
==============================================================================
--- branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Wed Jan 30 
09:51:19 2008
@@ -71,20 +71,17 @@
 =cut
 --]]
 function __init(self, jnt, address, port, name)
-       --log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
+--     log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
 
        -- init superclass
        local obj = oo.rawnew(self, SocketTcp(jnt, address, port, name))
 
        -- init states
-       obj.t_httpSendState = 't_sendDequeue'
-       obj.t_httpRcvState = 't_rcvDequeue'
+       obj.t_httpState = 't_sendDequeue'
        
        -- init queues
        obj.t_httpSendRequests = {}
-       obj.t_httpSending = false
-       obj.t_httpRcvRequests = {}
-       obj.t_httpReceiving = false
+       obj.t_httpRequest = false
        
        obj.t_httpProtocol = '1.1'
        
@@ -108,27 +105,27 @@
        -- push the request
        table.insert(self.t_httpSendRequests, request)
 
---     log:info(self, " queuing ", request, " - ", #self.t_httpSendRequests, " 
requests in queue")
+--     log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests, 
" requests in queue")
                
        -- start the state machine if it is idle
        self:t_sendDequeueIfIdle()
 end
 
 
--- t_sendNext
+-- t_nextState
 -- manages the http state machine for sending stuff to the server
-function t_sendNext(self, go, newState)
---     log:debug(self, ":t_sendNext(", go, ", ", newState, ")")
+function t_nextState(self, go, newState)
+--     log:debug(self, ":t_nextState(", go, ", ", newState, ")")
 
        if newState then
                _assert(self[newState] and type(self[newState]) == 'function')
-               self.t_httpSendState = newState
+               self.t_httpState = newState
        end
        
        if go then
                -- call the function
                -- self:XXX(bla) is really the same as self["XXX"](self, bla)
-               self[self.t_httpSendState](self)
+               self[self.t_httpState](self)
        end
 end
 
@@ -148,21 +145,21 @@
 -- removes a request from the queue
 function t_sendDequeue(self)
 --     log:debug(self, ":t_sendDequeue()")
-       
-       self.t_httpSending = self:_dequeueRequest()
-
-       if self.t_httpSending then
---             log:info(self, " processing ", self.t_httpSending)
-               self:t_sendNext(true, 't_sendConnect')
+
+       self.t_httpRequest = self:_dequeueRequest()
+
+       if self.t_httpRequest then
+--             log:debug(self, " processing ", self.t_httpRequest)
+               self:t_nextState(true, 't_sendConnect')
                return
        end
        
        -- back to idle
---     log:info(self, ": no request in queue")
+--     log:debug(self, ": no request in queue")
 
        if self:connected() then
                local pump = function(NetworkThreadErr)
-                                    self:close("connect closed")
+                                    self:close("idle close")
                             end
 
                self:t_addRead(pump, 0) -- No timeout
@@ -173,8 +170,8 @@
 -- t_sendDequeueIfIdle
 -- causes a dequeue and processing on the send queue if possible
 function t_sendDequeueIfIdle(self)
-       if self.t_httpSendState == 't_sendDequeue' then
-               self:t_sendNext(true)
+       if self.t_httpState == 't_sendDequeue' then
+               self:t_nextState(true)
        end
 end
 
@@ -194,7 +191,7 @@
                end
        end
                
-       self:t_sendNext(true, 't_sendRequest')
+       self:t_nextState(true, 't_sendRequest')
 end
 
 
@@ -208,7 +205,7 @@
                ["User-Agent"] = 'Jive/' .. JIVE_VERSION,
        }
        
-       local req_headers = self.t_httpSending:t_getRequestHeaders()
+       local req_headers = self.t_httpRequest:t_getRequestHeaders()
        if not req_headers["Host"] then
                local address, port = self:t_getAddressPort()
                headers["Host"] = address
@@ -217,8 +214,8 @@
                end
        end
        
-       if self.t_httpSending:t_hasBody() then
-               headers["Content-Length"] = #self.t_httpSending:t_body()
+       if self.t_httpRequest:t_hasBody() then
+               headers["Content-Length"] = #self.t_httpRequest:t_body()
        end
 
        req_headers["Accept-Language"] = string.lower(locale.getLocale())       
@@ -262,7 +259,7 @@
        
        local source = function()
        
-               local line1 = string.format("%s HTTP/%s", 
self.t_httpSending:t_getRequestString(), self.t_httpProtocol)
+               local line1 = string.format("%s HTTP/%s", 
self.t_httpRequest:t_getRequestString(), self.t_httpProtocol)
 
                local t = {}
                
@@ -271,13 +268,13 @@
                for k, v in pairs(self:t_getSendHeaders()) do
                        table.insert(t, k .. ": " .. v)
                end
-               for k, v in pairs(self.t_httpSending:t_getRequestHeaders()) do
+               for k, v in pairs(self.t_httpRequest:t_getRequestHeaders()) do
                        table.insert(t, k .. ": " .. v)
                end
                
                table.insert(t, "")
-               if self.t_httpSending:t_hasBody() then
-                       table.insert(t, self.t_httpSending:t_body())
+               if self.t_httpRequest:t_hasBody() then
+                       table.insert(t, self.t_httpRequest:t_body())
                else
                        table.insert(t, "")
                end
@@ -313,61 +310,10 @@
                
                -- no error, we're done, move on!
                self:t_removeWrite()
-               self:t_sendNext(true, 't_sendReceive')
+               self:t_nextState(true, 't_rcvHeaders')
        end
 
        self:t_addWrite(pump, SOCKET_TIMEOUT)
-end
-
-
--- t_sendReceive
---
-function t_sendReceive(self)
---     log:debug(self, ":t_sendReceive()")
-       
-       -- we're done sending request, add it to receive queue
-       if self.t_httpSending then
-               table.insert(self.t_httpRcvRequests, self.t_httpSending)
-               self.t_httpSending = nil
-               
-               -- get the receive game rolling if possible
-               if self.t_httpRcvState == 't_rcvDequeue' then
-                       self:t_rcvNext(true)
-               end
-       end
-       
-       -- stay put until told we can dequeue next
-       self:t_sendNext(false, 't_sendReceive')
-end
-
-
--- t_rcvNext
--- manages the http state machine for receiving server data
-function t_rcvNext(self, go, newState)
---     log:debug(self, ":t_rcvNext(", go, ", ", newState, ")")
-
-       if newState then
-               _assert(self[newState] and type(self[newState]) == 'function')
-               self.t_httpRcvState = newState
-       end
-       
-       if go then
-               -- call the function
-               -- self:XXX(bla) is really the same as self["XXX"](self, bla)
-               self[self.t_httpRcvState](self)
-       end
-end
-
-
--- t_rcvDequeue
---
-function t_rcvDequeue(self)
---     log:debug(self, ":t_rcvDequeue()")
-       
-       if #self.t_httpRcvRequests > 0 then
-               self.t_httpReceiving = table.remove(self.t_httpRcvRequests, 1)
-               self:t_rcvNext(true, 't_rcvHeaders')
-       end
 end
 
 
@@ -381,7 +327,7 @@
                line, err, partial = self.t_sock:receive('*l', partial)
                if err then
                        if err == 'timeout' then
-                               return
+                               return false, err
                        end
 
                        log:error(self, ":t_rcvHeaders.pump:", err)
@@ -410,7 +356,7 @@
                if not statusCode then
                        local line, err = source()
                        if err then
-                               return false, err
+                               return
                        end
 
                        local data = socket.skip(2, string.find(line, 
"HTTP/%d*%.%d* (%d%d%d)"))
@@ -419,7 +365,8 @@
                                statusCode = tonumber(data)
                                statusLine = chunk
                        else
-                               return false, "cannot parse: " .. chunk
+                               self:close(err)
+                               return
                        end
                end
 
@@ -427,7 +374,7 @@
                while true do
                        local line, err = source()
                        if err then
-                               return false, err
+                               return
                        end
 
                        if line ~= "" then
@@ -436,19 +383,16 @@
                                        err = "malformed reponse headers"
                                        log:warn(err)
                                        self:close(err)
-                                       return false, err
+                                       return
                                end
 
                                headers[name] = value
                        else
                                -- we're done
-                               
self.t_httpReceiving:t_setResponseHeaders(statusCode, statusLine, headers)
-
-                               -- release send queue
-                               self:t_sendNext(false, 't_sendDequeue')
-               
+                               
self.t_httpRequest:t_setResponseHeaders(statusCode, statusLine, headers)
+
                                -- move on to our future...
-                               self:t_rcvNext(true, 't_rcvResponse')
+                               self:t_nextState(true, 't_rcvResponse')
                                return
                        end
                end
@@ -678,21 +622,18 @@
 -- t_rcvResponse
 -- acrobatics to read the response body
 function t_rcvResponse(self)
-       
        local mode
        local len
 
-       local httpReceiving = self.t_httpReceiving
-       
-       if httpReceiving:t_getResponseHeader('Transfer-Encoding') == 'chunked' 
then
+       if self.t_httpRequest:t_getResponseHeader('Transfer-Encoding') == 
'chunked' then
        
                mode = 'jive-http-chunked'
                
        else
                        
-               if httpReceiving:t_getResponseHeader("Content-Length") then
+               if self.t_httpRequest:t_getResponseHeader("Content-Length") then
                        -- if we have a length, use it!
-                       len = 
tonumber(httpReceiving:t_getResponseHeader("Content-Length"))
+                       len = 
tonumber(self.t_httpRequest:t_getResponseHeader("Content-Length"))
                        mode = 'jive-by-length'
                        
                else
@@ -703,8 +644,8 @@
        
        local source = socket.source(mode, self.t_sock, len or self)
        
-       local sinkMode = httpReceiving:t_getResponseSinkMode()
-       local sink = _getSink(sinkMode, httpReceiving)
+       local sinkMode = self.t_httpRequest:t_getResponseSinkMode()
+       local sink = _getSink(sinkMode, self.t_httpRequest)
 
        local pump = function (NetworkThreadErr)
 --             log:debug(self, ":t_rcvResponse.pump(", mode, ", ", 
tostring(nt_err) , ")")
@@ -736,38 +677,17 @@
                                return
                        end
 
-                       if httpReceiving:t_getResponseHeader('Connection') == 
'close' then
+                       if self.t_httpRequest:t_getResponseHeader('Connection') 
== 'close' then
                                -- just close the socket, don't reset our state
                                SocketTcp.close(self)
                        end
 
                        -- move on to our future
-                       self:t_rcvNext(true, 't_rcvSend')
+                       self:t_nextState(true, 't_sendDequeue')
                end
        end
        
        self:t_addRead(pump, SOCKET_TIMEOUT)
-end
-
-
--- t_rcvSend
---
-function t_rcvSend(self)
---     log:debug(self, ":t_rcvSend()")
-       
-       -- we're done receiving request, drop it
-       if self.t_httpReceiving then
-       
---             log:debug(self, " done with ", self.t_httpReceiving)
-
-               self.t_httpReceiving = nil
-               
-               -- get the send game rolling if possible
-               self:t_sendDequeueIfIdle()
-       end
-       
-       -- move to dequeue
-       self:t_rcvNext(true, 't_rcvDequeue')
 end
 
 
@@ -779,9 +699,7 @@
        -- dump queues
        -- FIXME: should it free requests?
        self.t_httpSendRequests = nil
-       self.t_httpSending = nil
-       self.t_httpRcvRequests = nil
-       self.t_httpReceiving = nil
+       self.t_httpRequest = nil
        
        SocketTcp.free(self)
 end
@@ -790,30 +708,18 @@
 -- close
 -- close our socket
 function close(self, err)
---     log:info(self, " closing with err: ", err, ")")
-
-       -- assumption is sending and receiving queries are never the same
-       if self.t_httpSending then
-               local errorSink = self.t_httpSending:t_getResponseSink()
+--     log:debug(self, " closing with err: ", err, ")")
+
+       if self.t_httpRequest then
+               local errorSink = self.t_httpRequest:t_getResponseSink()
                if errorSink then
                        errorSink(nil, err)
                end
-               self.t_httpSending = nil
-       end
-
-       if self.t_httpReceiving then
-               local errorSink = self.t_httpReceiving:t_getResponseSink()
-               if errorSink then
-                       errorSink(nil, err)
-               end
-               self.t_httpReceiving = nil
-       end
-
-       self:t_sendNext(false, 't_sendDequeue')
-       self:t_rcvNext(false, 't_rcvDequeue')
-
-       -- FIXME: manage the queues
-       
+               self.t_httpRequest = nil
+       end
+
+       self:t_nextState(false, 't_sendDequeue')
+
        SocketTcp.close(self)
 
        -- restart the state machine if it is idle

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins

Reply via email to