Christian Heimes wrote at 2005-5-4 22:14 +0200:
>Dieter Maurer wrote:
>> It is possible (by means of a historical connection)
>> and I posted relevant code some time ago.
>> Search the archive for "HistoryJar".
>
>Sorry, I can't find the code. Neither "historical jar zodb" nor
>"historical jar dieter" turns up anything useful on Google.

Maybe Google does not index attachments.

Here is the code again.

-- 
Dieter

#       $Id: History.py,v 1.8 2004/01/02 16:47:58 dieter Exp $
'''History extensions.'''

from OFS.History import HystoryJar, Historian, Historical
from struct import pack, unpack
from cStringIO import StringIO
from cPickle import Unpickler
from ZODB.Connection import Connection
from ZODB.POSException import POSKeyError
from ZODB.TimeStamp import TimeStamp
from sys import maxint


# Code mostly stolen from "OFS.History"
def historicalRevision(self,serial):
  '''return the revision of *self* which was current at *serial*.

  The revision is obtained from a "HistoryJar" built around the
  connection of *self*, so that subobjects reached through the revision
  are meant to be looked up for the same *serial*.

  When *self* has an "aq_parent" and the revision provides "__of__",
  the revision is returned wrapped in the context of that parent;
  otherwise the bare revision is returned.
  '''
  historyJar= HistoryJar(self._p_jar,serial)
  revision= historyJar[self._p_oid]
  container= getattr(self,'aq_parent',None)
  wrap= getattr(revision,'__of__',None)
  # without a parent or without "__of__" there is nothing to wrap
  if container is None or wrap is None: return revision
  return wrap(container)


class HistoryJar(HystoryJar):
  '''a relaying connection which delivers objects in the state that was
  current at a given serial (the serial represents a point in time).'''

  def __init__(self,base,serial):
    '''relay to the connection *base*; select states by *serial*.'''
    HystoryJar.__init__(self,base)
    self._jar= base
    self._serial= serial
    self._cache= {}

  def __getitem__(self,oid):
    '''return a new instance for *oid* with the state current for *_serial*.'''
    # the object of the underlying connection is only consulted for its class
    live= self._jar[oid]
    rev= live.__class__.__basicnew__()
    # bind the new instance to this jar, then let "setstate" fill it
    rev._p_jar= self
    rev._p_oid= oid
    self.setstate(rev)
    return rev

  def setstate(self,obj):
    '''load into *obj* the newest state which is not newer than *_serial*.

    The matching history record is asked from the storage directly when
    it supports filtered history ("_hasSmartHistory"), otherwise a
    "HistoryFetcher" is used.
    Raises "POSKeyError" when no history record qualifies.
    On success, "_p_serial" is set to the serial of the record found and
    "_p_historical" (where the object allows it) to the time
    corresponding to *_serial*.
    '''
    # ATT: the available history records should be cached;
    #      even better, when ZODB would support loading an object valid at serial
    limit= self._serial
    base= self._jar
    oid= obj._p_oid
    storage= base._storage
    # this might need to become a picklable object, once ZEO supports 'filter'.
    notNewer= lambda t, limit= limit: t['serial'] <= limit
    if _hasSmartHistory(storage):
      records= storage.history(oid,None,1, filter= notNewer)
    else:
      records= HistoryFetcher(base,oid,None,last=1, filter=notNewer).next()
    if not records: raise POSKeyError(oid,limit)
    found= records[0]['serial']
    state= self.oldstate(obj,found)
    obj._p_serial= found
    obj.__setstate__(state)
    requested= TimeStamp(unpack('8s',limit)[0]).timeTime()
    try:
      obj._p_historical= requested
    except AttributeError:
      # this may not be possible, e.g. for __dict__ less instances (such as "BTrees")
      pass
    obj._p_changed= 0

  # take over methods from "Connection"; because we do not derive
  #   from this class, we must use "im_func".
  oldstate= Connection.oldstate.im_func
  _persistent_load= Connection._persistent_load.im_func
    


class Historian(Historian):
  '''deliver historical revisions.'''
  def __getitem__(self,key):
    '''return the revision of our "aq_parent" for the serial encoded in *key*.

    *key* is a serial in the '.' separated form understood by
    "_decodeSerial".
    '''
    owner= self.aq_parent
    # Note: we can not shortcut by returning *owner* itself when the
    #   serial equals "owner._p_serial", because we would lose
    #   history information for subobjects which may be newer.
    return historicalRevision(owner,_decodeSerial(key))

class Historical(Historical):
  '''"Historical" variant whose revisions are produced by "historicalRevision".'''
  HistoricalRevisions= Historian()

  # provide information on whether or not this is a historical version
  def isHistorical(self):
    '''false (in fact 'None'), if *self* is not historical;
    otherwise, the value of "_p_historical", i.e. the time for which
    *self* has been requested.'''
    return getattr(self,'_p_historical',None)

  def getHistoricalSubpath(self):
    '''returns an URL subpath to reference a historical version corresponding to this time.

    The subpath is '' when *self* is not historical; otherwise it is
    '/HistoricalRevisions/' followed by the encoded "_p_serial" of *self*.
    '''
    ht= self.isHistorical()
    if ht is None: return ''
    return '/HistoricalRevisions/' + _encodeSerial(self._p_serial)

  # must override to use our "historicalRevision"
  def manage_historicalComparison(self, REQUEST, keys=[]):
    '''Compare two selected revisions.

    *keys* holds one or two encoded serials. With two keys the revision
    for the last key is compared with the revision for the first one;
    with a single key it is compared with *self*.
    Raises "HistorySelectionError" when *keys* is empty or has more
    than two entries.
    '''
    # the module level import from "OFS.History" does not include the
    #   exception; import it here, otherwise the error paths below
    #   fail with a "NameError"
    from OFS.History import HistorySelectionError
    if not keys:
      raise HistorySelectionError(
        "No historical revision was selected.<p>")
    if len(keys) > 2:
      raise HistorySelectionError(
        "Only two historical revision can be compared<p>")

    rev1= historicalRevision(self, _decodeSerial(keys[-1]))

    if len(keys) == 2:
      rev2= historicalRevision(self, _decodeSerial(keys[0]))
    else:
      rev2= self

    return self.manage_historyCompare(rev1, rev2, REQUEST)
  

class HistoryFetcher:
  '''auxiliary class to incrementally fetch history.

  Records are numbered among those accepted by the filter. When *last*
  is known, the records *first* through *last* are fetched in a single
  step; otherwise they are fetched in chunks of doubling size.
  '''
  _curr= 0       # number (among the accepted records) of the first record in "_buffer"
  _inc= 1        # size of the next incremental chunk; doubled after each fetch
  _complete= 0   # true, once no further record can be delivered
  _buffer= None  # the chunk fetched most recently; 'None' before the first fetch

  def __init__(self,jar,oid,version=None,first=0,last=None,filter=None):
    '''prepare fetching historical records for *oid* in *version*.

    Tries to find records satisfying *filter*. Records *first*
    through *last* are returned.
    '''
    S= jar._storage
    history= S.history
    # a storage with filtered history replaces our emulation "_smartHistory";
    #   otherwise the emulation reads the plain history via "_getRawHistory"
    if _hasSmartHistory(S): self._smartHistory= history
    else: self._getRawHistory= history
    self._oid= oid
    self._version= version
    self._first= first
    self._last= last
    self._filter= _Filter(filter)

  def fetch(self):
    '''return the relevant history records.'''
    self.reset()
    n= self._last
    if n is None: n= maxint
    return self.next(n-self._first)

  def next(self,n=1):
    '''return up to n more relevant history records (as a list).'''
    found= []
    while n > 0:
      x= self._next()
      if x is None: break
      found.append(x); n-= 1
    return found

  def reset(self):
    '''restart delivery at *first*.

    The buffer is kept (only its index is reset) when it still starts
    at *first* or when *last* is known; otherwise it is dropped.
    '''
    self._bi= self._complete= self._filter.stopped= 0
    if self._buffer is None or self._last is not None or self._curr == self._first:
      return # just reset the buffer index
    del self._buffer # delete the buffer as well

  def _next(self):
    '''return the next record or 'None' when there is none.'''
    if self._complete: return
    buffer= self._buffer
    if buffer is None or self._bi >= len(buffer): buffer= self._prefetch()
    if not buffer: self._complete= 1; return
    x= buffer[self._bi]; self._bi+= 1
    return x

  def _prefetch(self):
    '''prefetch history information; return the new buffer (maybe empty or 'None').'''
    buffer= self._buffer
    self._bi= 0
    # if we know last, we fetch them in a single step
    last= self._last
    if last is not None:
      if buffer is not None: return # we already read everything
      self._buffer= buffer= self._fetchHistory(self._first,last)
      return buffer
    if buffer is None:
      # the first round: start at "_first" and record this position,
      #   because the following rounds advance "_curr" by the buffer
      #   length (a stale "_curr" would make them fetch wrong records)
      curr= self._curr= self._first
    else: curr= self._curr= self._curr + len(buffer)
    inc= self._inc
    buffer= self._fetchHistory(curr,curr+inc)
    self._inc<<= 1
    self._buffer= buffer
    return buffer

  def _fetchHistory(self,first,last):
    '''return the accepted records *first* up to (excluding) *last*.

    Returns 'None' when the filter is already marked as stopped;
    marks it as stopped when fewer records than requested came back.
    '''
    flt= self._filter
    if flt.stopped: return
    # let the filter skip the records before *first*
    flt.setIgnore(first)
    inc= last-first
    r= self._smartHistory(self._oid, self._version,inc,flt)
    if len(r) != inc: flt.stopped= 1
    return r

  def _smartHistory(self,oid,version,size,filter):
    '''emulate a smart (i.e. filtered) history fetching for
    storages that provide only a simple one.

    Returns up to *size* records accepted by *filter*; marks *filter*
    as stopped when the raw history is exhausted before that.
    '''
    accepted= [] # the result list
    if size <= 0: return accepted
    # ATT: this is a very crude approximation -- we may do much better when we cache a bit.
    seen= 0; want= size+filter._igncount
    history= self._getRawHistory
    while 1:
      ts= history(oid,version,want)
      if len(ts) <= seen: break # nothing new
      # only look at the records not yet presented to the filter
      for t in ts[seen:want]:
        if filter(t):
          accepted.append(t)
          if len(accepted) >= size: return accepted # ATT: we may want to cache the remaining elements and how far we had to fetch ahead
      if len(ts) < want: break # raw history exhausted
      seen= want; want<<= 1
    filter.stopped= 1
    return accepted


class _Filter:
  '''A filter ignoring an initial segment of a sequence accepted by a *baseFilter*.'''
  stopped= None
  _igncount= 0  # ignore that many events

  def __init__(self,baseFilter):
    self._base= baseFilter

  def setIgnore(self,no):
    self._igncount= no

  def __call__(self,obj):
    '''check whether *obj* should pass.'''
    base= self._base
    fd= base is None or base(obj)
    if fd:
      if self._igncount: self._igncount-= 1; return
    return fd



###########################################################################
## HistoryProducer
## an incremental producer of history records
class HistoryProducer:
  '''a history producer to be used together with an incremental merger.

  It hands out the history records of an object one by one, each
  wrapped (together with the object) in a "_HistoryRecord".
  '''
  def __init__(self,obj):
    '''produce the history of *obj*, read via its "_p_jar" and "_p_oid".'''
    self._obj= obj
    self._fetcher= HistoryFetcher(obj._p_jar,obj._p_oid)

  def pop(self):
    '''return the next history record or 'None' when there is no more.'''
    batch= self._fetcher.next()
    if batch: return _HistoryRecord(batch[0],self._obj)
    return None

class _HistoryRecord:
  def __init__(self,data,obj):
    self._data= data
    self._obj= obj

  def __cmp__(self,other):
    return cmp(other._data['time'],self._data['time'])



###########################################################################
## Auxiliaries
def _encodeSerial(serial):
  '''encode a serial into a '.' separated sequence of ints.'''
  return '.'.join(map(str, unpack(">HHHH", serial)))

def _decodeSerial(code):
  '''decode a serial encoded as a '.' separated sequence of ints.'''
  return apply(pack, ('>HHHH',)+tuple(map(int,code.split('.'))))


###########################################################################
## non standard ZODB extension handling
try: from ZODB.BaseStorage import _hasSmartHistory
except ImportError:
  def _hasSmartHistory(storage): return 0
_______________________________________________
For more information about ZODB, see the ZODB Wiki:
http://www.zope.org/Wikis/ZODB/

ZODB-Dev mailing list  -  ZODB-Dev@zope.org
http://mail.zope.org/mailman/listinfo/zodb-dev

Reply via email to