We had this bug in production. The attached file is a stress test I wrote to
reproduce and understand the issue.
import asyncore, logging, os, signal, shutil, sys, tempfile, time
import transaction
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.ConflictResolution import PCounter
from ZODB.tests.MinPO import MinPO
from ZEO.ClientStorage import ClientStorage
from ZEO.StorageServer import StorageServer
from ZEO.tests.forker import get_port

# Log line layout (timestamp, level, message) shared by the per-process
# file handlers that fork() installs in each child.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

class PRandomlyConflictingCounter(PCounter):
    """PCounter variant that refuses to resolve some of its conflicts.

    Resolution is delegated to PCounter unless the ``_value`` of the new
    state is a multiple of 5 (a missing ``_value`` counts as 0), in which
    case ConflictError is raised instead.
    """

    def _p_resolveConflict(self, oldState, savedState, newState):
        """Resolve like PCounter, or raise ConflictError on multiples of 5."""
        new_value = newState.get('_value', 0)
        if not new_value % 5:
            # Multiple of 5: deliberately leave the conflict unresolved.
            raise ConflictError
        return PCounter._p_resolveConflict(
            self, oldState, savedState, newState)

def fork(name):
    """Fork the process and give the child its own log file.

    In the parent, returns the child's pid. In the child, attaches a
    FileHandler writing to "<name>-<child pid>.log" (using the module-level
    ``formatter``) to the root logger and returns None.
    """
    child_pid = os.fork()
    if not child_pid:
        # Child process: log to a file named after this process.
        log_path = "%s-%s.log" % (name, os.getpid())
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(formatter)
        logging.getLogger().addHandler(log_handler)
        return None
    return child_pid

class Server(object):
    """A ZEO storage server running in a forked child process.

    In the parent, the instance exposes ``pid`` (the server process) and
    ``client``, a factory returning a new ClientStorage for this server.
    """

    def __init__(self, name):
        """Pick a port, fork, and serve storage ``name`` in the child.

        The child serves a FileStorage backed by "<child pid>.fs" and then
        runs the asyncore loop; it is not expected to get past that loop.
        """
        address = ("localhost", get_port())
        self.pid = fork("server")
        if not self.pid:
            # Child process: serve a single FileStorage under `name`.
            storage_path = "%s.fs" % os.getpid()
            server = StorageServer(address, {name: FileStorage(storage_path)})
            asyncore.loop()
            # The loop is expected to run forever.
            assert 0
        else:
            # Parent process: remember how to connect to the child.
            def client(*args, **kw):
                return ClientStorage(address, name, *args, **kw)
            self.client = client

class main(object):
    """Stress-test driver: two storage servers hammered by four clients.

    All the work happens in ``__new__``, so simply calling ``main()`` runs
    the whole test. ``__new__`` has no return statement, so the call
    evaluates to None rather than to an instance.
    """

    def __new__(cls):
        """Set up the servers and clients, then sleep until a signal arrives.

        The body runs in the original process and, because the helpers fork
        inside the ``try`` block, the ``finally`` clause is also reached by
        any child whose work loop is left by an exception.
        """
        self = object.__new__(cls)
        self.wd = None
        self.servers = []
        self.clients = []
        # Remember which process started the test so the cleanup code can
        # tell the parent apart from forked children.
        pid = os.getpid()
        try:
            # Everything (logs, *.fs files) is created in a fresh temp dir.
            self.wd = tempfile.mkdtemp()
            os.chdir(self.wd)
            root = logging.getLogger()
            root.setLevel(logging.DEBUG)
            # No handler may be attached yet; each forked child adds its
            # own file handler in fork().
            assert not root.handlers
            # One server holds a plain PCounter, the other a counter whose
            # conflict resolution sometimes fails.
            self.init_server(PCounter())
            self.init_server(PRandomlyConflictingCounter())
            for i in xrange(4):
                self.clients.append(self.run_client())
            # Parent: sleep until a signal is received.
            signal.pause()
        finally:
            if pid != os.getpid():
                # We are in a forked child: exit immediately, without
                # running the parent's cleanup below.
                os._exit(0)
            elif self.wd:
                # Parent: wait for every child, ignoring any error raised
                # while waiting.
                for server in self.servers:
                    try:
                        os.waitpid(server.pid, 0)
                    except:
                        pass
                # NOTE: this loop rebinds `pid`; harmless, since the
                # parent/child comparison above has already been made.
                for pid in self.clients:
                    try:
                        os.waitpid(pid, 0)
                    except:
                        pass
                # Removal of the working directory is disabled, so the logs
                # and storage files are left behind for inspection.
                #shutil.rmtree(self.wd)

    def init_server(self, obj):
        """Start a new server and store ``obj`` in its database.

        The server is named after its index in ``self.servers``. ``obj`` is
        stored in the root under the key None and committed; its oid is
        recorded as ``server.oid`` so that clients can load it later.
        """
        server = Server(str(len(self.servers)))
        self.servers.append(server)
        db = ZODB.DB(server.client())
        cn = db.open()
        cn.root()[None] = obj
        transaction.commit()
        # Only available after the commit above has stored the object.
        server.oid = obj._p_oid
        # Only the connection is closed here; `db` itself is not.
        cn.close()

    def run_client(self):
        """Fork a client that increments every server's counter forever.

        Returns the child's pid in the parent. The child does not return
        normally: it loops forever, and if an exception other than
        ConflictError escapes, it propagates to the caller.
        """
        pid = fork("client")
        if pid:
            return pid
        # Child process: one connection per server.
        self.cn = [ZODB.DB(server.client()).open() for server in self.servers]
        # Delay grows with the number of clients already started, so the
        # first client (empty list) runs without sleeping.
        delay = len(self.clients) * 0.1
        while 1:
            try:
                if delay:
                    time.sleep(delay)
                # Increment the counter stored on each server ...
                for i, cn in enumerate(self.cn):
                    cn.get(self.servers[i].oid).inc()
                # ... then commit. Presumably all connections take part in
                # this one transaction -- TODO confirm.
                transaction.commit()
            except ConflictError:
                # Unresolved conflict: drop the changes and try again.
                transaction.abort()

# Run the stress test when executed as a script; main() does all its work
# while being constructed.
if __name__ == "__main__":
    main()

Attachment: signature.asc
Description: OpenPGP digital signature

_______________________________________________
For more information about ZODB, see http://zodb.org/

ZODB-Dev mailing list  -  ZODB-Dev@zope.org
https://mail.zope.org/mailman/listinfo/zodb-dev

Reply via email to