Note that this version (experimentally) uses a dbm database for storing serials. If you don't have (or don't want to use) gdbm, replace all occurrences of gdbm with dbhash or dumbdbm. I didn't use anydbm because it has some strange problems with my Python installation :) (but it will be fixed soon).

Changelog:

0.03:
- don't die if posix.fsync is not available
- base class renamed to DirectoryStorage
- added directory (re)creation and locking
- added a 'magic' file
- fixed the last oid file open/close bug
- moved serials to a gdbm database

0.02:
- converted to using os.path.join

0.01:
- initial release

-Petru
# Copyright (c) 2000 Petru Paler ([EMAIL PROTECTED]) All rights reserved.

# To-do list:
# Critical:
# - Most I/O operations are not atomic. This means that I/O errors (and, for
#   that matter, any exceptions occurred in _finish) leave the storage in an
#   undefined state
#
# Not critical:
# - Performance tuning -- add caches where appropriate
# - make sure locking is ok
# - provide directory splitting to enable use on non-reiser filesystems
#
# ... and probably many others

"""Directory storage"""

__version__ = "0.03"

from BaseStorage import BaseStorage
from lock_file import lock_file
import POSException
import os, struct, stat, operator, gdbm

try:
    from posix import fsync
except ImportError:
    # Not every platform provides posix.fsync; commits then skip the sync.
    fsync = None


def oid2str(oid):
    "Return the 16-character uppercase hex name for an 8-byte oid"
    return "%02X"*8 % struct.unpack("!8B", oid)


def rmrf(dir):
    """Recursively delete dir

    Regular files and symbolic links are unlinked, sub-directories are
    removed recursively.  Anything else (device, fifo, ...) aborts the
    removal with an OSError.
    """
    for f in os.listdir(dir):
        pathname = os.path.join(dir, f)
        # lstat, not stat: a symlink pointing at a directory must be removed
        # as a link, never followed (that would delete files outside dir).
        mode = os.lstat(pathname)[stat.ST_MODE]
        if stat.S_ISDIR(mode):
            rmrf(pathname)
        elif stat.S_ISREG(mode) or stat.S_ISLNK(mode):
            os.unlink(pathname)
        else:
            # This used to be a string exception, which newer interpreters
            # reject with a TypeError before the message is ever seen.
            raise OSError('remove error: strange file in dir storage: %s'
                          % pathname)
    os.rmdir(dir)


def _open_lock_file(dirname):
    """Open and lock dirname/lock, record our pid in it, return the file

    The file is opened 'r+' when it already exists and 'w+' otherwise.
    lock_file() errors propagate; recording the pid is best-effort only.
    """
    path = os.path.join(dirname, 'lock')
    try:
        f = open(path, 'r+')
    except IOError:
        f = open(path, 'w+')
    lock_file(f)
    try:
        # Drop a previous owner's pid first so that a shorter pid does not
        # leave stale trailing digits behind.
        f.truncate(0)
        f.write(str(os.getpid()))
        f.flush()
    except (IOError, OSError):
        pass
    return f


def _flush_to_disk(f):
    "Flush f's buffer and, when fsync is available, force it to disk"
    # fsync only sees data that already left the file object's own buffer,
    # so the flush has to come first.
    f.flush()
    if fsync:
        fsync(f.fileno())


MAGIC = 'Directory Storage Format v1.0'


class DirectoryStorage(BaseStorage):
    """Storage keeping one file per object inside a directory

    Layout of the storage directory:
      oid/      -- one file per object, named oid2str(oid), holding its data
      serial    -- gdbm database mapping oid -> serial
      last_oid  -- the value of self._oid written at the last commit
      lock      -- lock file holding the pid of the owning process
      magic     -- format marker (MAGIC); written on creation only, it is
                   not verified when an existing storage is opened
    """

    def __init__(self, dirname, create=0):
        """Open the storage in dirname

        A missing directory is always created.  If create is true and the
        directory already exists, its whole content is deleted first.
        """
        existed = os.path.exists(dirname)
        if not existed:
            create = 1
        if existed:
            self._lock_file = _open_lock_file(dirname)

        BaseStorage.__init__(self, dirname)

        self._oid_dir = os.path.join(dirname, 'oid')
        self._tindex = []
        self._oid = '\0\0\0\0\0\0\0\0'

        if create:
            if os.path.exists(dirname):
                # prune the old dir.  The old lock handle is closed first:
                # its file is about to be deleted (which fails on some
                # platforms while it is open) and a new lock is taken below.
                if existed:
                    self._lock_file.close()
                rmrf(dirname)
            os.mkdir(dirname)
            os.mkdir(self._oid_dir)
            self._lock_file = _open_lock_file(dirname)
            f = open(os.path.join(dirname, 'magic'), 'w+')
            try:
                f.write(MAGIC)
            finally:
                f.close()
            self._sindex = gdbm.open(os.path.join(dirname, 'serial'), 'n')
            # Binary mode: oids are raw bytes and must not go through
            # newline translation.
            self.lof = open(os.path.join(dirname, 'last_oid'), 'w+b')
        else:
            self._sindex = gdbm.open(os.path.join(dirname, 'serial'), 'w')
            f = open(os.path.join(dirname, 'last_oid'), 'r+b')
            last = f.read()
            # The file is still empty if nothing was ever committed; keep
            # the all-zero oid then.  NOTE(review): assumes BaseStorage
            # needs an 8-byte _oid to hand out new oids -- confirm.
            if last:
                self._oid = last
            self.lof = f

    def __len__(self):
        "Return the number of object files in the storage"
        return len(os.listdir(self._oid_dir))

    def getSize(self):
        "Return the total size in bytes of all object files (0 if none)"
        total = 0
        for name in os.listdir(self._oid_dir):
            path = os.path.join(self._oid_dir, name)
            total = total + os.stat(path)[stat.ST_SIZE]
        return total

    def load(self, oid, version):
        """Return (data, serial) for oid; version is ignored

        Raises IOError if the object file cannot be read and KeyError if
        no serial is recorded for oid.
        """
        self._lock_acquire()
        try:
            f = open(os.path.join(self._oid_dir, oid2str(oid)), 'rb')
            try:
                p = f.read()
            finally:
                f.close()
            return p, self._sindex[oid]
        finally:
            self._lock_release()

    def store(self, oid, serial, data, version, transaction):
        """Queue data for oid in the current transaction

        Nothing is written until _finish runs.  Returns the serial the
        object will get (self._serial).

        Raises StorageTransactionError if transaction is not the current
        one, Unsupported if version is non-empty, and ConflictError if the
        recorded serial for oid differs from the serial passed in.
        """
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        if version:
            raise POSException.Unsupported("Versions aren't supported")
        self._lock_acquire()
        try:
            if self._sindex.has_key(oid):
                if serial != self._sindex[oid]:
                    raise POSException.ConflictError
            serial = self._serial
            self._tindex.append((oid, serial, data))
        finally:
            self._lock_release()
        return serial

    def _begin(self, tid, u, d, e):
        "Nothing to prepare when a transaction begins"
        pass

    def _clear_temp(self):
        "Forget the stores queued so far"
        self._tindex = []

    def _finish(self, tid, user, desc, ext):
        """Write every queued store and the last oid to disk

        Any failure is reported as a TransactionError; the commit is not
        atomic, so the storage may be left half written (see the to-do
        list at the top of the file).
        """
        try:
            for oid, serial, data in self._tindex:
                self._sindex[oid] = serial
                self._sindex.sync()
                f = open(os.path.join(self._oid_dir, oid2str(oid)), 'wb')
                try:
                    f.write(data)
                    _flush_to_disk(f)
                finally:
                    f.close()
            self.lof.seek(0)
            self.lof.write(self._oid)
            # lof stays open for the storage's lifetime, so without this
            # flush the oid could sit in the buffer indefinitely.
            _flush_to_disk(self.lof)
        except:
            # Deliberately catches everything: whatever went wrong, the
            # caller has to be told the commit did not complete.
            raise POSException.TransactionError('I/O error commiting transaction -- your storage is likely corrupt')

    def pack(self, t, referencesf):
        """Delete every object that is not reachable from the root object

        t is ignored.  referencesf(data, oids) is called with each
        reachable object's data and must append the oids it references
        to the list oids.

        Object files named above oid2str(self._oid) are never deleted.
        NOTE(review): the BaseStorage lock is not taken here -- confirm
        that callers serialise pack against commits.
        """
        maxoid = oid2str(self._oid)
        rootl = ['\0\0\0\0\0\0\0\0']
        pop = rootl.pop
        pindex = {}
        referenced = pindex.has_key
        while rootl:
            oid = pop()
            # The index is keyed by file name.  Looking up the raw oid
            # never matched, so shared objects were rescanned and a
            # reference cycle kept this loop running forever.
            name = oid2str(oid)
            if referenced(name):
                continue
            # scan pickle for references
            f = open(os.path.join(self._oid_dir, name), 'rb')
            try:
                p = f.read()
            finally:
                f.close()
            pindex[name] = 1
            referencesf(p, rootl)

        # now delete any unreferenced entries
        deleted = []
        for name in os.listdir(self._oid_dir):
            if not referenced(name) and (name <= maxoid):
                deleted.append(name)
        try:
            for name in deleted:
                os.unlink(os.path.join(self._oid_dir, name))
        except:
            raise POSException.TransactionError('I/O error during pack -- your storage is probably screwed up')