Toby Dickenson <[EMAIL PROTECTED]> writes:

> On Thursday 22 April 2004 11:43, Syver Enstad wrote:
> 
> > cache_deactivate_after sounds interesting. Since I am running ZODB in
> > a web server I don't want the data to timeout and disappear from
> > memory since every object I have should be loaded at all times.
> 
> That's why ZODB ignores that parameter now ;-)

Good :-)

I have a strange case here with ReadConflictErrors. I don't know whether
this has been covered already, but here it is anyway. I am using ZODB 3.2,
so this might be fixed in 3.3 for all I know.

To run it on a computer without twisted installed, just replace the
twisted imports with the standard lib unittest module.

import pdb
import sys
import time
import threading
import os

from ZODB import DB
from ZODB.PersistentList import PersistentList
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ReadConflictError
from Persistence import Persistent

from twisted.trial import unittest


class Computer(Persistent):
    """Persistent article record used as sample data by the test.

    Stores the integer passed to the constructor together with an article
    status, an end-user price and a location. The latter three are None
    until the matching setter is called.
    """

    def __init__(self, oidInteger):
        """Remember oidInteger and leave every other field unset (None)."""
        self._oid = oidInteger
        # Targets are assigned left to right: status, price, location.
        self._articleStatus = self._endUserPrice = self._location = None

    def setArticleStatus(self, anObject):
        """Store anObject as the article status."""
        self._articleStatus = anObject

    def setEndUserPrice(self, anObject):
        """Store anObject as the end-user price."""
        self._endUserPrice = anObject

    def setLocation(self, anObject):
        """Store anObject as the location."""
        self._location = anObject


class MockArticleDb(Persistent):
    """Minimal persistent stand-in for an article database.

    Articles are kept, in insertion order, in a single PersistentList.
    """

    def __init__(self):
        """Start with an empty PersistentList of articles."""
        self._articles = PersistentList()

    def addArticle(self, anArticle):
        """Append anArticle to the end of the article list."""
        stored = self._articles
        stored.append(anArticle)

    def articles(self):
        """Return the PersistentList itself (not a copy) of all articles."""
        return self._articles



class TestReadConflictError(unittest.TestCase):
    def setUp(self):
        try:
            os.unlink('test.fs')
        except OSError, err:
            pass
        storage = FileStorage(
            'test.fs',
            create=True)
        db = DB(storage, cache_size=100000)
        connection = db.open()
        connection.setLocalTransaction()
        connection.root()['articledb'] = MockArticleDb()
        computer = Computer(1)
        computer.setArticleStatus('For sale')
        computer.setEndUserPrice(10050)
        computer.setLocation('H0101')
        connection.root()['articledb'].addArticle(computer)
        connection.getTransaction().commit()
        connection.close()
        db.close()
        storage.close()

        storage = FileStorage(
            'test.fs',
            create=False)
        self._db = DB(storage, cache_size=100000)
        

    def writeStuff(self, connection):
        begin = time.time()
        connection.sync()
        articleDb = connection.root()['articledb']
        eachBegin = time.time()
        computer = Computer(1)
        computer.setArticleStatus('For sale')
        computer.setEndUserPrice(10050)
        computer.setLocation('H0101')
        articleDb.addArticle(computer)
        connection.getTransaction().commit()
        print 'write list length', len(articleDb.articles())

    def doRead(self, oddRead, connection):
        tryCount = 1
        articleDb = connection.root()['articledb']
        try:
            for article in articleDb.articles():
                pass
        except ReadConflictError, err:
            if True: 
                connection.sync()
            else: # this also works
                connection.close()
                connection = self._db.open()
                connection.setLocalTransaction()
            for article in articleDb.articles():
                pass
            tryCount += 1
        print 'read list length', len(articleDb.articles())
        if oddRead:
            # on first, third, fifth read we get a ReadConflictError
            self.assertEquals(
                2,
                tryCount)
        else:
            # but on second, fourth and so forth we don't
            self.assertEquals(
                1,
                tryCount)                
        
    def testProvokeReadConflictError(self):
        conn2 = self._db.open()
        conn2.setLocalTransaction()
        for each in range(1, 11): # 1000 works also but takes a bit of time
            begin = time.time()
            connection = self._db.open()
            connection.setLocalTransaction()
            self.writeStuff(conn2)
            oddRead = each % 2 != 0
            self.doRead(oddRead, connection)
            connection.close()
        
        
if __name__ == '__main__':
    from twisted.scripts.trial import run
    # Append this script's own path to the command line before starting
    # trial -- presumably so that trial runs the tests in this file.
    thisScript = sys.argv[0]
    sys.argv.append(thisScript)
    run()
_______________________________________________
Zope-Dev maillist  -  [EMAIL PROTECTED]
http://mail.zope.org/mailman/listinfo/zope-dev
**  No cross posts or HTML encoding!  **
(Related lists - 
 http://mail.zope.org/mailman/listinfo/zope-announce
 http://mail.zope.org/mailman/listinfo/zope )

Reply via email to