I'm having a problem with my first multi-threaded application using ZODB (v3.6).

The program basically uses an OOBTree to record whether tasks have been completed; the key is the task identifier. Multiple Downloader threads read the OOBTree (to check whether a task has been completed), and a single ProcessResults thread writes to it (when a task is completed). All threads share the same DB instance, but each thread uses its own connection. I also have a Packer thread that periodically packs the database.

The problem is that a Downloader thread occasionally raises a ReadConflictError exception.

I had thought that I could simply wait a while and then retry the read, but that didn't help. My current workaround is to assume the task has not been done whenever the exception is raised, but this isn't very elegant.
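For reference, the generic wait-and-retry idea (the commented-out backoff logic in the skeleton) can be expressed as a small stdlib-only helper. `retry_with_backoff`, its `on_retry` hook and `FlakyRead` are hypothetical names, not part of ZODB. The hook is the interesting part: sleeping alone leaves the reading thread inside the same transaction, and a ZODB connection is generally understood to pick up other threads' committed changes only at a transaction boundary, so in real code the hook would presumably call `transaction.abort()` in the reading thread. That last point is an assumption to check against the ZODB documentation.

```python
import time


def retry_with_backoff(func, retry_on, max_attempts=5, initial_delay=1.0,
                       on_retry=None, sleep=time.sleep):
    """Call func() and return its result, retrying on the given exceptions.

    retry_on is an exception class or a tuple of classes. Before each retry
    the optional on_retry() hook is called, then the helper sleeps for the
    current delay, which doubles after every failure. When max_attempts is
    exhausted, the last exception is re-raised.
    """
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise
            if on_retry is not None:
                # Assumption: with ZODB this hook would end the reading
                # thread's stale transaction (e.g. transaction.abort()).
                on_retry()
            sleep(delay)
            delay = delay * 2


class FlakyRead(object):
    """Hypothetical stand-in for a read that fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("simulated conflict")
        return True


# Usage: record the delays instead of really sleeping.
delays = []
hook_calls = []
read = FlakyRead(failures=2)
result = retry_with_backoff(read, RuntimeError, sleep=delays.append,
                            on_retry=lambda: hook_calls.append(1))
```

Passing `sleep` as a parameter keeps the helper testable without real delays.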

I suspect that I may not understand how the transaction manager works. Currently I just use "transaction.get().commit()" after each write to the database.
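With ZODB's default thread-aware transaction manager, `transaction.get()` is understood to return the calling thread's own current transaction, so a commit in the ProcessResults thread does not end the transaction a Downloader thread is reading in. The toy model below illustrates only that per-thread behaviour using `threading.local`; `ToyThreadTransactionManager` is a hypothetical class and is not ZODB's implementation.

```python
import threading


class ToyThreadTransactionManager(object):
    """Toy model of a per-thread transaction manager (NOT ZODB's code)."""

    def __init__(self):
        # Each thread sees its own attributes on a threading.local object.
        self._local = threading.local()

    def get(self):
        """Return the calling thread's transaction, creating one if needed."""
        txn = getattr(self._local, "txn", None)
        if txn is None:
            txn = {"thread": threading.current_thread().name, "committed": False}
            self._local.txn = txn
        return txn

    def commit(self):
        """Mark the calling thread's transaction committed and forget it."""
        txn = self.get()
        txn["committed"] = True
        # The next get() in this thread starts a new transaction.
        self._local.txn = None
        return txn


# Usage: a second thread gets a different transaction from the same manager.
manager = ToyThreadTransactionManager()
seen = {}


def worker():
    seen["worker_txn"] = manager.get()


t = threading.Thread(target=worker)
t.start()
t.join()
main_txn = manager.get()
```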

Any advice gratefully received.

I have attached a skeleton of the code below, if that helps.

- Andrew McLean

# Standard-library modules used by the threads
import threading
import time

# Import various modules to support the ZODB
from ZODB import FileStorage, DB
from ZODB.POSException import ReadConflictError
from BTrees.OOBTree import OOBTree
import transaction
import BTrees.check

class Packer(threading.Thread):
    """Packer class to pack the ZODB at periodic intervals"""
    def __init__(self, db, interval):
        threading.Thread.__init__(self)
        self.__db = db
        self.__interval = interval
    def run(self):
        while 1:
            print "Packing database..."
            self.__db.pack()
            # Wait `interval` seconds before packing again
            time.sleep(self.__interval)

class Downloader(threading.Thread):
    """Downloader class to download URLs"""
    def __init__(self, id, taskQueue, resultQueue, db):
        threading.Thread.__init__(self)
        self.id = id
        self.__taskQueue = taskQueue
        self.__resultQueue = resultQueue
        self.__results = db.open().root()['done']
    def run(self):
        while 1:
            # Get task from the queue
            task = self.__taskQueue.get()
            if (task[1] == 1) or (not self.done(task)):
                # ....
                # Add to results queue
                self.__resultQueue.put((task, result))
    def done(self, task):
        ## backoffDelay = 1
        while 1:
            try:
                return self.__results.has_key(task)
            except ReadConflictError:
                ## print "*** ReadConflictError raised: retrying in %d seconds ***" % backoffDelay
                ## time.sleep(backoffDelay)
                ## backoffDelay = backoffDelay * 2
                print "*** ReadConflictError raised: Assume task not completed for safety ***"
                return False

class ProcessResults(threading.Thread):
    """ProcessResults class to process downloaded URLs"""
    def __init__(self, taskQueue, resultQueue, resultWriter, db, problems):
        threading.Thread.__init__(self)
        self.__taskQueue = taskQueue
        self.__resultQueue = resultQueue
        self.__writer = resultWriter
        self.__results = db.open().root()['done']
        self.__problems = problems
    def run(self):
        while 1:
            fullResult = self.__resultQueue.get()

            # ...

            # Write results to database and commit after each write
            self.__results[task] = True
            transaction.get().commit()

def main():

    # Setting up the ZODB database
    # ...
    storage = FileStorage.FileStorage(databasePath)
    db = DB(storage)
    # Create the "done" object (if required) and commit it so that
    # the other connections can see it
    root = db.open().root()
    if not root.has_key('done'):
        root['done'] = OOBTree()
        transaction.get().commit()

    # ...

    # Set up the workers to process the task queue
    print "Starting threads..."
    ProcessResults(taskQueue, resultQueue, resultWriter, db, problems).start()
    Packer(db, 15*60).start()
    for id in range(nDownloaders):
        Downloader(id, taskQueue, resultQueue, db).start()
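
One detail in the skeleton that may be relevant: each worker calls `db.open()` in its `__init__`, and `__init__` executes in the thread that constructs the object (the main thread here), while only `run()` executes in the new thread. The stdlib-only sketch below demonstrates just that; `Worker` is a hypothetical class. Since ZODB's default transaction manager keeps a separate transaction per thread, a plausible guess is that each connection should be opened inside `run()`, but that is an assumption to verify against the ZODB documentation.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical thread that records where __init__ and run() execute."""

    def __init__(self):
        threading.Thread.__init__(self)
        # __init__ runs in whichever thread constructs the object.
        self.init_thread = threading.current_thread().name
        self.run_thread = None

    def run(self):
        # run() runs in the new thread, after start() is called.
        self.run_thread = threading.current_thread().name


w = Worker()
w.start()
w.join()
print("__init__ ran in: " + w.init_thread)
print("run() ran in:    " + w.run_thread)
```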


ZODB-Dev mailing list  -  ZODB-Dev@zope.org