Um...

I guess I must have asked too many questions at once :) I've implemented
a first pass of a container for external data and got it working on some
of my collections.

I've included the (first pass of the) base DBContainer and DBContained
objects for criticism, and would be very grateful if anyone would like
to take a crack at it.

Keep in mind, this is my first time working with the system, so I'm sure
I'm going about some things the wrong way. In particular, I already
think:

1) Instead of have the contained object update the container, I should
rely on the event notification system.

2) Right now, the system works by translating objects "at the border"
(in IExternalContainer). Some translation is necessary, for instance, to
move from mx.DateTime to datetime.datetime, but still I think I should
somehow be making use of the "adaptor" interface. 

3) Along these same lines, IDBContainer._containedType should really be
an interface (Object( schema = IDBContained ))

Note: is there a tutorial on writing containers anywhere I should have
read? I mainly figured this out by banging on it and fishing around in
the code. I'd love to figure out, for instance, what is really happening
with the traversals (with some interaction diagrams). I do think it was
harder than it should have been. (But, then again, I think that about
most things...:))

BTW in my humble opinion, ILocation.__name__ is not well named. When I
first got an error referring to __name__ I thought it was expecting a
class object. And what happens when, for some strange reason, someone
wants to put a class in a container, and doesn't like its default name?

Thanks,
- Shaun
------------------------------------------------------------------------
-------------

class IExternalContainer( Interface ):
    """an external container interface."""

    def add( obj ):
        """add 'obj'"""

    def update( obj ):
        """update obj"""

    def delete( obj ):
        """delete obj"""

    def connect( obj ):
        """establish a connection to external database"""

    def __iter__( ):
        """iterates through external objects"""

class IDBContainer( IContainer, ILocation ):
    """container that lives in zope, but contains objects
    made persistent by an external container."""

    _database = Object( schema = IExternalContainer )        
    _containedType = Attribute( "type of object contained" )

    def __setitem__( key, obj ): 
        """Add a IDBContained object.""" 

    def _updateItem( key, newKey = None ):
        """notify that object has been modified.
        
        If the key has been changed, 'key' should be the old
        key of the object, and 'newKey' should be set to the
        new key."""

    def __delitem__( key ):
        """delete item, removing it both from this view
        and from external storage."""

    def getAttributeNames():
        """get names of attributes of contained object"""

class IDBContained( IContained, ILocation ):
    """Contained in IDBContainer. Must notify its parent
    if it is changed. Because of this, it must also know how
    to get its key.
    """
    __parent__ = Field(
        constraint = ContainerTypesConstraint( IDBContainer ) )

    def getZopeKey():
        """return ascii str or unicode key."""

IDBContainer[ '__setitem__' ].precondition = ItemTypePrecondition(
IDBContained )
class DBContainer( IterableUserDict, Contained ):
    """
    Implements L{IDBContainer}

    >>> from zope.interface.verify import verifyClass
    >>> verifyClass( IDBContainer, DBContainer )
    True

    """
    implements( IDBContainer )
    
    # prevent browser from choosing name
    nameAllowed = False
    
    # defaults for IDBContainer
    _database = None
    _containedType = None
    
    # defaults for ILocation (base of IDBContainer)
    __parent__ = None
    __name__ = None
    
    def __init__( self  ):
        super( DBContainer, self ).__init__( )
        self.__setstate__()

    def _filterKey( self, newKey ):
        """check that key is ascii string or unicode, and convert
        to unicode."""
        
        if newKey is None:
            TypeError( "key can't be None" )
        elif isinstance( newKey, str ):
            try:
                newKey = unicode( newKey )
            except UnicodeError:
                raise TypeError("name not unicode or ascii string")

        elif not isinstance( newKey, unicode ):
            raise TypeError( "key '%s' must be ascii or unicode string"
% repr( newKey ) )

        return newKey

    def __setitem__( self, key, obj ):
        "Implements
L{cranedata.web.interfaces.IFundContainer.__setitem__}."
        key = self._filterKey( key )
        if not isinstance( obj, self._containedType ):
            raise TypeError( "object '%s' must be a %s" % ( repr( obj ),
self._containedType.__name__ ) )

        self._database.add( obj )
        setitem( self, self.data.__setitem__, key, obj )

    def __getitem__( self, key ):
        return self.data[ self._filterKey( key ) ]

    def __delitem__( self, key ):
        obj = self.data.pop( self._filterKey( key ) )
        self._database.delete( obj )
        uncontained( obj, self, obj.__name__ )

    def _updateItem( self, oldKey, newKey = None ):
        """implements L{IFundContainer.__setitem__}
        be specified"""
        oldKey = self._filterKey( oldKey )
        if newKey is not None:
            obj = self.data.pop( oldKey )
        else:
            obj = self.data[ oldKey ]
            
        self._database.update( obj )

        if newKey is not None:
            self.data[ self._filterKey( newKey ) ] = obj


    # need __getstate__, __setstate__ for ZODB persistence
    
    def __getstate__( self ):
        # we have to remember our tree-location
        return {
            '__name__' : self.__name__,
            '__parent__':self.__parent__,
            #'__annotations__': self.__annotations__
            }

    def __setstate__( self, dct = None ):
        if dct:
            self.__parent__ = dct[ '__parent__' ]
            self.__name__ = dct[ '__name__' ]
            self.data = {}
            
        self._database.connect()
        self._load()
        
    def _load( self ):
        for row in self._database:
            self.data[ row.getZopeKey() ] = row
            row.__parent__ = self

    def getAttributeNames( self ):
        return self._containedType.getAttributeNames()


class DBContained( Location ):
    """Object meant to be contained by DBContained.

    It knows own key, and notifies parent on update to any attribute but
    a 'magic' one (start and end w/ `__`). 

    @cvar __keys__: attributes in keys. If not set, key is all
attributes
        except magic ones
    @cvar __rowInterface__: an Interface that defines the attributes of
the object.
        If this is defined, it is used to validate data, and reject
attributes
        not in list.

    @warning: no action is taken on __del__

    >>> r = DBContained( a = 1, b = 2, c = 3 )
    >>> l = r.__dict__.items()
    >>> l.sort()
    >>> print l
    [('a', 1), ('b', 2), ('c', 3)]
    >>> r.getZopeKey()
    u'1_2_3'
    >>> s = DBContained( a = 1, b = 'spring', c = 'fall', __keys__ =
['c', 'a'] )
    >>> s.getZopeKey()
    u'fall_1'
    >>> class Parent:
    ...     def _updateItem( self, oldKey, newKey ):
    ...         print repr( oldKey )
    ...         print repr( newKey )
    ...
    >>> r.__parent__ = Parent()
    >>> r.a = 4
    u'1_2_3'
    u'4_2_3'
    >>> r.bb = 'winter'
    u'4_2_3'
    u'4_2_winter_3'
    >>> r.__x__ = 1

    >>> from zope.interface import Interface
    >>> from zope.schema import Int
    >>> class IR( Interface ):
    ...     a = Int( )
    ...     b = Int( )
    >>> class RR( DBContained ):
    ...     __rowInterface__ = IR
    >>> r = RR( a = 1, c = 2 )
    Traceback (most recent call last):
        ...
    TypeError: attr c not in row interface <InterfaceClass
cranedata.web.container.IR>
    >>> r = RR( a = 1, b = 2 )
    >>> RR.getAttributeNames()
    ('a', 'b')
    >>> r = RR( a = 'foo', b = 2 )
    Traceback (most recent call last):
    ...
    WrongType: ('foo', (<type 'int'>, <type 'long'>))

    We convert strings to unicode:
    
    >>> class IS( Interface ):
    ...     s = Text( )
    >>> class SS( DBContained ):
    ...     __rowInterface__ = IS
    >>> r = SS( s = 'hello' )
    >>> r.s
    u'hello'
    
    """

    implements( IDBContained )

    def __init__( self, **kw ):
        self.__dict__.update( kw )
        try:
            iface = self.__rowInterface__
        except AttributeError:
            pass
        else:
            fields = dict( iface.namesAndDescriptions( all = True ) )
            for attr, val in self.__dict__.iteritems():
                if self.isMagicAttribute( attr ):
                    continue
                field = fields.pop( attr )
                if field is None:
                    raise TypeError( "attr %s not in row interface %s" %
\
                                     ( attr, str( iface ) ) )
                
                # convert strings to unicode if required
                if isinstance( field, Text ) and isinstance( val, str ):
                    setattr( self, attr, unicode( val ) )
                # truncate datetime to date if required
                if isinstance( field, date ) and isinstance( val,
datetime ):
                    setattr(
                        self, attr, date( date.fromordinal(
val.toordinal() ) ) )
                               
                bound = field.bind( self )
                bound.validate( bound.get( self ) )

            # set defaults, or raise error when required and no default
            for fname, field in fields.iteritems():
                if not IMethod.providedBy( field ):
                    if IField.providedBy( field ):
                        if field.required:
                            if field.default:
                                setattr( self, fname, field.default )
                            else:
                                raise RequiredMissing( "missing: %s" %
fname )

                        else:
                            if field.missing_value:
                                setattr( self, fname,
field.missing_value )
                            else:
                                setattr( self, fname, None )
                    else:
                        setattr( self, fname, None )
                    
                
        super( DBContained, self ).__init__( ) 


    @staticmethod
    def isMagicAttribute( attr ):
        return attr.startswith( '__' ) and attr.endswith( '__' )

    def getZopeKeyAttributes( self ):
        try:
            keys = self.__keys__
        except AttributeError:
            keys = filter(
                lambda k: not self.isMagicAttribute( k ),
self.__dict__.keys() )
            keys.sort()
    
            
        return keys

    @classmethod
    def getAttributeNames( self ):
        return tuple( self.__rowInterface__.names( all = True ) )
        

    def getAttributeDictionary( self ):
        """return attr:val dictionary for 'non-magic' attributes.

        >>> r = DBContained( a = 1, b = 2, __keys__ = ( 'a', ), __foo__
= 23 )
        >>> sorted( r.getAttributeDictionary().items() )
        [('a', 1), ('b', 2)]

        """
        return dict( ifilter(
            lambda item: not self.isMagicAttribute( item[ 0 ] ),
            self.__dict__.iteritems() ) )

    def getZopeKey( self ):
        keys = self.getZopeKeyAttributes()
        key = [ str( getattr( self, col, u'None' ) ) for col in
self.getZopeKeyAttributes() ]
        if len( key ) > 0:
            return u'_'.join( key )
        else:
            return u'*'

    def __setattr__( self, attr, val ):
        oldKey = self.getZopeKey()
        self.__dict__[ attr ] = val
        # notify parent of change, but not for 'magic names':
        if ( not attr.startswith( '__' ) or not attr.endswith( '__' ) )
and \
               self.__parent__ != None:
            self.__parent__._updateItem( oldKey, self.getZopeKey() )


    def checkZopeKey( self, key ):
        assert key == self.getZopeKey()
        
    __name__ = property( getZopeKey, checkZopeKey )
            


_______________________________________________
Zope3-users mailing list
Zope3-users@zope.org
http://mail.zope.org/mailman/listinfo/zope3-users

Reply via email to