Author: titmuss
Date: Wed Feb 13 09:25:44 2008
New Revision: 1930

URL: http://svn.slimdevices.com?rev=1930&root=Jive&view=rev
Log:
 [EMAIL PROTECTED] (orig r1916):  titmuss | 2008-02-12 11:53:53 +0000
 Bug: 7074
 Description:
 Add back in HTTP pipelining; this should reduce the volume latency.
 Increased the block size to 4096.
 For the jive-by-length source, try to read all the data at one time, not in blocks.
 
 Bug: 7012
 Description:
 Return HTTP error codes correctly to fix artwork on SN.
 
 

Modified:
    trunk/   (props changed)
    trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua

Propchange: trunk/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Wed Feb 13 09:25:44 2008
@@ -1,3 +1,3 @@
-bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1915
+bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/7.0:1916
 bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/SN:1083
 bbe22326-0783-4b3a-ac2b-7ab96b24c8d9:/branches/scrolling:1378

Modified: trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL: 
http://svn.slimdevices.com/trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1930&root=Jive&r1=1929&r2=1930&view=diff
==============================================================================
--- trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ trunk/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Wed Feb 13 09:25:44 
2008
@@ -30,8 +30,7 @@
 
 
 -- stuff we use
-local _assert, tostring, tonumber, type = _assert, tostring, tonumber, type
-local setmetatable, pairs = setmetatable, pairs
+local _assert, ipairs, pairs, setmetatable, tostring, tonumber, type = 
_assert, ipairs, pairs, setmetatable, tostring, tonumber, type
 
 local math        = require("math")
 local table       = require("table")
@@ -57,6 +56,8 @@
 oo.class(_M, SocketTcp)
 
 
+local BLOCKSIZE = 4096
+
 local SOCKET_TIMEOUT = 70 -- timeout for socket operations (seconds)
 
 
@@ -71,17 +72,21 @@
 =cut
 --]]
 function __init(self, jnt, address, port, name)
---     log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
+       log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
 
        -- init superclass
        local obj = oo.rawnew(self, SocketTcp(jnt, address, port, name))
 
        -- init states
-       obj.t_httpState = 't_sendDequeue'
+       obj.t_httpSendState = 't_sendDequeue'
+       obj.t_httpRecvState = 't_recvDequeue'
        
        -- init queues
        obj.t_httpSendRequests = {}
-       obj.t_httpRequest = false
+       obj.t_httpSendRequest = false
+
+       obj.t_httpRecvRequests = {}
+       obj.t_httpRecvRequest = false
        
        obj.t_httpProtocol = '1.1'
        
@@ -105,27 +110,27 @@
        -- push the request
        table.insert(self.t_httpSendRequests, request)
 
---     log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests, 
" requests in queue")
+       log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests, 
" requests in queue")
                
        -- start the state machine if it is idle
        self:t_sendDequeueIfIdle()
 end
 
 
--- t_nextState
+-- t_nextSendState
 -- manages the http state machine for sending stuff to the server
-function t_nextState(self, go, newState)
---     log:debug(self, ":t_nextState(", go, ", ", newState, ")")
+function t_nextSendState(self, go, newState)
+       log:debug(self, ":t_nextSendState(", go, ", ", newState, ")")
 
        if newState then
                _assert(self[newState] and type(self[newState]) == 'function')
-               self.t_httpState = newState
+               self.t_httpSendState = newState
        end
        
        if go then
                -- call the function
                -- self:XXX(bla) is really the same as self["XXX"](self, bla)
-               self[self.t_httpState](self)
+               self[self.t_httpSendState](self)
        end
 end
 
@@ -144,25 +149,14 @@
 -- t_sendDequeue
 -- removes a request from the queue
 function t_sendDequeue(self)
---     log:debug(self, ":t_sendDequeue()")
-
-       self.t_httpRequest = self:_dequeueRequest()
-
-       if self.t_httpRequest then
---             log:debug(self, " processing ", self.t_httpRequest)
-               self:t_nextState(true, 't_sendConnect')
+       log:debug(self, ":t_sendDequeue()")
+
+       self.t_httpSendRequest = self:_dequeueRequest()
+
+       if self.t_httpSendRequest then
+               log:debug(self, " send processing ", self.t_httpSendRequest)
+               self:t_nextSendState(true, 't_sendConnect')
                return
-       end
-       
-       -- back to idle
---     log:debug(self, ": no request in queue")
-
-       if self:connected() then
-               local pump = function(NetworkThreadErr)
-                                    self:close("idle close")
-                            end
-
-               self:t_addRead(pump, 0) -- No timeout
        end
 end
 
@@ -170,8 +164,10 @@
 -- t_sendDequeueIfIdle
 -- causes a dequeue and processing on the send queue if possible
 function t_sendDequeueIfIdle(self)
-       if self.t_httpState == 't_sendDequeue' then
-               self:t_nextState(true)
+       log:debug(self, ":t_sendDequeueIfIdle state=", self.t_httpSendState)
+
+       if self.t_httpSendState == 't_sendDequeue' then
+               self:t_nextSendState(true)
        end
 end
 
@@ -179,7 +175,7 @@
 -- t_sendConnect
 -- open our socket
 function t_sendConnect(self)
---     log:debug(self, ":t_sendConnect()")
+       log:debug(self, ":t_sendConnect()")
        
        if not self:connected() then
                local err = socket.skip(1, self:t_connect())
@@ -191,21 +187,21 @@
                end
        end
                
-       self:t_nextState(true, 't_sendRequest')
+       self:t_nextSendState(true, 't_sendRequest')
 end
 
 
 -- t_getSendHeaders
 -- calculates the headers to send from a socket perspective
 function t_getSendHeaders(self)
---     log:debug(self, ":t_getSendHeaders()")
+       log:debug(self, ":t_getSendHeaders()")
 
        -- default set
        local headers = {
                ["User-Agent"] = 'Jive/' .. JIVE_VERSION,
        }
        
-       local req_headers = self.t_httpRequest:t_getRequestHeaders()
+       local req_headers = self.t_httpSendRequest:t_getRequestHeaders()
        if not req_headers["Host"] then
                local address, port = self:t_getAddressPort()
                headers["Host"] = address
@@ -214,8 +210,8 @@
                end
        end
        
-       if self.t_httpRequest:t_hasBody() then
-               headers["Content-Length"] = #self.t_httpRequest:t_body()
+       if self.t_httpSendRequest:t_hasBody() then
+               headers["Content-Length"] = #self.t_httpSendRequest:t_body()
        end
 
        req_headers["Accept-Language"] = string.lower(locale.getLocale())       
@@ -235,12 +231,12 @@
                }, 
                {
                        __call = function(self, chunk, err)
---                             log:debug("keep-open-non-blocking sink(", chunk 
and #chunk, ", ", tostring(err), ", ", tostring(first), ")")
+                               log:debug("keep-open-non-blocking sink(", chunk 
and #chunk, ", ", tostring(err), ", ", tostring(first), ")")
                                if chunk then 
                                        local res, err
                                        -- if send times out, err is 'timeout' 
and first is updated.
                                        res, err, first = sock:send(chunk, 
first+1)
---                                     log:debug("keep-open-non-blocking sent 
- first is ", tostring(first), " returning ", tostring(res), ", " , 
tostring(err))
+                                       log:debug("keep-open-non-blocking sent 
- first is ", tostring(first), " returning ", tostring(res), ", " , 
tostring(err))
                                        -- we return the err
                                        return res, err
                                else 
@@ -255,11 +251,11 @@
 -- t_sendRequest
 -- send the headers, aggregates request and socket headers
 function t_sendRequest(self)
---     log:debug(self, ":t_sendRequest()")
+       log:debug(self, ":t_sendRequest()")
        
        local source = function()
        
-               local line1 = string.format("%s HTTP/%s", 
self.t_httpRequest:t_getRequestString(), self.t_httpProtocol)
+               local line1 = string.format("%s HTTP/%s", 
self.t_httpSendRequest:t_getRequestString(), self.t_httpProtocol)
 
                local t = {}
                
@@ -268,13 +264,13 @@
                for k, v in pairs(self:t_getSendHeaders()) do
                        table.insert(t, k .. ": " .. v)
                end
-               for k, v in pairs(self.t_httpRequest:t_getRequestHeaders()) do
+               for k, v in pairs(self.t_httpSendRequest:t_getRequestHeaders()) 
do
                        table.insert(t, k .. ": " .. v)
                end
                
                table.insert(t, "")
-               if self.t_httpRequest:t_hasBody() then
-                       table.insert(t, self.t_httpRequest:t_body())
+               if self.t_httpSendRequest:t_hasBody() then
+                       table.insert(t, self.t_httpSendRequest:t_body())
                else
                        table.insert(t, "")
                end
@@ -285,7 +281,7 @@
        local sink = socket.sink('keep-open-non-blocking', self.t_sock)
        
        local pump = function (NetworkThreadErr)
---             log:debug(self, ":t_sendRequest.pump()")
+               log:debug(self, ":t_sendRequest.pump()")
                
                if NetworkThreadErr then
                        log:error(self, ":t_sendRequest.pump: ", 
NetworkThreadErr)
@@ -310,17 +306,76 @@
                
                -- no error, we're done, move on!
                self:t_removeWrite()
-               self:t_nextState(true, 't_rcvHeaders')
+               self:t_nextSendState(true, 't_sendComplete')
        end
 
        self:t_addWrite(pump, SOCKET_TIMEOUT)
 end
+
+
+function t_sendComplete(self)
+       if self.t_httpSendRequest then
+               table.insert(self.t_httpRecvRequests, self.t_httpSendRequest)
+               self.t_httpSendRequest = nil
+       end
+
+       self:t_nextSendState(true, 't_sendDequeue')
+
+       if self.t_httpRecvState == 't_recvDequeue' then
+               self:t_nextRecvState(true)
+       end
+end
+
+
+-- t_nextRecvState
+-- manages the http state machine for receiving stuff from the server
+function t_nextRecvState(self, go, newState)
+       log:debug(self, ":t_nextRecvState(", go, ", ", newState, ")")
+
+       if newState then
+               _assert(self[newState] and type(self[newState]) == 'function')
+               self.t_httpRecvState = newState
+       end
+       
+       if go then
+               -- call the function
+               -- self:XXX(bla) is really the same as self["XXX"](self, bla)
+               self[self.t_httpRecvState](self)
+       end
+end
+
+
+-- t_recvDequeue
+-- removes a request from the queue
+function t_recvDequeue(self)
+       log:debug(self, ":t_recvDequeue() queueLength=", 
#self.t_httpRecvRequests)
+
+       self.t_httpRecvRequest = table.remove(self.t_httpRecvRequests, 1)
+
+       if self.t_httpRecvRequest then
+               log:debug(self, " recv processing ", self.t_httpRecvRequest)
+               self:t_nextRecvState(true, 't_rcvHeaders')
+               return
+       end
+       
+       -- back to idle
+       log:debug(self, ": no request recv in queue")
+
+       if self:connected() then
+               local pump = function(NetworkThreadErr)
+                                    self:close("idle close")
+                            end
+
+               self:t_addRead(pump, 0) -- No timeout
+       end
+end
+
 
 
 -- t_rcvHeaders
 --
 function t_rcvHeaders(self)
---     log:debug(self, ":t_rcvHeaders()")
+       log:debug(self, ":t_rcvHeaders()")
 
        local line, err, partial = true
        local source = function()
@@ -344,7 +399,7 @@
        local statusLine = false
 
        local pump = function (NetworkThreadErr)
---             log:debug(self, ":t_rcvHeaders.pump()")
+               log:debug(self, ":t_rcvHeaders.pump()")
                if NetworkThreadErr then
                        log:error(self, ":t_rcvHeaders.pump:", err)
                        --self:t_removeRead()
@@ -360,10 +415,10 @@
                        end
 
                        local data = socket.skip(2, string.find(line, 
"HTTP/%d*%.%d* (%d%d%d)"))
-                               
+
                        if data then
                                statusCode = tonumber(data)
-                               statusLine = chunk
+                               statusLine = line
                        else
                                self:close(err)
                                return
@@ -389,10 +444,10 @@
                                headers[name] = value
                        else
                                -- we're done
-                               
self.t_httpRequest:t_setResponseHeaders(statusCode, statusLine, headers)
+                               
self.t_httpRecvRequest:t_setResponseHeaders(statusCode, statusLine, headers)
 
                                -- move on to our future...
-                               self:t_nextState(true, 't_rcvResponse')
+                               self:t_nextRecvState(true, 't_rcvResponse')
                                return
                        end
                end
@@ -424,7 +479,7 @@
                                end
                        
                                local chunk, err
-                               chunk, err, partial = 
sock:receive(socket.BLOCKSIZE, partial)
+                               chunk, err, partial = sock:receive(BLOCKSIZE, 
partial)
                                
                                if not err then 
                                        return chunk
@@ -457,10 +512,8 @@
                                        return nil, 'done' 
                                end
                                
-                               local size = math.min(socket.BLOCKSIZE, length)
-                               
                                local chunk, err
-                               chunk, err, partial = sock:receive(size, 
partial)
+                               chunk, err, partial = sock:receive(length, 
partial)
                                
                                if err then -- including timeout
                                        return nil, err 
@@ -497,10 +550,10 @@
                                local chunk, err
                                chunk, err, partial = sock:receive(pattern, 
partial)
 
---                             
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err, 
")")
+                               
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err, 
")")
 
                                if err then
---                                     
log:debug("SocketHttp.jive-http-chunked.source - RETURN err")
+                                       
log:debug("SocketHttp.jive-http-chunked.source - RETURN err")
                                        return nil, err 
                                end
                                
@@ -510,7 +563,7 @@
                                        if not size then 
                                                return nil, "invalid chunk 
size" 
                                        end
---                                     
log:debug("SocketHttp.jive-http-chunked.source - size: ", tostring(size))
+                                       
log:debug("SocketHttp.jive-http-chunked.source - size: ", tostring(size))
                        
                                        -- last chunk ?
                                        if size > 0 then
@@ -523,7 +576,7 @@
                                end
                                
                                if step == 2 then
---                                     
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err, 
", ", part and #part, ")")
+                                       
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err, 
", ", part and #part, ")")
                                        
                                        -- remember chunk, go read terminating 
CRLF
                                        step = 3
@@ -533,7 +586,7 @@
                                end
                                
                                if step == 3 then
---                                     
log:debug("SocketHttp.jive-http-chunked.source 3 (", chunk and #chunk, ", ", 
err, ", ", part and #part, ")")
+                                       
log:debug("SocketHttp.jive-http-chunked.source 3 (", chunk and #chunk, ", ", 
err, ", ", part and #part, ")")
                                        
                                        -- done
                                        step = 1
@@ -553,7 +606,7 @@
 sinkt["jive-concat"] = function(request)
        local data = {}
        return function(chunk, src_err)
---             log:debug("SocketHttp.jive-concat.sink(", chunk and #chunk, ", 
", src_err, ")")
+               log:debug("SocketHttp.jive-concat.sink(", chunk and #chunk, ", 
", src_err, ")")
                
                if src_err and src_err != "done" then
                        -- let the pump handle errors
@@ -569,7 +622,7 @@
                        local blob = table.concat(data)
                        -- let request decide what to do with data
                        request:t_setResponseBody(blob)
---                     log:debug("SocketHttp.jive-concat.sink: done ", #blob)
+                       log:debug("SocketHttp.jive-concat.sink: done ", #blob)
                        return nil
                end
                
@@ -582,7 +635,7 @@
 -- a sink that forwards each received chunk as complete data to the request
 sinkt["jive-by-chunk"] = function(request)
        return function(chunk, src_err)
---             log:debug("SocketHttp.jive-by-chunk.sink(", chunk and #chunk, 
", ", src_err, ")")
+               log:debug("SocketHttp.jive-by-chunk.sink(", chunk and #chunk, 
", ", src_err, ")")
        
                if src_err and src_err != "done" then
                        -- let the pump handle errors
@@ -592,12 +645,12 @@
                -- forward any chunk
                if chunk and chunk != "" then
                        -- let request decide what to do with data
---                     log:debug("SocketHttp.jive-by-chunk.sink: chunk bytes: 
", #chunk)
+                       log:debug("SocketHttp.jive-by-chunk.sink: chunk bytes: 
", #chunk)
                        request:t_setResponseBody(chunk)
                end
 
                if not chunk or src_err == "done" then
---                     log:debug("SocketHttp.jive-by-chunk.sink: done")
+                       log:debug("SocketHttp.jive-by-chunk.sink: done")
                        request:t_setResponseBody(nil)
                        return nil
                end
@@ -625,15 +678,15 @@
        local mode
        local len
 
-       if self.t_httpRequest:t_getResponseHeader('Transfer-Encoding') == 
'chunked' then
+       if self.t_httpRecvRequest:t_getResponseHeader('Transfer-Encoding') == 
'chunked' then
        
                mode = 'jive-http-chunked'
                
        else
                        
-               if self.t_httpRequest:t_getResponseHeader("Content-Length") then
+               if self.t_httpRecvRequest:t_getResponseHeader("Content-Length") 
then
                        -- if we have a length, use it!
-                       len = 
tonumber(self.t_httpRequest:t_getResponseHeader("Content-Length"))
+                       len = 
tonumber(self.t_httpRecvRequest:t_getResponseHeader("Content-Length"))
                        mode = 'jive-by-length'
                        
                else
@@ -642,15 +695,15 @@
                end
        end
 
-       local connectionClose = 
self.t_httpRequest:t_getResponseHeader('Connection') == 'close'
+       local connectionClose = 
self.t_httpRecvRequest:t_getResponseHeader('Connection') == 'close'
        
        local source = socket.source(mode, self.t_sock, len or self)
        
-       local sinkMode = self.t_httpRequest:t_getResponseSinkMode()
-       local sink = _getSink(sinkMode, self.t_httpRequest)
+       local sinkMode = self.t_httpRecvRequest:t_getResponseSinkMode()
+       local sink = _getSink(sinkMode, self.t_httpRecvRequest)
 
        local pump = function (NetworkThreadErr)
---             log:debug(self, ":t_rcvResponse.pump(", mode, ", ", 
tostring(nt_err) , ")")
+               log:debug(self, ":t_rcvResponse.pump(", mode, ", ", 
tostring(nt_err) , ")")
                
                if NetworkThreadErr then
                        log:error(self, ":t_rcvResponse.pump() error:", 
NetworkThreadErr)
@@ -668,7 +721,7 @@
                
                if not continue then
                        -- we're done
---                     log:debug(self, ":t_rcvResponse.pump: done (", err, ")")
+                       log:debug(self, ":t_rcvResponse.pump: done (", err, ")")
                        
                        -- remove read handler
                        self:t_removeRead()
@@ -685,23 +738,32 @@
                        end
 
                        -- move on to our future
-                       self:t_nextState(true, 't_sendDequeue')
+                       self:t_nextRecvState(true, 't_recvComplete')
                end
        end
        
        self:t_addRead(pump, SOCKET_TIMEOUT)
+end
+
+
+function t_recvComplete(self)
+       self.t_httpRecvRequest = nil
+       self:t_nextRecvState(true, 't_recvDequeue')
 end
 
 
 -- free
 -- frees our socket
 function free(self)
---     log:debug(self, ":free()")
+       log:debug(self, ":free()")
 
        -- dump queues
        -- FIXME: should it free requests?
-       self.t_httpSendRequests = nil
-       self.t_httpRequest = nil
+       self.t_httpSendRequests = {}
+       self.t_httpSendRequest = false
+
+       self.t_httpRecvRequests = {}
+       self.t_httpRecvRequest = false
        
        SocketTcp.free(self)
 end
@@ -710,22 +772,41 @@
 -- close
 -- close our socket
 function close(self, err)
---     log:debug(self, " closing with err: ", err, ")")
-
-       if self.t_httpRequest then
-               local errorSink = self.t_httpRequest:t_getResponseSink()
+       log:debug(self, " closing with err: ", err)
+
+       -- error for send request
+       if self.t_httpSendRequest then
+               local errorSink = self.t_httpSendRequest:t_getResponseSink()
                 if errorSink then
                         errorSink(nil, err)
                 end
-               self.t_httpRequest = nil
-       end
-
-       self:t_nextState(false, 't_sendDequeue')
-
+               self.t_httpSendRequest = nil
+       end
+
+       -- error for pipelined requests (sent, response not yet started); each gets its own sink
+       for _, request in ipairs(self.t_httpRecvRequests) do
+               local errorSink = request:t_getResponseSink()
+               if errorSink then
+                       errorSink(nil, err)
+               end
+       end
+       self.t_httpRecvRequests = {}
+
+       -- error for recv request
+       if self.t_httpRecvRequest then
+               local errorSink = self.t_httpRecvRequest:t_getResponseSink()
+               if errorSink then
+                       errorSink(nil, err)
+               end
+               self.t_httpRecvRequest = nil
+       end
+
+       -- close the socket
        SocketTcp.close(self)
 
-       -- restart the state machine if it is idle
-       self:t_sendDequeueIfIdle()
+       -- start again
+       self:t_nextSendState(true, 't_sendDequeue')
+       self:t_nextRecvState(true, 't_recvDequeue')
 end
 
 

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins

Reply via email to