Author: titmuss
Date: Tue Feb 12 03:53:53 2008
New Revision: 1916
URL: http://svn.slimdevices.com?rev=1916&root=Jive&view=rev
Log:
Bug: 7074
Description:
Add back in http pipelining, this should reduce the volume latency.
Increased the block size to 4096.
For the jive-by-length sink try to read all the data at one time, not in blocks.
Bug: 7012
Description:
Return HTTP error codes correctly to fix artwork on SN.
Modified:
branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
Modified: branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL:
http://svn.slimdevices.com/branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1916&root=Jive&r1=1915&r2=1916&view=diff
==============================================================================
--- branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Tue Feb 12
03:53:53 2008
@@ -30,8 +30,7 @@
-- stuff we use
-local _assert, tostring, tonumber, type = _assert, tostring, tonumber, type
-local setmetatable, pairs = setmetatable, pairs
+local _assert, ipairs, pairs, setmetatable, tostring, tonumber, type =
_assert, ipairs, pairs, setmetatable, tostring, tonumber, type
local math = require("math")
local table = require("table")
@@ -57,6 +56,8 @@
oo.class(_M, SocketTcp)
+local BLOCKSIZE = 4096
+
local SOCKET_TIMEOUT = 70 -- timeout for socket operations (seconds)
@@ -71,17 +72,21 @@
=cut
--]]
function __init(self, jnt, address, port, name)
--- log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
+ log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
-- init superclass
local obj = oo.rawnew(self, SocketTcp(jnt, address, port, name))
-- init states
- obj.t_httpState = 't_sendDequeue'
+ obj.t_httpSendState = 't_sendDequeue'
+ obj.t_httpRecvState = 't_recvDequeue'
-- init queues
obj.t_httpSendRequests = {}
- obj.t_httpRequest = false
+ obj.t_httpSendRequest = false
+
+ obj.t_httpRecvRequests = {}
+ obj.t_httpRecvRequest = false
obj.t_httpProtocol = '1.1'
@@ -105,27 +110,27 @@
-- push the request
table.insert(self.t_httpSendRequests, request)
--- log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests,
" requests in queue")
+ log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests,
" requests in queue")
-- start the state machine if it is idle
self:t_sendDequeueIfIdle()
end
--- t_nextState
+-- t_nextSendState
-- manages the http state machine for sending stuff to the server
-function t_nextState(self, go, newState)
--- log:debug(self, ":t_nextState(", go, ", ", newState, ")")
+function t_nextSendState(self, go, newState)
+ log:debug(self, ":t_nextSendState(", go, ", ", newState, ")")
if newState then
_assert(self[newState] and type(self[newState]) == 'function')
- self.t_httpState = newState
+ self.t_httpSendState = newState
end
if go then
-- call the function
-- self:XXX(bla) is really the same as self["XXX"](self, bla)
- self[self.t_httpState](self)
+ self[self.t_httpSendState](self)
end
end
@@ -144,25 +149,14 @@
-- t_sendDequeue
-- removes a request from the queue
function t_sendDequeue(self)
--- log:debug(self, ":t_sendDequeue()")
-
- self.t_httpRequest = self:_dequeueRequest()
-
- if self.t_httpRequest then
--- log:debug(self, " processing ", self.t_httpRequest)
- self:t_nextState(true, 't_sendConnect')
+ log:debug(self, ":t_sendDequeue()")
+
+ self.t_httpSendRequest = self:_dequeueRequest()
+
+ if self.t_httpSendRequest then
+ log:debug(self, " send processing ", self.t_httpSendRequest)
+ self:t_nextSendState(true, 't_sendConnect')
return
- end
-
- -- back to idle
--- log:debug(self, ": no request in queue")
-
- if self:connected() then
- local pump = function(NetworkThreadErr)
- self:close("idle close")
- end
-
- self:t_addRead(pump, 0) -- No timeout
end
end
@@ -170,8 +164,10 @@
-- t_sendDequeueIfIdle
-- causes a dequeue and processing on the send queue if possible
function t_sendDequeueIfIdle(self)
- if self.t_httpState == 't_sendDequeue' then
- self:t_nextState(true)
+ log:debug(self, ":t_sendDequeueIfIdle state=", self.t_httpSendState)
+
+ if self.t_httpSendState == 't_sendDequeue' then
+ self:t_nextSendState(true)
end
end
@@ -179,7 +175,7 @@
-- t_sendConnect
-- open our socket
function t_sendConnect(self)
--- log:debug(self, ":t_sendConnect()")
+ log:debug(self, ":t_sendConnect()")
if not self:connected() then
local err = socket.skip(1, self:t_connect())
@@ -191,21 +187,21 @@
end
end
- self:t_nextState(true, 't_sendRequest')
+ self:t_nextSendState(true, 't_sendRequest')
end
-- t_getSendHeaders
-- calculates the headers to send from a socket perspective
function t_getSendHeaders(self)
--- log:debug(self, ":t_getSendHeaders()")
+ log:debug(self, ":t_getSendHeaders()")
-- default set
local headers = {
["User-Agent"] = 'Jive/' .. JIVE_VERSION,
}
- local req_headers = self.t_httpRequest:t_getRequestHeaders()
+ local req_headers = self.t_httpSendRequest:t_getRequestHeaders()
if not req_headers["Host"] then
local address, port = self:t_getAddressPort()
headers["Host"] = address
@@ -214,8 +210,8 @@
end
end
- if self.t_httpRequest:t_hasBody() then
- headers["Content-Length"] = #self.t_httpRequest:t_body()
+ if self.t_httpSendRequest:t_hasBody() then
+ headers["Content-Length"] = #self.t_httpSendRequest:t_body()
end
req_headers["Accept-Language"] = string.lower(locale.getLocale())
@@ -235,12 +231,12 @@
},
{
__call = function(self, chunk, err)
--- log:debug("keep-open-non-blocking sink(", chunk
and #chunk, ", ", tostring(err), ", ", tostring(first), ")")
+ log:debug("keep-open-non-blocking sink(", chunk
and #chunk, ", ", tostring(err), ", ", tostring(first), ")")
if chunk then
local res, err
-- if send times out, err is 'timeout'
and first is updated.
res, err, first = sock:send(chunk,
first+1)
--- log:debug("keep-open-non-blocking sent
- first is ", tostring(first), " returning ", tostring(res), ", " ,
tostring(err))
+ log:debug("keep-open-non-blocking sent
- first is ", tostring(first), " returning ", tostring(res), ", " ,
tostring(err))
-- we return the err
return res, err
else
@@ -255,11 +251,11 @@
-- t_sendRequest
-- send the headers, aggregates request and socket headers
function t_sendRequest(self)
--- log:debug(self, ":t_sendRequest()")
+ log:debug(self, ":t_sendRequest()")
local source = function()
- local line1 = string.format("%s HTTP/%s",
self.t_httpRequest:t_getRequestString(), self.t_httpProtocol)
+ local line1 = string.format("%s HTTP/%s",
self.t_httpSendRequest:t_getRequestString(), self.t_httpProtocol)
local t = {}
@@ -268,13 +264,13 @@
for k, v in pairs(self:t_getSendHeaders()) do
table.insert(t, k .. ": " .. v)
end
- for k, v in pairs(self.t_httpRequest:t_getRequestHeaders()) do
+ for k, v in pairs(self.t_httpSendRequest:t_getRequestHeaders())
do
table.insert(t, k .. ": " .. v)
end
table.insert(t, "")
- if self.t_httpRequest:t_hasBody() then
- table.insert(t, self.t_httpRequest:t_body())
+ if self.t_httpSendRequest:t_hasBody() then
+ table.insert(t, self.t_httpSendRequest:t_body())
else
table.insert(t, "")
end
@@ -285,7 +281,7 @@
local sink = socket.sink('keep-open-non-blocking', self.t_sock)
local pump = function (NetworkThreadErr)
--- log:debug(self, ":t_sendRequest.pump()")
+ log:debug(self, ":t_sendRequest.pump()")
if NetworkThreadErr then
log:error(self, ":t_sendRequest.pump: ",
NetworkThreadErr)
@@ -310,17 +306,76 @@
-- no error, we're done, move on!
self:t_removeWrite()
- self:t_nextState(true, 't_rcvHeaders')
+ self:t_nextSendState(true, 't_sendComplete')
end
self:t_addWrite(pump, SOCKET_TIMEOUT)
end
+
+
+function t_sendComplete(self)
+ if self.t_httpSendRequest then
+ table.insert(self.t_httpRecvRequests, self.t_httpSendRequest)
+ self.t_httpSendRequest = nil
+ end
+
+ self:t_nextSendState(true, 't_sendDequeue')
+
+ if self.t_httpRecvState == 't_recvDequeue' then
+ self:t_nextRecvState(true)
+ end
+end
+
+
+-- t_nextRecvState
+-- manages the http state machine for receiving stuff to the server
+function t_nextRecvState(self, go, newState)
+ log:debug(self, ":t_nextRecvState(", go, ", ", newState, ")")
+
+ if newState then
+ _assert(self[newState] and type(self[newState]) == 'function')
+ self.t_httpRecvState = newState
+ end
+
+ if go then
+ -- call the function
+ -- self:XXX(bla) is really the same as self["XXX"](self, bla)
+ self[self.t_httpRecvState](self)
+ end
+end
+
+
+-- t_recvDequeue
+-- removes a request from the queue
+function t_recvDequeue(self)
+ log:debug(self, ":t_recvDequeue() queueLength=",
#self.t_httpRecvRequests)
+
+ self.t_httpRecvRequest = table.remove(self.t_httpRecvRequests, 1)
+
+ if self.t_httpRecvRequest then
+ log:debug(self, " recv processing ", self.t_httpRecvRequest)
+ self:t_nextRecvState(true, 't_rcvHeaders')
+ return
+ end
+
+ -- back to idle
+ log:debug(self, ": no request recv in queue")
+
+ if self:connected() then
+ local pump = function(NetworkThreadErr)
+ self:close("idle close")
+ end
+
+ self:t_addRead(pump, 0) -- No timeout
+ end
+end
+
-- t_rcvHeaders
--
function t_rcvHeaders(self)
--- log:debug(self, ":t_rcvHeaders()")
+ log:debug(self, ":t_rcvHeaders()")
local line, err, partial = true
local source = function()
@@ -344,7 +399,7 @@
local statusLine = false
local pump = function (NetworkThreadErr)
--- log:debug(self, ":t_rcvHeaders.pump()")
+ log:debug(self, ":t_rcvHeaders.pump()")
if NetworkThreadErr then
log:error(self, ":t_rcvHeaders.pump:", err)
--self:t_removeRead()
@@ -360,10 +415,10 @@
end
local data = socket.skip(2, string.find(line,
"HTTP/%d*%.%d* (%d%d%d)"))
-
+
if data then
statusCode = tonumber(data)
- statusLine = chunk
+ statusLine = line
else
self:close(err)
return
@@ -389,10 +444,10 @@
headers[name] = value
else
-- we're done
-
self.t_httpRequest:t_setResponseHeaders(statusCode, statusLine, headers)
+
self.t_httpRecvRequest:t_setResponseHeaders(statusCode, statusLine, headers)
-- move on to our future...
- self:t_nextState(true, 't_rcvResponse')
+ self:t_nextRecvState(true, 't_rcvResponse')
return
end
end
@@ -424,7 +479,7 @@
end
local chunk, err
- chunk, err, partial =
sock:receive(socket.BLOCKSIZE, partial)
+ chunk, err, partial = sock:receive(BLOCKSIZE,
partial)
if not err then
return chunk
@@ -457,10 +512,8 @@
return nil, 'done'
end
- local size = math.min(socket.BLOCKSIZE, length)
-
local chunk, err
- chunk, err, partial = sock:receive(size,
partial)
+ chunk, err, partial = sock:receive(length,
partial)
if err then -- including timeout
return nil, err
@@ -497,10 +550,10 @@
local chunk, err
chunk, err, partial = sock:receive(pattern,
partial)
---
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err,
")")
+
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err,
")")
if err then
---
log:debug("SocketHttp.jive-http-chunked.source - RETURN err")
+
log:debug("SocketHttp.jive-http-chunked.source - RETURN err")
return nil, err
end
@@ -510,7 +563,7 @@
if not size then
return nil, "invalid chunk
size"
end
---
log:debug("SocketHttp.jive-http-chunked.source - size: ", tostring(size))
+
log:debug("SocketHttp.jive-http-chunked.source - size: ", tostring(size))
-- last chunk ?
if size > 0 then
@@ -523,7 +576,7 @@
end
if step == 2 then
---
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err,
", ", part and #part, ")")
+
log:debug("SocketHttp.jive-http-chunked.source(", chunk and #chunk, ", ", err,
", ", part and #part, ")")
-- remember chunk, go read terminating
CRLF
step = 3
@@ -533,7 +586,7 @@
end
if step == 3 then
---
log:debug("SocketHttp.jive-http-chunked.source 3 (", chunk and #chunk, ", ",
err, ", ", part and #part, ")")
+
log:debug("SocketHttp.jive-http-chunked.source 3 (", chunk and #chunk, ", ",
err, ", ", part and #part, ")")
-- done
step = 1
@@ -553,7 +606,7 @@
sinkt["jive-concat"] = function(request)
local data = {}
return function(chunk, src_err)
--- log:debug("SocketHttp.jive-concat.sink(", chunk and #chunk, ",
", src_err, ")")
+ log:debug("SocketHttp.jive-concat.sink(", chunk and #chunk, ",
", src_err, ")")
if src_err and src_err != "done" then
-- let the pump handle errors
@@ -569,7 +622,7 @@
local blob = table.concat(data)
-- let request decide what to do with data
request:t_setResponseBody(blob)
--- log:debug("SocketHttp.jive-concat.sink: done ", #blob)
+ log:debug("SocketHttp.jive-concat.sink: done ", #blob)
return nil
end
@@ -582,7 +635,7 @@
-- a sink that forwards each received chunk as complete data to the request
sinkt["jive-by-chunk"] = function(request)
return function(chunk, src_err)
--- log:debug("SocketHttp.jive-by-chunk.sink(", chunk and #chunk,
", ", src_err, ")")
+ log:debug("SocketHttp.jive-by-chunk.sink(", chunk and #chunk,
", ", src_err, ")")
if src_err and src_err != "done" then
-- let the pump handle errors
@@ -592,12 +645,12 @@
-- forward any chunk
if chunk and chunk != "" then
-- let request decide what to do with data
--- log:debug("SocketHttp.jive-by-chunk.sink: chunk bytes:
", #chunk)
+ log:debug("SocketHttp.jive-by-chunk.sink: chunk bytes:
", #chunk)
request:t_setResponseBody(chunk)
end
if not chunk or src_err == "done" then
--- log:debug("SocketHttp.jive-by-chunk.sink: done")
+ log:debug("SocketHttp.jive-by-chunk.sink: done")
request:t_setResponseBody(nil)
return nil
end
@@ -625,15 +678,15 @@
local mode
local len
- if self.t_httpRequest:t_getResponseHeader('Transfer-Encoding') ==
'chunked' then
+ if self.t_httpRecvRequest:t_getResponseHeader('Transfer-Encoding') ==
'chunked' then
mode = 'jive-http-chunked'
else
- if self.t_httpRequest:t_getResponseHeader("Content-Length") then
+ if self.t_httpRecvRequest:t_getResponseHeader("Content-Length")
then
-- if we have a length, use it!
- len =
tonumber(self.t_httpRequest:t_getResponseHeader("Content-Length"))
+ len =
tonumber(self.t_httpRecvRequest:t_getResponseHeader("Content-Length"))
mode = 'jive-by-length'
else
@@ -642,15 +695,15 @@
end
end
- local connectionClose =
self.t_httpRequest:t_getResponseHeader('Connection') == 'close'
+ local connectionClose =
self.t_httpRecvRequest:t_getResponseHeader('Connection') == 'close'
local source = socket.source(mode, self.t_sock, len or self)
- local sinkMode = self.t_httpRequest:t_getResponseSinkMode()
- local sink = _getSink(sinkMode, self.t_httpRequest)
+ local sinkMode = self.t_httpRecvRequest:t_getResponseSinkMode()
+ local sink = _getSink(sinkMode, self.t_httpRecvRequest)
local pump = function (NetworkThreadErr)
--- log:debug(self, ":t_rcvResponse.pump(", mode, ", ",
tostring(nt_err) , ")")
+ log:debug(self, ":t_rcvResponse.pump(", mode, ", ",
tostring(nt_err) , ")")
if NetworkThreadErr then
log:error(self, ":t_rcvResponse.pump() error:",
NetworkThreadErr)
@@ -668,7 +721,7 @@
if not continue then
-- we're done
--- log:debug(self, ":t_rcvResponse.pump: done (", err, ")")
+ log:debug(self, ":t_rcvResponse.pump: done (", err, ")")
-- remove read handler
self:t_removeRead()
@@ -685,23 +738,32 @@
end
-- move on to our future
- self:t_nextState(true, 't_sendDequeue')
+ self:t_nextRecvState(true, 't_recvComplete')
end
end
self:t_addRead(pump, SOCKET_TIMEOUT)
+end
+
+
+function t_recvComplete(self)
+ self.t_httpRecvRequest = nil
+ self:t_nextRecvState(true, 't_recvDequeue')
end
-- free
-- frees our socket
function free(self)
--- log:debug(self, ":free()")
+ log:debug(self, ":free()")
-- dump queues
-- FIXME: should it free requests?
- self.t_httpSendRequests = nil
- self.t_httpRequest = nil
+ self.t_httpSendRequests = {}
+ self.t_httpSendRequest = false
+
+ self.t_httpRecvRequests = {}
+ self.t_httpRecvRequest = false
SocketTcp.free(self)
end
@@ -710,22 +772,41 @@
-- close
-- close our socket
function close(self, err)
--- log:debug(self, " closing with err: ", err, ")")
-
- if self.t_httpRequest then
- local errorSink = self.t_httpRequest:t_getResponseSink()
+ log:debug(self, " closing with err: ", err)
+
+ -- error for send request
+ if self.t_httpSendRequest then
+ local errorSink = self.t_httpSendRequest:t_getResponseSink()
if errorSink then
errorSink(nil, err)
end
- self.t_httpRequest = nil
- end
-
- self:t_nextState(false, 't_sendDequeue')
-
+ self.t_httpSendRequest = nil
+ end
+
+ -- error for pipelined requests
+ for i, request in ipairs(self.t_httpRecvRequests) do
+ local errorSink = self.t_httpRecvRequest:t_getResponseSink()
+ if errorSink then
+ errorSink(nil, err)
+ end
+ end
+ self.t_httpRecvRequests = {}
+
+ -- error for recv request
+ if self.t_httpRecvRequest then
+ local errorSink = self.t_httpRecvRequest:t_getResponseSink()
+ if errorSink then
+ errorSink(nil, err)
+ end
+ self.t_httpRecvRequest = nil
+ end
+
+ -- close the socket
SocketTcp.close(self)
- -- restart the state machine if it is idle
- self:t_sendDequeueIfIdle()
+ -- start again
+ self:t_nextSendState(true, 't_sendDequeue')
+ self:t_nextRecvState(true, 't_recvDequeue')
end
_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins