Changelog:
0.04:
- added missing binary flags to open() calls to make it work on NT.
- added flush() calls after file writing
- added close() method to the DirectoryStorage class
0.03:
- don't die if posix.fsync is not available
- base class renamed to DirectoryStorage
- added directory (re)creation and locking
- added a 'magic' file
- last oid file open/close bug
- moved serials to a gdbm database
0.02:
- converted to using os.path.join
0.01:
- initial release
-Petru
# Copyright (c) 2000 Petru Paler ([EMAIL PROTECTED]) All rights reserved.
# To-do list:
# Critical:
# - Most I/O operations are not atomic. This means that I/O errors (and, for
# that matter, any exceptions that occur in _finish) leave the storage in an
# undefined state.
#
# Not critical:
# - Performance tuning -- add caches where appropriate
# - make sure locking is ok
# - provide directory splitting to enable use on non-reiser filesystems
#
# ... and probably many others
"""Directory storage"""
__version__ = "0.04"
from BaseStorage import BaseStorage
from lock_file import lock_file
import POSException
import os, struct, stat, operator, gdbm
# fsync is optional: it is only used (when truthy) to force freshly written
# files to disk.  Where posix.fsync cannot be imported, fall back to None and
# let callers skip the call.
try:
    from posix import fsync
except ImportError:
    # Either there is no posix module at all or it does not provide fsync.
    # Only ImportError is expected here; anything else should propagate.
    fsync = None
def oid2str(oid):
    """Return the hexadecimal spelling of an 8-byte object id.

    Each of the eight bytes of ``oid`` is rendered as two uppercase hex
    digits, most significant byte first, giving a fixed-width 16-character
    string.  ``struct.unpack`` raises ``struct.error`` if ``oid`` is not
    exactly eight bytes long.
    """
    octets = struct.unpack("!8B", oid)
    return ("%02X" * 8) % octets
def rmrf(dir):
    """Recursively delete the directory tree rooted at ``dir``.

    Sub-directories are removed depth-first; regular files and symbolic
    links are unlinked.  Symbolic links are never followed, so a link that
    points at a directory outside the tree only loses the link itself, not
    the contents of its target.

    Raises ``OSError`` if an entry is anything else (device node, FIFO,
    socket, ...) or if any of the underlying filesystem calls fail.
    """
    for entry in os.listdir(dir):
        pathname = os.path.join(dir, entry)
        # lstat rather than stat: inspect the entry itself, not whatever a
        # symlink happens to point at.
        mode = os.lstat(pathname)[stat.ST_MODE]
        if stat.S_ISDIR(mode):
            rmrf(pathname)
        elif stat.S_ISREG(mode) or stat.S_ISLNK(mode):
            os.unlink(pathname)
        else:
            raise OSError(
                'remove error: strange file in dir storage: %s' % pathname)
    os.rmdir(dir)
# Format identifier written to the 'magic' file when a storage directory is
# created.
MAGIC = 'Directory Storage Format v1.0'
class DirectoryStorage(BaseStorage):
    """Storage that keeps each object's pickle in its own file.

    Layout of the storage directory:

      lock      -- lock file; holds the pid of the owning process
      magic     -- contains MAGIC
      serial    -- gdbm database mapping oid -> serial
      last_oid  -- the last oid handed out (8 raw bytes)
      oid/      -- one file per object, named oid2str(oid), holding the
                   object's current pickle

    Versions are not supported and only the current revision of each
    object is kept.  Transaction bookkeeping (self._transaction,
    self._serial, self._lock_acquire/_lock_release) is presumably supplied
    by BaseStorage -- it is used here but not defined in this class.
    """

    def __init__(self, dirname, create=0):
        """Open the storage in ``dirname``, creating it if necessary.

        If ``dirname`` does not exist it is always created.  If it exists
        and ``create`` is true, the existing directory is deleted and a
        fresh, empty storage is built in its place.
        """
        existed = os.path.exists(dirname)
        if existed:
            self._acquire_dir_lock(dirname)
        else:
            create = 1

        BaseStorage.__init__(self, dirname)
        self._oid_dir = os.path.join(dirname, 'oid')
        self._tindex = []
        self._oid = '\0\0\0\0\0\0\0\0'

        if create:
            if existed:
                # The lock file is about to be deleted together with the
                # rest of the directory.  Close it first so the handle is
                # not leaked and so the unlink also works on platforms
                # that refuse to remove open files.
                self._lock_file.close()
                rmrf(dirname)
            os.mkdir(dirname)
            os.mkdir(self._oid_dir)
            self._acquire_dir_lock(dirname)
            f = open(os.path.join(dirname, 'magic'), 'w+')
            try:
                f.write(MAGIC)
            finally:
                f.close()
            self._sindex = gdbm.open(os.path.join(dirname, 'serial'), 'n')
            self.lof = open(os.path.join(dirname, 'last_oid'), 'w+b')
        else:
            # NOTE(review): the magic file is written on creation but is
            # not verified here.
            self._sindex = gdbm.open(os.path.join(dirname, 'serial'), 'w')
            f = open(os.path.join(dirname, 'last_oid'), 'r+b')
            last = f.read()
            # last_oid stays empty until the first commit; keep the zero
            # oid in that case instead of replacing it with ''.
            if len(last) == 8:
                self._oid = last
            self.lof = f

    def _acquire_dir_lock(self, dirname):
        """Open and lock ``dirname``/lock and remember it in _lock_file.

        The pid of this process is recorded in the file on a best-effort
        basis; failing to write it does not prevent the storage from
        opening.
        """
        path = os.path.join(dirname, 'lock')
        try:
            f = open(path, 'r+')
        except IOError:
            # Could not open an existing lock file: create a new one.
            f = open(path, 'w+')
        try:
            lock_file(f)
        except:
            # Could not get the lock: do not leave the handle dangling.
            f.close()
            raise
        try:
            # Drop any previous owner's pid so that no stale digits
            # survive behind a shorter pid.
            f.seek(0)
            f.truncate()
            f.write(str(os.getpid()))
            f.flush()
        except (IOError, OSError):
            pass
        self._lock_file = f

    def __len__(self):
        """Return the number of object files in the storage."""
        return len(os.listdir(self._oid_dir))

    def getSize(self):
        """Return the combined size in bytes of all object files.

        Returns 0 for a storage that contains no objects.
        """
        total = 0
        for name in os.listdir(self._oid_dir):
            path = os.path.join(self._oid_dir, name)
            total = total + os.stat(path)[stat.ST_SIZE]
        return total

    def close(self):
        """Close the last-oid file, the serial database and the lock file.

        All three are closed even if closing an earlier one fails.
        """
        try:
            self.lof.close()
        finally:
            try:
                self._sindex.close()
            finally:
                self._lock_file.close()

    def load(self, oid, version):
        """Return ``(pickle, serial)`` for ``oid``.

        ``version`` is ignored.  Errors from opening the object file or
        from looking the serial up in the serial database propagate to
        the caller.
        """
        self._lock_acquire()
        try:
            f = open(os.path.join(self._oid_dir, oid2str(oid)), 'rb')
            try:
                p = f.read()
            finally:
                f.close()
            return p, self._sindex[oid]
        finally:
            self._lock_release()

    def store(self, oid, serial, data, version, transaction):
        """Queue ``data`` as the new pickle for ``oid`` in ``transaction``.

        Nothing is written to disk here; the data is kept in _tindex until
        _finish runs.  Returns the serial the object will have once the
        transaction commits.

        Raises StorageTransactionError if ``transaction`` is not the
        current one, Unsupported if ``version`` is non-empty, and
        ConflictError if the object already has a serial that differs from
        ``serial``.
        """
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        if version:
            raise POSException.Unsupported("Versions aren't supported")
        self._lock_acquire()
        try:
            try:
                committed = self._sindex[oid]
            except KeyError:
                # First time this oid is stored: nothing to conflict with.
                committed = None
            if committed is not None and serial != committed:
                raise POSException.ConflictError
            serial = self._serial
            self._tindex.append((oid, serial, data))
        finally:
            self._lock_release()
        return serial

    def _begin(self, tid, u, d, e):
        """Start-of-transaction hook; nothing to do for this storage."""
        pass

    def _clear_temp(self):
        """Forget the stores queued for the current transaction."""
        self._tindex = []

    def _finish(self, tid, user, desc, ext):
        """Write the queued stores to disk and record the last oid.

        The writes are not atomic: if anything goes wrong part-way the
        storage is left partially updated, which is reported as a
        TransactionError.
        """
        try:
            for oid, serial, data in self._tindex:
                self._sindex[oid] = serial
                self._sindex.sync()
                f = open(os.path.join(self._oid_dir, oid2str(oid)), 'wb')
                try:
                    f.write(data)
                    f.flush()
                    if fsync:
                        fsync(f.fileno())
                finally:
                    f.close()
            self.lof.seek(0)
            self.lof.write(self._oid)
            self.lof.flush()
            if fsync:
                fsync(self.lof.fileno())
        except Exception:
            raise POSException.TransactionError(
                'I/O error commiting transaction -- '
                'your storage is likely corrupt')

    def pack(self, t, referencesf):
        """Delete every object file that is not reachable from the root.

        Starting at the root object (oid zero), each reachable pickle is
        read and passed to ``referencesf(pickle, oid_list)``, which appends
        the oids it refers to.  Afterwards all files in the oid directory
        that were not reached and whose name does not sort after the last
        oid recorded in _oid are removed.  ``t`` is ignored.

        Raises TransactionError if deleting a file fails.
        """
        maxoid = oid2str(self._oid)
        pending = ['\0\0\0\0\0\0\0\0']
        # Names (as produced by oid2str) of the objects already scanned.
        # Both the visited test below and the deletion pass use these hex
        # names, so reference cycles terminate.
        reachable = {}
        while pending:
            oid = pending.pop()
            name = oid2str(oid)
            if name in reachable:
                continue
            # scan pickle for references
            f = open(os.path.join(self._oid_dir, name), 'rb')
            try:
                p = f.read()
            finally:
                f.close()
            reachable[name] = 1
            referencesf(p, pending)

        # now delete any unreferenced entries
        doomed = []
        for name in os.listdir(self._oid_dir):
            if name not in reachable and name <= maxoid:
                doomed.append(name)
        try:
            for name in doomed:
                os.unlink(os.path.join(self._oid_dir, name))
        except (IOError, OSError):
            raise POSException.TransactionError(
                'I/O error during pack -- '
                'your storage is probably screwed up')