Author: titmuss
Date: Wed Jan 30 09:51:19 2008
New Revision: 1702
URL: http://svn.slimdevices.com?rev=1702&root=Jive&view=rev
Log:
Bug: 6763
Description:
The HTTP code originally allowed for pipelining requests on the socket. This
was never used, and the code contained a race condition that meant no more
requests could be processed by that socket. To fix this, a single state is now
kept for both send and receive operations. Pipelining can be added back later
if thought useful.
Modified:
branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua
branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
Modified: branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua
URL:
http://svn.slimdevices.com/branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua?rev=1702&root=Jive&r1=1701&r2=1702&view=diff
==============================================================================
--- branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua (original)
+++ branches/7.0/jive/src/pkg/jive/share/jive/net/HttpPool.lua Wed Jan 30
09:51:19 2008
@@ -145,12 +145,16 @@
self.reqQueueCount = self.reqQueueCount + 1
-- calculate threshold
+--[[
local active = math.floor(self.reqQueueCount / self.pool.threshold) + 1
if active > #self.pool.jshq then
active = #self.pool.jshq
end
self.pool.active = active
-
+--]]
+ self.pool.active = #self.pool.jshq
+
+
-- log:debug(self, ":", self.reqQueueCount, " requests, ",
self.pool.active, " connections")
-- kick all active queues
Modified: branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua
URL:
http://svn.slimdevices.com/branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua?rev=1702&root=Jive&r1=1701&r2=1702&view=diff
==============================================================================
--- branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua (original)
+++ branches/7.0/jive/src/pkg/jive/share/jive/net/SocketHttp.lua Wed Jan 30
09:51:19 2008
@@ -71,20 +71,17 @@
=cut
--]]
function __init(self, jnt, address, port, name)
- --log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
+-- log:debug("SocketHttp:__init(", name, ", ", address, ", ", port, ")")
-- init superclass
local obj = oo.rawnew(self, SocketTcp(jnt, address, port, name))
-- init states
- obj.t_httpSendState = 't_sendDequeue'
- obj.t_httpRcvState = 't_rcvDequeue'
+ obj.t_httpState = 't_sendDequeue'
-- init queues
obj.t_httpSendRequests = {}
- obj.t_httpSending = false
- obj.t_httpRcvRequests = {}
- obj.t_httpReceiving = false
+ obj.t_httpRequest = false
obj.t_httpProtocol = '1.1'
@@ -108,27 +105,27 @@
-- push the request
table.insert(self.t_httpSendRequests, request)
--- log:info(self, " queuing ", request, " - ", #self.t_httpSendRequests, "
requests in queue")
+-- log:debug(self, " queuing ", request, " - ", #self.t_httpSendRequests,
" requests in queue")
-- start the state machine if it is idle
self:t_sendDequeueIfIdle()
end
--- t_sendNext
+-- t_nextState
-- manages the http state machine for sending stuff to the server
-function t_sendNext(self, go, newState)
--- log:debug(self, ":t_sendNext(", go, ", ", newState, ")")
+function t_nextState(self, go, newState)
+-- log:debug(self, ":t_nextState(", go, ", ", newState, ")")
if newState then
_assert(self[newState] and type(self[newState]) == 'function')
- self.t_httpSendState = newState
+ self.t_httpState = newState
end
if go then
-- call the function
-- self:XXX(bla) is really the same as self["XXX"](self, bla)
- self[self.t_httpSendState](self)
+ self[self.t_httpState](self)
end
end
@@ -148,21 +145,21 @@
-- removes a request from the queue
function t_sendDequeue(self)
-- log:debug(self, ":t_sendDequeue()")
-
- self.t_httpSending = self:_dequeueRequest()
-
- if self.t_httpSending then
--- log:info(self, " processing ", self.t_httpSending)
- self:t_sendNext(true, 't_sendConnect')
+
+ self.t_httpRequest = self:_dequeueRequest()
+
+ if self.t_httpRequest then
+-- log:debug(self, " processing ", self.t_httpRequest)
+ self:t_nextState(true, 't_sendConnect')
return
end
-- back to idle
--- log:info(self, ": no request in queue")
+-- log:debug(self, ": no request in queue")
if self:connected() then
local pump = function(NetworkThreadErr)
- self:close("connect closed")
+ self:close("idle close")
end
self:t_addRead(pump, 0) -- No timeout
@@ -173,8 +170,8 @@
-- t_sendDequeueIfIdle
-- causes a dequeue and processing on the send queue if possible
function t_sendDequeueIfIdle(self)
- if self.t_httpSendState == 't_sendDequeue' then
- self:t_sendNext(true)
+ if self.t_httpState == 't_sendDequeue' then
+ self:t_nextState(true)
end
end
@@ -194,7 +191,7 @@
end
end
- self:t_sendNext(true, 't_sendRequest')
+ self:t_nextState(true, 't_sendRequest')
end
@@ -208,7 +205,7 @@
["User-Agent"] = 'Jive/' .. JIVE_VERSION,
}
- local req_headers = self.t_httpSending:t_getRequestHeaders()
+ local req_headers = self.t_httpRequest:t_getRequestHeaders()
if not req_headers["Host"] then
local address, port = self:t_getAddressPort()
headers["Host"] = address
@@ -217,8 +214,8 @@
end
end
- if self.t_httpSending:t_hasBody() then
- headers["Content-Length"] = #self.t_httpSending:t_body()
+ if self.t_httpRequest:t_hasBody() then
+ headers["Content-Length"] = #self.t_httpRequest:t_body()
end
req_headers["Accept-Language"] = string.lower(locale.getLocale())
@@ -262,7 +259,7 @@
local source = function()
- local line1 = string.format("%s HTTP/%s",
self.t_httpSending:t_getRequestString(), self.t_httpProtocol)
+ local line1 = string.format("%s HTTP/%s",
self.t_httpRequest:t_getRequestString(), self.t_httpProtocol)
local t = {}
@@ -271,13 +268,13 @@
for k, v in pairs(self:t_getSendHeaders()) do
table.insert(t, k .. ": " .. v)
end
- for k, v in pairs(self.t_httpSending:t_getRequestHeaders()) do
+ for k, v in pairs(self.t_httpRequest:t_getRequestHeaders()) do
table.insert(t, k .. ": " .. v)
end
table.insert(t, "")
- if self.t_httpSending:t_hasBody() then
- table.insert(t, self.t_httpSending:t_body())
+ if self.t_httpRequest:t_hasBody() then
+ table.insert(t, self.t_httpRequest:t_body())
else
table.insert(t, "")
end
@@ -313,61 +310,10 @@
-- no error, we're done, move on!
self:t_removeWrite()
- self:t_sendNext(true, 't_sendReceive')
+ self:t_nextState(true, 't_rcvHeaders')
end
self:t_addWrite(pump, SOCKET_TIMEOUT)
-end
-
-
--- t_sendReceive
---
-function t_sendReceive(self)
--- log:debug(self, ":t_sendReceive()")
-
- -- we're done sending request, add it to receive queue
- if self.t_httpSending then
- table.insert(self.t_httpRcvRequests, self.t_httpSending)
- self.t_httpSending = nil
-
- -- get the receive game rolling if possible
- if self.t_httpRcvState == 't_rcvDequeue' then
- self:t_rcvNext(true)
- end
- end
-
- -- stay put until told we can dequeue next
- self:t_sendNext(false, 't_sendReceive')
-end
-
-
--- t_rcvNext
--- manages the http state machine for receiving server data
-function t_rcvNext(self, go, newState)
--- log:debug(self, ":t_rcvNext(", go, ", ", newState, ")")
-
- if newState then
- _assert(self[newState] and type(self[newState]) == 'function')
- self.t_httpRcvState = newState
- end
-
- if go then
- -- call the function
- -- self:XXX(bla) is really the same as self["XXX"](self, bla)
- self[self.t_httpRcvState](self)
- end
-end
-
-
--- t_rcvDequeue
---
-function t_rcvDequeue(self)
--- log:debug(self, ":t_rcvDequeue()")
-
- if #self.t_httpRcvRequests > 0 then
- self.t_httpReceiving = table.remove(self.t_httpRcvRequests, 1)
- self:t_rcvNext(true, 't_rcvHeaders')
- end
end
@@ -381,7 +327,7 @@
line, err, partial = self.t_sock:receive('*l', partial)
if err then
if err == 'timeout' then
- return
+ return false, err
end
log:error(self, ":t_rcvHeaders.pump:", err)
@@ -410,7 +356,7 @@
if not statusCode then
local line, err = source()
if err then
- return false, err
+ return
end
local data = socket.skip(2, string.find(line,
"HTTP/%d*%.%d* (%d%d%d)"))
@@ -419,7 +365,8 @@
statusCode = tonumber(data)
statusLine = chunk
else
- return false, "cannot parse: " .. chunk
+ self:close(err)
+ return
end
end
@@ -427,7 +374,7 @@
while true do
local line, err = source()
if err then
- return false, err
+ return
end
if line ~= "" then
@@ -436,19 +383,16 @@
err = "malformed reponse headers"
log:warn(err)
self:close(err)
- return false, err
+ return
end
headers[name] = value
else
-- we're done
-
self.t_httpReceiving:t_setResponseHeaders(statusCode, statusLine, headers)
-
- -- release send queue
- self:t_sendNext(false, 't_sendDequeue')
-
+
self.t_httpRequest:t_setResponseHeaders(statusCode, statusLine, headers)
+
-- move on to our future...
- self:t_rcvNext(true, 't_rcvResponse')
+ self:t_nextState(true, 't_rcvResponse')
return
end
end
@@ -678,21 +622,18 @@
-- t_rcvResponse
-- acrobatics to read the response body
function t_rcvResponse(self)
-
local mode
local len
- local httpReceiving = self.t_httpReceiving
-
- if httpReceiving:t_getResponseHeader('Transfer-Encoding') == 'chunked'
then
+ if self.t_httpRequest:t_getResponseHeader('Transfer-Encoding') ==
'chunked' then
mode = 'jive-http-chunked'
else
- if httpReceiving:t_getResponseHeader("Content-Length") then
+ if self.t_httpRequest:t_getResponseHeader("Content-Length") then
-- if we have a length, use it!
- len =
tonumber(httpReceiving:t_getResponseHeader("Content-Length"))
+ len =
tonumber(self.t_httpRequest:t_getResponseHeader("Content-Length"))
mode = 'jive-by-length'
else
@@ -703,8 +644,8 @@
local source = socket.source(mode, self.t_sock, len or self)
- local sinkMode = httpReceiving:t_getResponseSinkMode()
- local sink = _getSink(sinkMode, httpReceiving)
+ local sinkMode = self.t_httpRequest:t_getResponseSinkMode()
+ local sink = _getSink(sinkMode, self.t_httpRequest)
local pump = function (NetworkThreadErr)
-- log:debug(self, ":t_rcvResponse.pump(", mode, ", ",
tostring(nt_err) , ")")
@@ -736,38 +677,17 @@
return
end
- if httpReceiving:t_getResponseHeader('Connection') ==
'close' then
+ if self.t_httpRequest:t_getResponseHeader('Connection')
== 'close' then
-- just close the socket, don't reset our state
SocketTcp.close(self)
end
-- move on to our future
- self:t_rcvNext(true, 't_rcvSend')
+ self:t_nextState(true, 't_sendDequeue')
end
end
self:t_addRead(pump, SOCKET_TIMEOUT)
-end
-
-
--- t_rcvSend
---
-function t_rcvSend(self)
--- log:debug(self, ":t_rcvSend()")
-
- -- we're done receiving request, drop it
- if self.t_httpReceiving then
-
--- log:debug(self, " done with ", self.t_httpReceiving)
-
- self.t_httpReceiving = nil
-
- -- get the send game rolling if possible
- self:t_sendDequeueIfIdle()
- end
-
- -- move to dequeue
- self:t_rcvNext(true, 't_rcvDequeue')
end
@@ -779,9 +699,7 @@
-- dump queues
-- FIXME: should it free requests?
self.t_httpSendRequests = nil
- self.t_httpSending = nil
- self.t_httpRcvRequests = nil
- self.t_httpReceiving = nil
+ self.t_httpRequest = nil
SocketTcp.free(self)
end
@@ -790,30 +708,18 @@
-- close
-- close our socket
function close(self, err)
--- log:info(self, " closing with err: ", err, ")")
-
- -- assumption is sending and receiving queries are never the same
- if self.t_httpSending then
- local errorSink = self.t_httpSending:t_getResponseSink()
+-- log:debug(self, " closing with err: ", err, ")")
+
+ if self.t_httpRequest then
+ local errorSink = self.t_httpRequest:t_getResponseSink()
if errorSink then
errorSink(nil, err)
end
- self.t_httpSending = nil
- end
-
- if self.t_httpReceiving then
- local errorSink = self.t_httpReceiving:t_getResponseSink()
- if errorSink then
- errorSink(nil, err)
- end
- self.t_httpReceiving = nil
- end
-
- self:t_sendNext(false, 't_sendDequeue')
- self:t_rcvNext(false, 't_rcvDequeue')
-
- -- FIXME: manage the queues
-
+ self.t_httpRequest = nil
+ end
+
+ self:t_nextState(false, 't_sendDequeue')
+
SocketTcp.close(self)
-- restart the state machine if it is idle
_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/cgi-bin/mailman/listinfo/jive-checkins