On Fri, 2006-07-14 at 08:58 +0200, Joachim Schmitz wrote:
> Roché Compaan schrieb:
> > 
> > If you have common add/edit methods in your app used by all of your
> > classes, you could do application level replication between Zopes. We
> > have written a small replication module for on of our apps that copes
> > extremely well with disconnected Zopes, and replicates a very compact
> > serialisation of commands to each other (as opposed to whole objects).
> > And they replicate both ways. We been using it for about 4 years in
> > production now.
> > 
> > I could send you an example if you want to evaluate this route.
> > 
> could you post it to the list, cause that would be very interesting for 
> us too.

No problem, but remember this is app specific and still works with the
ZODB distributed with Zope 2.7.

In summary, it works like this:

There are 3 classes: ReplicationLog, ReplicationTarget and

You add a ReplicationLog to your Zope app and add ReplicationTarget
instances inside it. You log transactions to a ReplicationLog by calling
it's "log" method e.g:

self.ReplicationLog.log(object, 'manage_edit', {'x': 1, 'y': 2})

where the first arg is the object, second arg is the method and the last
arg is mapping of values that should change.

The ReplicationLog serialises arguments in '_serialise_args' and this
will be the app specific part. You will for instance notice that we have
a special serialisation for something we call a "reference".

A cron job calls replicate on your ReplicationLog instance.

Roché Compaan
Upfront Systems                 http://www.upfrontsystems.co.za
import time
import binascii
import urlparse, urllib, urllib2
import socket
from sys import exc_info
from zlib import compress, decompress

from zLOG import LOG, INFO
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from AccessControl import ClassSecurityInfo
from DateTime import DateTime
from BTrees.OOBTree import OOBTree, OOSet
from ZODB import Persistent
from ZODB.PersistentMapping import PersistentMapping
from zLOG import INFO
from Acquisition import ImplicitAcquisitionWrapper
from Acquisition import aq_base

from bases import CremeSimpleItem, CremeContainer
from Products.Associations.IReferenceWrappers import IReferenceListWrapper
from utils import zlog

def oid2str(oid):
    return binascii.b2a_hex(oid).upper()

class ReplogTransaction(Persistent):

    meta_type = 'ReplogTransaction'

    def __init__(self, transaction_id):
        self.id = transaction_id
        self._entries = OOBTree()
        t = get_transaction()
        self.user = t.user
        self.description = t.description

    def add_entry(self, entry_dict):
        self._entries[entry_dict.get('id')] = entry_dict

    def replicate(self, target):
        d = {'id': self.id,
             'user': self.user, 
             'description': self.description,
             'entries': self._entries}
        f = StringIO()
        p = Pickler(f)

        payloadStr = f.getvalue()
        LOG('Replication', INFO, 'replicate> transaction id: %s; '
                'size (uncompressed): %s' % (
                    oid2str(self.id), len(payloadStr))) #DBG
        payloadStr = compress(payloadStr)

        handler = FixedHTTPHandler()
        opener = urllib2.build_opener(handler)

        LOG('Replication', INFO, 'replicate> transaction id: %s; size: %s' % (
            oid2str(self.id), len(payloadStr))) #DBG
        url = '%s/load' % target.url
        schema, domain, path, x1, x2, x3 = urlparse.urlparse(url)
        newurl = '%s://%s:[EMAIL PROTECTED]' % (
            schema, target.username, target.password, domain, path)
            urllib2.urlopen(newurl, urllib.urlencode({'data': payloadStr}))
        except urllib2.HTTPError, e:
            if e.code != 204: # 204 == 'No content' which is what we expect

class FixedHTTPHandler(urllib2.HTTPHandler):
    """ urllib2.HTTPHandler does not handle http://usn:[EMAIL PROTECTED]/path

    def do_open(self, http_class, req):
        host = req.get_host()
        if not host:
            raise URLError('no host given')

        user_passwd, host = urllib.splituser(host)
        host = urllib.unquote(host)
        if user_passwd:
            import base64
            auth = base64.encodestring(user_passwd).strip()
            auth = None

        h = http_class(host) # will parse host:port
        if req.has_data():
            data = req.get_data()
            h.putrequest('POST', req.get_selector())
            if not req.headers.has_key('Content-type'):
            if not req.headers.has_key('Content-length'):
                h.putheader('Content-length', '%d' % len(data))
            h.putrequest('GET', req.get_selector())

        if auth: h.putheader('Authorization', 'Basic %s' % auth)

        scheme, sel = urllib.splittype(req.get_selector())
        sel_host, sel_path = urllib.splithost(sel)
        h.putheader('Host', sel_host or host)
        for args in self.parent.addheaders:
        for k, v in req.headers.items():
            h.putheader(k, v)
        # httplib will attempt to connect() here.  be prepared
        # to convert a socket error to a URLError.
        except socket.error, err:
            raise urllib2.URLError(err)
        if req.has_data():

        code, msg, hdrs = h.getreply()
        fp = h.getfile()
        if code == 200:
            return urllib.addinfourl(fp, hdrs, req.get_full_url())
            return self.parent.error('http', req, fp, code, msg, hdrs)

class ReplicationTarget(CremeSimpleItem):

    meta_type = 'ReplicationTarget'

    _properties = (
        {'id':'url', 'type':'string', 'mode':'wd'},
        {'id':'username', 'type':'string', 'mode':'wd'},
        {'id':'password', 'type':'string', 'mode':'wd'},

    def __init__(self, **kw):
        self._transactions = OOBTree()
        CremeSimpleItem.__init__(self, **kw)

    def log(self, entry_dict):
        t_id = self._p_jar._storage._serial
        if not self._transactions.has_key(t_id):
            self._transactions[t_id] = ReplogTransaction(t_id)
        transaction = self._transactions[t_id]

    def replicate(self):
        """ replicate log to targets """
        keys = OOSet(self._transactions.keys())
        for key in keys:
            transaction = self._transactions[key]
                del self._transactions[key]
                from sys import exc_info
                import traceback
                info = exc_info()
                     'Could not replicate transaction %s to %s'%(
                        oid2str(transaction.id), self.id)) 

    def html(self, suppress_entries=0):
        """ html log for viewing transactions in the ZMI """
        out = []
        keys = OOSet(self._transactions.keys())
        for t_id in keys:
            t = self._transactions[t_id]
<h4>Transaction id: %s</h4>
<em>User:</em> %s<br/>
<em>Description:</em> %s<br/>
''' % (oid2str(t.id), t.user, t.description))
            if suppress_entries:
            for entry_id in t._entries.keys():
                entry = t._entries[entry_id]
<em>id:</em> %(id)s<br/>
<em>obj:</em> %(path)s<br/>
<em>method:</em> %(method)s<br/>
<em>args:</em> %(args)s<br/>
''' % entry)
        out = '<hr>'.join(out)
        return '<html><body>%s</body></html>' % out

class ReplicationLog(CremeContainer):
    """ Replicates log entries to replication targets """

    meta_type = 'ReplicationLog'

    security = ClassSecurityInfo()
    security.declareObjectProtected('Use ReplicationLog')

    def __init__(self, **kw):
        self.id = kw.get('id', 'ReplicationLog')
        self._last_transaction_id = 0
        self._last_entry_id = 0

    def _get_entry_id(self):
        # XXX This check can go after the method has been called once.
        if not getattr(self, '_last_entry_id'):
            self._last_entry_id = 0
        new_id = str(time.time())
        while new_id == self._last_entry_id:
            new_id = str(time.time())
        self._last_entry_id = new_id
        return new_id

    def log(self, obj, method, meth_args):
        if obj.meta_type in ['ReplicationTarget', 'ReplicationLog']:

        entry_dict = {}
        entry_dict['id'] = self._get_entry_id()
        entry_dict['path'] = self._object_path(obj)
        entry_dict['method'] = method
        entry_dict['args'] = self._serialize_args(meth_args) 
        zLOG.LOG('Replication', INFO, 'ReplicationLog.log> entry_dict: %s' % entry_dict) #DBG 

        for target in self.objectValues('ReplicationTarget'):

    def replicate(self):
        """ replicate log to targets """
        for target in self.objectValues('ReplicationTarget'):

    def load(self, data=None, REQUEST=None):
        """ load replicated data """
        if data is None:
            data = REQUEST.get('data', None)
        if data is None:
        payload = decompress(data)
        f = StringIO(payload)
        transaction = Unpickler(f).load()
        t_id = transaction['id']

        if t_id > self._last_transaction_id:
            self.REQUEST.set('replicating', '1')
            t = get_transaction()
            t.user = transaction['user'] + ' (replicated)'
            t.description = transaction['description']
            for entry_dict in transaction['entries'].values():
            zLOG.LOG('Replication', INFO, 'ReplicationLog.load> transaction id: %s' % oid2str(t_id)) #DBG 
                 'Received old transaction: %s (last transaction: %s)'%(
                    t_id, self._last_transaction_id),

    def _serialize_args(self, meth_args):

        def _serialize_value(value):
            """ Return a 'reference://' if value has a path.
                Otherwise, pass through.
            if getattr(aq_base(value), 'getPhysicalPath', None):
                return 'reference://%s' % self._object_path(value)
            return value

        d = {}
        for k,v in meth_args.items():
            if not v:
            elif type(v) == type([]) or \
                d[k] = [_serialize_value(item) for item in v]
            elif isinstance(v, DateTime):
                d[k] = v.ISO()
                d[k] = _serialize_value(v)
        return d

    def _deserialize_args(self, meth_args):
        d = {}
        for k,v in meth_args.items():
            if type(v) == type([]):
                reference_list = []
                for item in v:
                    if hasattr(item, 'find') and \
                        path = self._siteRoot() + item[12:]
                d[k] = reference_list
            elif hasattr(v, 'find') and v.startswith('reference://'):
                path = self._siteRoot() + v[12:]
                d[k] = self.unrestrictedTraverse(path)
            elif type(v) == type(u''):
                d[k] = v.encode('ISO-8859-1') 
            elif type(v) == type(''):
                d[k] = unicode(v, 'ISO-8859-1').encode('ISO-8859-1')
                d[k] = v
        return d

    def realize_entry(self, entry_dict):
        # XXX: Temporary Hack to prevent 'realizing' of Reminders and
        # SystemEvents
        for mt in ('Reminder', 'SystemEvent'):
            if entry_dict['path'].split('/')[-1].find(mt) != -1:
        path = self._siteRoot() + entry_dict['path']
        obj = self.unrestrictedTraverse(path)
        method = getattr(obj, entry_dict['method'])
        meth_args = self._deserialize_args(entry_dict['args'])

    def _object_path(self, obj):
        root = self.getCreme().getPhysicalPath()
        path = obj.getPhysicalPath()[len(root):]
        return '/'.join(path)

    def _siteRoot(self):
        return '/'.join(self.getCreme().getPhysicalPath(
For more information about ZODB, see the ZODB Wiki:

ZODB-Dev mailing list  -  ZODB-Dev@zope.org

Reply via email to