Hi, list! I'm on Ubuntu 9.10 x64 with Lua 5.1.4; the list of installed rocks is below.
We're debugging a concurrency bug in our code. I'm still trying to build a minimal reproducible example, but I've decided to post this early in case someone gets an idea. Some psionic debugging skills are required to read further. :-) The info I give here is a bit vague, sorry, but I'm able to reproduce the bug reliably, so if any specific details are required, I should be able to provide them.

1. I have a copas-based server with two commands:

   PUT <channel> <data>
     If a client's connection is subscribed to this channel (see below), sends the data to that connection immediately and unsubscribes the client. Otherwise, appends the data to the channel's FIFO queue.

   FETCH <channel>
     If the channel's queue is not empty, takes the front message and sends it to the client. Otherwise, subscribes the client to the channel (storing its connection).

2. I've got a test case with three clients:

   * One PUTs data into a channel (the putter), closing and reopening its connection each time.
   * The two others FETCH messages (the fetchers). They fetch data from channels in a loop, each over a single connection that is never closed.

After a while, one of the fetchers fails with an error saying that its connection was unexpectedly closed.

Debugging shows that the connection is closed in the following scenario:

1. A fetcher subscribes to a channel; its connection is stored away.
2. Some time later, the putter sends a PUT for the channel and closes its connection.
3. The PUT handler detects that the channel has a subscriber and sends the data to the fetcher via the stored connection.
4. The PUT handler then tries to read the next command, detects that the putter's connection was closed, and exits from its coroutine.
5. Copas detects that the coroutine exited and closes the connection.

The problem is that copas closes the fetcher's connection instead of the putter's.

I've added some logging to the copas code (attached).
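For reference, the PUT/FETCH semantics in item 1 boil down to roughly the following. This is a minimal sketch, not the actual server code: connections are plain tables, the hypothetical `deliver` function stands in for a write through copas, and it assumes a channel may hold several subscribers, with the oldest one served first.

```lua
-- Minimal sketch of the PUT/FETCH channel semantics (not the real server).
-- Assumptions: connections are plain tables, `deliver` stands in for a
-- copas write, and a channel may hold several subscribers (oldest first).

local channels = {}  -- name -> { queue = {messages}, subscribers = {conns} }

local function get_channel(name)
  local ch = channels[name]
  if not ch then
    ch = { queue = {}, subscribers = {} }
    channels[name] = ch
  end
  return ch
end

-- Hypothetical stand-in for sending data over a client connection.
local function deliver(conn, data)
  conn.inbox[#conn.inbox + 1] = data
end

-- PUT <channel> <data>: hand the data to a waiting subscriber (and drop
-- the subscription), otherwise append it to the channel's FIFO queue.
local function put(name, data)
  local ch = get_channel(name)
  local sub = table.remove(ch.subscribers, 1)  -- nil if nobody is waiting
  if sub then
    deliver(sub, data)
  else
    ch.queue[#ch.queue + 1] = data
  end
end

-- FETCH <channel>: send the front message if there is one, otherwise
-- store the connection so that a later PUT can write to it.
local function fetch(conn, name)
  local ch = get_channel(name)
  if #ch.queue > 0 then
    deliver(conn, table.remove(ch.queue, 1))
  else
    ch.subscribers[#ch.subscribers + 1] = conn
  end
end

-- Usage: one fetcher, one channel.
local fetcher = { inbox = {} }
fetch(fetcher, "news")  -- queue empty: fetcher subscribes
put("news", "hello")    -- subscriber waiting: delivered at once
put("news", "world")    -- nobody waiting: queued
fetch(fetcher, "news")  -- queue not empty: front message delivered
print(table.concat(fetcher.inbox, ","))  --> hello,world
```

The FETCH branch is the interesting part for this bug: the subscriber's connection is kept in channel state, so the PUT handler, which runs in the putter's coroutine, ends up writing to a socket that belongs to a different client.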
Here is the log (somewhat boiled down):

# Fetcher's connection 0x2248808 goes to wait
XXX AFTER resume co: thread: 0x2243310 skt: tcp{client}: 0x2248808 XXX
# Putter connects with data for the channel
XXX ACCEPTED client: tcp{client}: 0x22d6698 input: tcp{server}: 0x2246758 handler: function: 0x2242a50
XXX BEFORE resume co: thread: 0x21e44c0 skt: tcp{client}: 0x22d6698 XXX
# PUT handler sends data to fetcher
XXX AFTER resume co: thread: 0x21e44c0 skt: tcp{client}: 0x22d6698 XXX
# Send returns
XXX BEFORE resume co: thread: 0x21e44c0 skt: tcp{client}: 0x2248808 XXX
# No more data in connection
[2009-12-14 15:47:55.958395] {28237} [BCO!] failed to read command: closed tcp{client}: 0x22d6698
# Coroutine closes
XXX AFTER resume co: thread: 0x21e44c0 skt: tcp{client}: 0x2248808 XXX
# Copas closes the wrong connection
XXX closing tcp{client}: 0x2248808 CO thread: 0x21e44c0 OK: true RES: nil NEW_Q: nil XXX here

Any ideas? Can I provide any extra info?

Alexander.

P.S. Sorry, I can't remember whether I modified copas.lua after I got the log. Anyway, the meaning is still the same.

$ luarocks list

Installed rocks:
----------------

copas 1.1.5-1 (installed) - /usr/local/lib/luarocks/rocks
coxpcall 1.13.0-1 (installed) - /usr/local/lib/luarocks/rocks
luafilesystem 1.5.0-1 (installed) - /usr/local/lib/luarocks/rocks
luaposix 5.1.2-1 (installed) - /usr/local/lib/luarocks/rocks
luasocket 2.0.2-2 (installed) - /usr/local/lib/luarocks/rocks
rings 1.2.3-1 (installed) - /usr/local/lib/luarocks/rocks
wsapi cvs-3 (installed) - /usr/local/lib/luarocks/rocks
wsapi-fcgi cvs-2 (installed) - /usr/local/lib/luarocks/rocks
xavante 2.1.0-1 (installed) - /usr/local/lib/luarocks/rocks
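The sequence in the log can be reproduced with a simplified model of the dispatch pattern, assuming the attached copas.lua behaves as it reads: `send` yields the destination socket, so the PUT coroutine gets parked in `_writing` under the fetcher's socket, and `_doTick` closes whatever socket it resumed the coroutine with once the coroutine finishes. The model is plain Lua without LuaSocket; `tick`, `waiting`, and the table-based sockets are hypothetical stand-ins, not copas API.

```lua
-- Simplified model of the copas 1.1.5 dispatch pattern (not copas itself).
-- Sockets are plain tables and "closing" one only sets a flag.

local function new_socket(name)
  return { name = name, closed = false }
end

-- socket -> coroutine parked on it (plays the role of _writing:push/pop)
local waiting = {}

-- Same shape as copas' _doTick: resume the coroutine with `skt`; if it
-- yields a socket, park it under that socket; if it finishes, close `skt`.
local function tick(co, skt)
  local ok, res = coroutine.resume(co, skt)
  if ok and res then
    waiting[res] = co
  else
    skt.closed = true
  end
end

local fetcher = new_socket("fetcher")
local putter = new_socket("putter")

-- PUT handler: created for `putter`, but writes to the subscribed `fetcher`.
local put_handler = coroutine.create(function(own)
  -- Like copas.send(fetcher, ...) taking its extra yield: the yielded
  -- socket is the destination, so the coroutine is parked under `fetcher`.
  coroutine.yield(fetcher)
  -- Resumed once `fetcher` is writable. In the real handler the next read
  -- on `own` reports "closed" (the putter hung up), so it simply returns.
end)

-- Putter connects: the handler runs until it parks on `fetcher`.
tick(put_handler, putter)

-- select() reports `fetcher` writable: whatever is parked there is
-- resumed, and `fetcher` is passed in as `skt`.
local co = waiting[fetcher]
waiting[fetcher] = nil
tick(co, fetcher)

print("fetcher closed: " .. tostring(fetcher.closed))  --> fetcher closed: true
print("putter closed: " .. tostring(putter.closed))    --> putter closed: false
```

In this model the dispatcher never records which socket a coroutine was created for, so when the coroutine finishes after being woken through the fetcher's socket, the fetcher's socket is the only one it can close.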
-------------------------------------------------------------------------------
-- Copas - Coroutine Oriented Portable Asynchronous Services
--
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
-- Uses LuaSocket as the interface with the TCP/IP stack.
--
-- Authors: Andre Carregal and Javier Guerra
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
--               Thomas Harning Jr. and Gary NG
--
-- Copyright 2005 - Kepler Project (www.keplerproject.org)
--
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
-------------------------------------------------------------------------------
local socket = require "socket"

require "coxpcall"

local WATCH_DOG_TIMEOUT = 120

-- Redefines LuaSocket functions with coroutine safe versions
-- (this allows the use of socket.http from within copas)
local function statusHandler(status, ...)
  if status then return ... end
  return nil, ...
end

function socket.protect(func)
  return function (...)
    return statusHandler(copcall(func, ...))
  end
end

function socket.newtry(finalizer)
  return function (...)
    local status = (...) or false
    if (status==false)then
      copcall(finalizer, select(2, ...) )
      error((select(2, ...)), 0)
    end
    return ...
  end
end
-- end of LuaSocket redefinitions


module ("copas", package.seeall)

-- Meta information is public even if beginning with an "_"
_COPYRIGHT   = "Copyright (C) 2005 Kepler Project"
_DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
_VERSION     = "Copas 1.1.5"

-------------------------------------------------------------------------------
-- Simple set implementation based on LuaSocket's tinyirc.lua example
-- adds a FIFO queue for each value in the set
-------------------------------------------------------------------------------
local function newset()
  local reverse = {}
  local set = {}
  local q = {}
  setmetatable(set, { __index = {
    insert = function(set, value)
      if not reverse[value] then
        set[#set + 1] = value
        reverse[value] = #set
      end
    end,

    remove = function(set, value)
      local index = reverse[value]
      if index then
        reverse[value] = nil
        local top = set[#set]
        set[#set] = nil
        if top ~= value then
          reverse[top] = index
          set[index] = top
        end
      end
    end,

    push = function (set, key, itm)
      local qKey = q[key]
      if qKey == nil then
        q[key] = {itm}
      else
        qKey[#qKey + 1] = itm
      end
    end,

    pop = function (set, key)
      local t = q[key]
      if t ~= nil then
        local ret = table.remove (t, 1)
        if t[1] == nil then
          q[key] = nil
        end
        return ret
      end
    end
  }})
  return set
end

local _servers = newset() -- servers being handled
local _reading_log = {}
local _writing_log = {}

local _reading = newset() -- sockets currently being read
local _writing = newset() -- sockets currently being written

-------------------------------------------------------------------------------
-- Coroutine based socket I/O functions.
-------------------------------------------------------------------------------
-- reads a pattern from a client and yields to the reading set on timeouts
function receive(client, pattern, part)
  local s, err
  pattern = pattern or "*l"
  repeat
    s, err, part = client:receive(pattern, part)
    if s or err ~= "timeout" then
      _reading_log[client] = nil
      return s, err, part
    end
    _reading_log[client] = os.time()
    coroutine.yield(client, _reading)
  until false
end

-- same as above but with special treatment when reading chunks,
-- unblocks on any data received.
function receivePartial(client, pattern)
  local s, err, part
  pattern = pattern or "*l"
  repeat
    s, err, part = client:receive(pattern)
    if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
       err ~= "timeout" then
      _reading_log[client] = nil
      return s, err, part
    end
    _reading_log[client] = os.time()
    coroutine.yield(client, _reading)
  until false
end

-- sends data to a client. The operation is buffered and
-- yields to the writing set on timeouts
function send(client,data, from, to)
  local s, err,sent
  from = from or 1
  local lastIndex = from - 1

  repeat
    s, err, lastIndex = client:send(data, lastIndex + 1, to)
    -- adds an extra coroutine swap
    -- guarantees that high throughput doesn't starve other threads
    if (math.random(100) > 90) then
      _writing_log[client] = os.time()
      coroutine.yield(client, _writing)
    end
    if s or err ~= "timeout" then
      _writing_log[client] = nil
      return s, err,lastIndex
    end
    _writing_log[client] = os.time()
    coroutine.yield(client, _writing)
  until false
end

-- waits until connection is completed
function connect(skt, host, port)
  skt:settimeout(0)
  local ret, err
  repeat
    ret, err = skt:connect (host, port)
    if ret or err ~= "timeout" then
      _writing_log[skt] = nil
      return ret, err
    end
    _writing_log[skt] = os.time()
    coroutine.yield(skt, _writing)
  until false
  return ret, err
end

-- flushes a client write buffer (deprecated)
function flush(client)
end

-- wraps a socket to use Copas methods (send, receive, flush and settimeout)
local _skt_mt = {__index = {
  send = function (self, data, from, to)
    return send (self.socket, data, from, to)
  end,

  receive = function (self, pattern)
    if (self.timeout==0) then
      return receivePartial(self.socket, pattern)
    end
    return receive (self.socket, pattern)
  end,

  flush = function (self)
    return flush (self.socket)
  end,

  settimeout = function (self,time)
    self.timeout=time
    return
  end,
}}

function wrap (skt)
  return setmetatable ({socket = skt}, _skt_mt)
end

--------------------------------------------------
-- Error handling
--------------------------------------------------

local _errhandlers = {}   -- error handler per coroutine

function setErrorHandler (err)
  local co = coroutine.running()
  if co then
    _errhandlers [co] = err
  end
end

local function _deferror (msg, co, skt)
  print (msg, co, skt)
end

-------------------------------------------------------------------------------
-- Thread handling
-------------------------------------------------------------------------------

io.stdout:write("XXX MYCOPAS XXX\n")
io.stdout:flush()

local function _doTick (co, skt, ...)
  if not co then return end

  io.stdout:write("XXX BEFORE resume co: ", tostring(co), " skt: ", tostring(skt), " XXX\n")
  io.stdout:flush()

  io.stdout:write("XXX args co: ", tostring(co), ": ")
  for i = 1, select("#", ...) do
    io.stdout:write(i, ": ", tostring(select(i, ...)), " ")
  end
  io.stdout:write("XXX\n")
  io.stdout:flush()

  local ok, res, new_q = coroutine.resume(co, skt, ...)
  io.stdout:write(
      "XXX AFTER resume",
      " CO: ", tostring(co),
      " OK: ", tostring(ok),
      " RES: ", tostring(res),
      " NEW_Q: ", tostring(new_q),
      " XXX\n"
    )
  io.stdout:flush()

  if ok and res and new_q then
    new_q:insert (res)
    new_q:push (res, co)
  else
    if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end
    if skt then
      io.stdout:write(
          "XXX CLOSING ", tostring(skt),
          " CO: ", tostring(co),
          " OK: ", tostring(ok),
          " RES: ", tostring(res),
          " NEW_Q: ", tostring(new_q),
          " XXX ",
          debug.traceback("here")
        )
      io.stdout:flush()
      skt:close()
    end
    _errhandlers [co] = nil
  end
end

-- accepts a connection on socket input
local function _accept(input, handler)
  local client = input:accept()
  io.stdout:write(
      "XXX ACCEPTED client: ", tostring(client),
      " input: ", tostring(input),
      " handler: ", tostring(handler),
      "\n"
    )
  io.stdout:flush()
  if client then
    client:settimeout(0)
    local co = coroutine.create(handler)
    io.stdout:write(
        "XXX CREATED co: ", tostring(co),
        " client: ", tostring(client),
        " input: ", tostring(input),
        " handler: ", tostring(handler),
        "\n"
      )
    _doTick (co, client)
    --_reading:insert(client)
  end
  return client
end

-- handle threads on a queue
local function _tickRead (skt)
  _doTick (_reading:pop (skt), skt)
end

local function _tickWrite (skt)
  _doTick (_writing:pop (skt), skt)
end

-------------------------------------------------------------------------------
-- Adds a server/handler pair to Copas dispatcher
-------------------------------------------------------------------------------
function addserver(server, handler, timeout)
  server:settimeout(timeout or 0.1)
  _servers[server] = handler
  _reading:insert(server)
end

-------------------------------------------------------------------------------
-- Adds a new coroutine thread to Copas dispatcher
-------------------------------------------------------------------------------
function addthread(thread, ...)
  local co = coroutine.create(thread)
  _doTick (co, nil, ...)
end

-------------------------------------------------------------------------------
-- tasks registering
-------------------------------------------------------------------------------

local _tasks = {}

local function addtaskRead (tsk)
  -- lets tasks call the default _tick()
  tsk.def_tick = _tickRead

  _tasks [tsk] = true
end

local function addtaskWrite (tsk)
  -- lets tasks call the default _tick()
  tsk.def_tick = _tickWrite

  _tasks [tsk] = true
end

local function tasks ()
  return next, _tasks
end

-------------------------------------------------------------------------------
-- main tasks: manage readable and writable socket sets
-------------------------------------------------------------------------------
-- a task to check ready to read events
local _readable_t = {
  events = function(self)
    local i = 0
    return function ()
      i = i + 1
      return self._evs [i]
    end
  end,

  tick = function (self, input)
    local handler = _servers[input]
    if handler then
      input = _accept(input, handler)
    else
      _reading:remove (input)
      self.def_tick (input)
    end
  end
}

addtaskRead (_readable_t)


-- a task to check ready to write events
local _writable_t = {
  events = function (self)
    local i = 0
    return function ()
      i = i + 1
      return self._evs [i]
    end
  end,

  tick = function (self, output)
    _writing:remove (output)
    self.def_tick (output)
  end
}

addtaskWrite (_writable_t)

local last_cleansing = 0

-------------------------------------------------------------------------------
-- Checks for reads and writes on sockets
-------------------------------------------------------------------------------
local function _select (timeout)
  local err
  local readable={}
  local writable={}
  local r={}
  local w={}
  local now = os.time()
  local duration = os.difftime

  _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout)
  local r_evs, w_evs = _readable_t._evs, _writable_t._evs

  if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
    last_cleansing = now
    for k,v in pairs(_reading_log) do
      if not r_evs[k] and
         duration(now, v) > WATCH_DOG_TIMEOUT then
        _reading_log[k] = nil
        r_evs[#r_evs + 1] = k
        r_evs[k] = #r_evs
      end
    end

    for k,v in pairs(_writing_log) do
      if not w_evs[k] and
         duration(now, v) > WATCH_DOG_TIMEOUT then
        _writing_log[k] = nil
        w_evs[#w_evs + 1] = k
        w_evs[k] = #w_evs
      end
    end
  end

  if err == "timeout" and #r_evs + #w_evs > 0 then
    return nil
  else
    return err
  end
end


-------------------------------------------------------------------------------
-- Dispatcher loop step.
-- Listens to client requests and handles them
-------------------------------------------------------------------------------
function step(timeout)
  local err = _select (timeout)
  if err == "timeout" then return end

  if err then
    error(err)
  end

  for tsk in tasks() do
    for ev in tsk:events() do
      tsk:tick (ev)
    end
  end
end

-------------------------------------------------------------------------------
-- Dispatcher endless loop.
-- Listens to client requests and handles them forever
-------------------------------------------------------------------------------
function loop(timeout)
  while true do
    step(timeout)
  end
end
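The `newset` helper in the attachment combines two structures: a dense array with a reverse index (the form `socket.select` takes) and a per-key FIFO of waiting coroutines. A standalone sketch of the same idea, with the hypothetical name `new_set` and plain tables in place of sockets and coroutines, shows the operations in isolation:

```lua
-- Standalone sketch of the set + per-key FIFO idea behind copas' newset().
-- Plain tables stand in for sockets; strings stand in for coroutines.
local function new_set()
  local reverse = {}   -- value -> its index in the array part
  local queues = {}    -- key -> FIFO list of items
  local set = {}
  setmetatable(set, { __index = {
    insert = function(self, value)
      if not reverse[value] then        -- ignore duplicates
        self[#self + 1] = value
        reverse[value] = #self
      end
    end,
    remove = function(self, value)
      local index = reverse[value]
      if index then
        reverse[value] = nil
        local top = self[#self]
        self[#self] = nil
        if top ~= value then            -- move the last element into the hole
          reverse[top] = index
          self[index] = top
        end
      end
    end,
    push = function(self, key, item)    -- append to the FIFO for `key`
      local q = queues[key]
      if q then q[#q + 1] = item else queues[key] = { item } end
    end,
    pop = function(self, key)           -- take the oldest item for `key`
      local q = queues[key]
      if q then
        local item = table.remove(q, 1)
        if q[1] == nil then queues[key] = nil end
        return item
      end
    end,
  }})
  return set
end

-- Usage: the array part stays dense after a removal.
local s = new_set()
local a, b, c = {}, {}, {}
s:insert(a); s:insert(b); s:insert(c)
s:remove(a)              -- the last element (c) moves into a's slot

s:push(b, "co1")
s:push(b, "co2")
local first = s:pop(b)   -- "co1"
local second = s:pop(b)  -- "co2"; the queue for b is now gone
```

Note that the FIFO is keyed by whatever socket a coroutine yielded, not by the socket the coroutine was created for; `pop` returns the oldest item pushed under that key, whichever connection it "belongs" to.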
_______________________________________________
Kepler-Project mailing list
Kepler-Project@lists.luaforge.net
http://lists.luaforge.net/cgi-bin/mailman/listinfo/kepler-project
http://www.keplerproject.org/