On Sep 3, 2013, at 8:47 AM, Paul Balomiri <[email protected]> wrote:
> I would like to install
>
> event.listen(list, 'append', append_listener)
> event.listen(list, 'remove', rm_listener)
>
> on those lists, such that the GroupByKeyCollection can modify added objects
> according to the relationship it implements:
> * set the appropiate foreign key constraints
> * insert a removed object with it's new value for the key attribute after a
> change (announced by append_listener)
> * reset the fks upon item removal.
using event.listen with GBK doesn't make sense. events can only be used with
specific target types, the "remove" "append" events only apply to an
ORM-produced InstrumentedAttribute, such as Person._addresses_by_role here
(note, we mean the class-bound attribute, not the collection on an instance).
There is no need to use event.listen with the collection itself, as
remove/append are produced originally by the add()/remove() methods on GBK
itself; any extra logic which should take place would be invoked directly from
there (and in fact my original example fails to fire off the event with
remove()).
Additionally, all the usage of prepare_instrumentation() etc. should not be
necessary, that's all internal stuff which is called automatically.
As mentioned before, the behavior of this collection is completely outside the
realm of a "normal" collection so it needs to implement the append/remove
events directly, which isn't something a new user to SQLAlchemy would typically
be able to handle without a much deeper understanding of how the attribute
system works.
I've implemented your test case as below as well as some other variants in
association with the original code I gave you - for the "remove" case I've
added the necessary code to the custom collection. All foreign key constraints
are set correctly as a function of the ORM's normal operation, and as far as
"reset", when an association between Person and Address is removed, we want to
just delete the association so cascade is used for that. I'm not sure what
"insert a removed object with it's new value for the key attribute after a
change" means; add a test to the TestPA class illustrating the behavior you
want and I'll add it.
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import collections
from sqlalchemy.orm.collections import collection, collection_adapter
from sqlalchemy.ext.associationproxy import association_proxy,
_AssociationCollection
Base = declarative_base()
class GroupByKeyCollection(collections.defaultdict):
def __init__(self, keyfunc):
super(GroupByKeyCollection, self).__init__(list)
self.keyfunc = keyfunc
@collection.appender
def add(self, value, _sa_initiator=None):
key = self.keyfunc(value)
self[key].append(value)
@collection.remover
def remove(self, value, _sa_initiator=None):
key = self.keyfunc(value)
adapter = collection_adapter(self)
adapter.fire_remove_event(value, None)
self[key].remove(value)
@collection.internally_instrumented
def __setitem__(self, key, value):
adapter = collection_adapter(self)
# the collection API usually provides these events transparently, but
due to
# the unusual structure, we pretty much have to fire them ourselves
# for each item.
for item in value:
item = adapter.fire_append_event(item, None)
collections.defaultdict.__setitem__(self, key, value)
@collection.internally_instrumented
def __delitem__(self, key):
adapter = collection_adapter(self)
for item in value:
adapter.fire_remove_event(item, None)
collections.defaultdict.__delitem__(self, key, value)
@collection.iterator
def iterate(self):
for collection in self.values():
for item in collection:
yield item
@collection.converter
def _convert(self, target):
for collection in target.values():
for item in collection:
yield item
def update(self, k):
raise NotImplementedError()
class AssociationGBK(_AssociationCollection):
def __init__(self, lazy_collection, creator, value_attr, parent):
getter, setter = parent._default_getset(parent.collection_class)
super(AssociationGBK, self).__init__(
lazy_collection, creator, getter, setter, parent)
def _create(self, key, value):
return self.creator(key, value)
def _get(self, object):
return self.getter(object)
def _set(self, object, key, value):
return self.setter(object, key, value)
def __getitem__(self, key):
return [self._get(item) for item in self.col[key]]
def __setitem__(self, key, value):
self.col[key] = [self._create(key, item) for item in value]
def add(self, key, item):
self.col.add(self._create(key, item))
def remove(self, key, item):
for i, val in enumerate(self[key]):
if val == item:
self.col.remove(self.col[key][i])
return
raise ValueError("value not in list")
def items(self):
return ((key, [self._get(item) for item in self.col[key]])
for key in self.col)
def update(self, kw):
for key, value in kw.items():
self[key] = value
def clear(self):
self.col.clear()
def copy(self):
return dict(self.items())
def __repr__(self):
return repr(dict(self.items()))
# ------------------------------------------------------------------------------
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
_addresses_by_role = relationship("PersonToAddress",
cascade="all, delete-orphan",
collection_class=
lambda: GroupByKeyCollection(
keyfunc=lambda item: item.role
)
)
addresses_by_role = association_proxy(
"_addresses_by_role",
"address",
proxy_factory=AssociationGBK,
creator=lambda k, v: PersonToAddress(role=k,
address=v))
class PersonToAddress(Base):
__tablename__ = 'person_to_address'
id = Column(Integer, primary_key=True)
person_id = Column(Integer, ForeignKey('person.id'))
address_id = Column(Integer, ForeignKey('address.id'))
role = Column(String)
person = relationship("Person", backref="p2as")
address = relationship("Address")
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
name = Column(String)
import unittest
class TestPA(unittest.TestCase):
def setUp(self):
self.e = create_engine("sqlite://", echo=True)
Base.metadata.create_all(self.e)
self.sess = Session(self.e)
def tearDown(self):
self.sess.close()
self.e.dispose()
def _assertAssociated(self, p1, a1, p2a):
# custom collection
self.assertEquals(p1._addresses_by_role["home"], [p2a])
# association proxy
self.assertEquals(p1.addresses_by_role["home"], [a1])
# direct collection
self.assertEquals(p1.p2as, [p2a])
# foreign keys set correctly
self.assertEquals(p2a.person_id, p1.id)
self.assertEquals(p2a.address_id, a1.id)
def _assertDeassociated(self, p1, a1, p2a):
# when deassociating p1 from a1, there's no more association.
# we want p2a to be deleted:
self.assertEquals(self.sess.query(PersonToAddress).count(), 0)
# list is empty
self.assertEquals(p1.addresses_by_role["home"], [])
# direct collection empty
self.assertEquals(p1.p2as, [])
def test_associate_via_collection(self):
sess = self.sess
p1 = Person()
a1 = Address(name="Bucharest")
p2a = PersonToAddress(address=a1, role="home")
sess.add(p1)
# associate using the collection
p1._addresses_by_role.add(p2a)
sess.commit()
self._assertAssociated(p1, a1, p2a)
def test_associate_via_scalar(self):
sess = self.sess
p1 = Person()
a1 = Address(name="Bucharest")
# here, p2a already refers to a1/p1, the "_addresses_by_role"
# will be set up when it loads after a commit
p2a = PersonToAddress(address=a1, person=p1, role="home")
sess.add(p1)
sess.commit()
self._assertAssociated(p1, a1, p2a)
def test_associate_via_association_proxy(self):
sess = self.sess
p1 = Person()
a1 = Address(name="Bucharest")
sess.add(p1)
# associate using the association proxy
p1.addresses_by_role.add("home", a1)
sess.commit()
p2a = p1._addresses_by_role["home"][0]
self._assertAssociated(p1, a1, p2a)
def test_deassociate(self):
sess = self.sess
p1 = Person()
a1 = Address(name="Bucharest")
sess.add(p1)
p1.addresses_by_role.add("home", a1)
sess.commit()
p2a = p1._addresses_by_role["home"][0]
self._assertAssociated(p1, a1, p2a)
# now deassociate
p1.addresses_by_role.remove("home", a1)
sess.commit()
self._assertDeassociated(p1, a1, p2a)
if __name__ == '__main__':
unittest.main()
signature.asc
Description: Message signed with OpenPGP using GPGMail
