I want to achieve something like this:
Base = declarative_base()


# The abstract base: no table is generated for it.
class TreeNode(Base):
    # Sketch of the desired API; the method bodies below are placeholders.
    __abstract__ = True
    # 'child_class_table' is presumably a placeholder for the table name of
    # whichever concrete subclass is being mapped -- TODO confirm.
    # NOTE(review): nullable=False would also apply to a root node, which
    # has no parent -- confirm whether that is intended.
    parent = Column(ForeignKey('child_class_table.id'), nullable=False)

    def is_root(self):
        pass

    def get_parent(self):
        pass
    # more methods
# The child, mapped to a table (in this case named 'discipline').
class Discipline(TreeNode):
    # Concrete tree node: supplies the table name and its own columns on
    # top of what the abstract TreeNode base declares.
    __tablename__ = 'discipline'
    id = Column(Integer, primary_key=True)
    name = Column(Unicode(500), nullable=False)
# A generated auxiliary table for the child (in this case named
# 'discipline_closure').
class DisciplineClosure(Base):
    # Closure table for 'discipline': each row pairs an ancestor with a
    # descendant, both referencing discipline.id, and records a depth.
    # NOTE(review): no primary-key column is declared in this sketch;
    # presumably (ancestor_id, descendant_id) is meant to be the key --
    # TODO confirm.
    __tablename__ = 'discipline_closure'
    ancestor_id = Column(ForeignKey('discipline.id'), nullable=False)
    descendant_id = Column(ForeignKey('discipline.id'), nullable=False)
    depth = Column(Integer, nullable=False)
Is this possible in SQLAlchemy? How can I implement it?
Or is there another way to implement my closure table?
My Django implementation is attached, but I want to do it using SQLAlchemy.
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
# from __future__ import unicode_literals
import re

from django.db import models
from django.db import transaction
from django.db.models import Q, Max
from django.db.models.base import ModelBase
from django.utils.six import with_metaclass
def _closure_unicode(self):
    """Render a closure row as ``'ancestor' -> 'descendant', depth: N``.

    Installed as ``__unicode__`` on the generated closure models, so
    ``self`` is a closure row with ``ancestor``, ``descendant`` and
    ``depth`` attributes.
    """
    template = "\'%s\' -> \'%s\', depth: %d"
    values = (self.ancestor, self.descendant, self.depth)
    return template % values
# Adapted from a well-known Stack Overflow answer.
def camel_to_snake_case(name):
    """Convert a CamelCase identifier to snake_case.

    Runs of capitals are kept together as one word, so ``"HTTPResponse"``
    becomes ``"http_response"``. Input that contains no capitals is
    returned unchanged (apart from lower-casing).

    >>> camel_to_snake_case('DisciplineClosure')
    'discipline_closure'
    """
    # First pass: put an underscore before every capitalised word that
    # follows another character ("FooBar" -> "Foo_Bar").
    partly_split = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Second pass: separate a lower-case letter or digit from a capital
    # that directly follows it ("getHTTP" -> "get_HTTP"), then lower-case.
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', partly_split).lower()
def _remove_ancestor_cls(node):
    """Detach the subtree rooted at ``node`` from all of its ancestors.

    Deletes every closure row that links a strict ancestor of ``node`` to
    ``node`` itself or to one of its descendants. Rows inside the subtree
    (including the depth-0 self rows) are left untouched.
    """
    Closure = node._closure
    # Materialise both pk lists first so the DELETE below does not contain
    # a subquery on the very table it is modifying.
    ancestor_ids = list(node.get_ancestors().values_list('pk', flat=True))
    subtree_ids = list(
        node.get_descendants(include_self=True).values_list('pk', flat=True)
    )
    if not ancestor_ids or not subtree_ids:
        return
    # One DELETE for the whole ancestor x subtree cross product instead of
    # one query per pair.
    Closure.objects.filter(
        ancestor__in=ancestor_ids,
        descendant__in=subtree_ids,
    ).delete()
def _create_cls(parent, child):
    """Attach the subtree rooted at ``child`` underneath ``parent``.

    For every closure row that ends at ``parent`` (one per ancestor of
    ``parent``, plus its own depth-0 row) and every node in ``child``'s
    subtree, a new closure row is saved whose depth is::

        depth(child -> node) + depth(ancestor -> parent) + 1

    The closure rows inside ``child``'s own subtree must already exist.
    """
    Closure = parent._closure
    # Snapshot the rows ending at ``parent`` once; they are reused for
    # every node of the subtree.
    upward_links = list(Closure.objects.filter(descendant=parent))
    if not upward_links:
        return
    for desc in child.get_descendants(include_self=True):
        # Distance from ``child`` down to ``desc``. It does not depend on
        # the ancestor, so look it up once per node rather than once per
        # (node, ancestor) pair.
        offset = Closure.objects.get(
            Q(ancestor=child) & Q(descendant=desc)
        ).depth
        for link in upward_links:
            Closure(
                ancestor=link.ancestor,
                descendant=desc,
                depth=offset + link.depth + 1,
            ).save()
class TreeNodeMeta(ModelBase):
    """Metaclass of TreeNode that builds a closure model per subclass.

    For a concrete subclass ``Foo`` it creates a model ``FooClosure``,
    stored in table ``foo_closure``, with ``ancestor`` and ``descendant``
    foreign keys to ``Foo`` and an integer ``depth``. The generated model
    is exposed as ``Foo._closure``.
    """

    def __init__(cls, name, bases, dct):
        """Create and attach the closure model for ``cls``.

        ``cls`` is the model class being created. Abstract models
        (TreeNode itself and any abstract intermediate base) are skipped,
        since they have no table for the foreign keys to reference.
        """
        super(TreeNodeMeta, cls).__init__(name, bases, dct)

        # Decide by the model's own options rather than by the module it
        # was defined in: a concrete subclass declared next to TreeNode
        # still needs a closure table, and an abstract one elsewhere must
        # not get one.
        opts = getattr(cls, '_meta', None)
        if opts is None or opts.abstract:
            return

        meta_attrs = {
            # Each ancestor/descendant pair may appear only once.
            'unique_together': (('ancestor', 'descendant'),),
            'db_table': '%s_closure' % camel_to_snake_case(cls.__name__),
        }
        closure = type(
            '%sClosure' % cls.__name__,  # class name
            (models.Model,),  # base classes
            {  # attributes
                'ancestor': models.ForeignKey(
                    cls.__name__,
                    related_name='_as_ancestor',
                    on_delete=models.CASCADE
                ),
                'descendant': models.ForeignKey(
                    cls.__name__,
                    related_name='_as_descendant',
                    on_delete=models.CASCADE
                ),
                'depth': models.IntegerField(),
                # Register the closure model in the same module as the
                # node model it belongs to.
                '__module__': cls.__module__,
                '__unicode__': _closure_unicode,
                'Meta': type(str('Meta'), (object,), meta_attrs),
            }
        )
        cls._closure = closure
class TreeNode(with_metaclass(TreeNodeMeta, models.Model)):
    """Abstract base for tree-structured models; inherit from it.

    A closure table is created for each subclass and is available as
    ``_closure``. It holds one row per ancestor/descendant pair with the
    distance between them as ``depth``, plus a depth-0 row linking every
    node to itself. ``save``, ``delete`` and ``become_child_of`` keep it
    in sync with the ``parent`` foreign key.
    """

    class Meta:
        abstract = True

    parent = models.ForeignKey('self', null=True, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Save the node, creating its closure rows on first save.

        When saving a newly created node, except for the root, a valid
        parent must be specified. For an existing node, use
        ``become_child_of`` to change the parent rather than altering the
        ``parent`` attribute directly.

        Raises:
            AttributeError: if a new non-root node has no parent, if its
                parent is not present in the closure table, or if the
                ``parent`` of an existing node was changed directly.
        """
        Closure = self._closure
        # Ask the database instead of testing ``self.pk is None`` so that
        # primary keys with a default value are handled as well.
        is_new = not self.__class__.objects.filter(pk=self.pk).exists()

        if not is_new:
            # For existing nodes the closure table is authoritative:
            # refuse a ``parent`` value that it does not already record.
            if (self.parent != self.get_parent()
                    and not self.is_child_of(self.parent)):
                raise AttributeError(
                    'You should not re-parent a node by setting '
                    'its \'parent\' property directly. '
                    'Use \'become_child_of\' method instead')
            return super(TreeNode, self).save(*args, **kwargs)

        # An empty closure table means this is the very first node, which
        # becomes the root and needs no parent.
        not_root = Closure.objects.exists()
        if not_root:
            if not hasattr(self, 'parent') or self.parent is None:
                raise AttributeError('parent not set')
            parent_known = Closure.objects.filter(
                Q(ancestor=self.parent) | Q(descendant=self.parent)
            ).exists()
            if not parent_known:
                raise AttributeError('invalid parent')

        # The node row and its closure rows must appear together;
        # otherwise a failure half-way leaves a node outside the tree.
        with transaction.atomic():
            ret = super(TreeNode, self).save(*args, **kwargs)
            if not_root:
                # Every ancestor of the parent (and the parent itself, via
                # its depth-0 row) becomes an ancestor of the new node,
                # one level further away.
                for link in Closure.objects.filter(descendant=self.parent):
                    Closure(
                        ancestor=link.ancestor,
                        descendant=self,
                        depth=link.depth + 1,
                    ).save()
            # Depth-0 row linking the node to itself.
            Closure(ancestor=self, descendant=self, depth=0).save()
        return ret

    def delete(self, *args, **kwargs):
        """Delete the node and its descendants.

        Closure rows touching the node or any of its descendants are
        removed first; the descendant nodes themselves go through the
        cascade on ``parent``. As the original author noted, this method
        may not be called when deleting through a QuerySet or when a
        cascade delete is triggered.

        Returns whatever the parent class's ``delete`` returns.
        """
        Closure = self._closure
        with transaction.atomic():
            # Snapshot the subtree before any closure row disappears.
            for desc in list(self.get_descendants(include_self=True)):
                Closure.objects.filter(
                    Q(ancestor=desc) | Q(descendant=desc)
                ).delete()
            # Descendants are cascade deleted.
            return super(TreeNode, self).delete(*args, **kwargs)

    def is_root(self):
        """Return True if the node has no ancestors."""
        return not self.get_ancestors().exists()

    @classmethod
    def get_root(cls):
        """Return the root node, or None if the closure table is empty.

        The root is taken to be the ancestor of the deepest closure row,
        which relies on the table holding a single tree (``save`` only
        allows one parentless node).
        """
        deepest = cls._closure.objects.order_by('-depth').first()
        if deepest is None:
            return None
        return deepest.ancestor

    def get_parent(self):
        """Return the parent node according to the closure table.

        Returns None for the root.
        """
        return self.get_ancestors(depth=1).first()

    def get_children(self, include_self=False):
        """Return a queryset of the direct children of this node.

        With ``include_self=True`` the node itself is included as well.
        """
        return self.get_descendants(depth=1, include_self=include_self)

    def _related_nodes(self, anchor, reverse_name, depth, include_self):
        """Return the nodes linked to ``self`` through the closure table.

        ``anchor`` is the closure field that must equal ``self``;
        ``reverse_name`` is the reverse relation (on the node model) of
        the opposite closure field, used to map the matching closure rows
        back to nodes. ``depth`` (if not None) is the maximum distance,
        and ``include_self`` controls whether the depth-0 row counts.
        """
        condi = Q(**{anchor: self})
        if depth is not None:
            condi &= Q(depth__lte=depth)
        if not include_self:
            condi &= Q(depth__gte=1)
        links = self._closure.objects.filter(condi)
        return self.__class__.objects.filter(
            **{'%s__in' % reverse_name: links}
        )

    def get_ancestors(self, depth=None, include_self=False):
        """Return a queryset of this node's ancestors.

        ``depth`` limits the result to ancestors at most that many levels
        up; ``include_self`` adds the node itself.
        """
        return self._related_nodes(
            'descendant', '_as_ancestor', depth, include_self)

    def get_descendants(self, depth=None, include_self=False):
        """Return a queryset of this node's descendants.

        ``depth`` limits the result to descendants at most that many
        levels down; ``include_self`` adds the node itself.
        """
        return self._related_nodes(
            'ancestor', '_as_descendant', depth, include_self)

    def is_child_of(self, node):
        """Return True if ``node`` is the direct parent of this node."""
        condi = Q(ancestor=node) & Q(descendant=self) & Q(depth=1)
        return self._closure.objects.filter(condi).exists()

    def is_parent_of(self, node):
        """Return True if this node is the direct parent of ``node``."""
        return node.is_child_of(self)

    def is_ancestor_of(self, node):
        """Return True if this node is a strict ancestor of ``node``."""
        condi = Q(ancestor=self) & Q(descendant=node) & Q(depth__gte=1)
        return self._closure.objects.filter(condi).exists()

    def is_descendant_of(self, node):
        """Return True if this node is a strict descendant of ``node``."""
        condi = Q(ancestor=node) & Q(descendant=self) & Q(depth__gte=1)
        return self._closure.objects.filter(condi).exists()

    def become_child_of(self, node):
        """Move this node, with its subtree, underneath ``node``.

        Does nothing if ``node`` is already the parent. If ``node`` is
        currently a descendant of this node, ``node`` is first cut out
        (together with its own subtree) and re-attached where this node
        used to hang, so that no cycle is created.

        Raises:
            ValueError: if ``node`` is this node itself.
        """
        new_parent = node
        # Without this guard the ancestor links would be deleted before
        # the unique constraint rejects the self-referencing row.
        if new_parent == self:
            raise ValueError('a node cannot become a child of itself')
        if new_parent == self.get_parent():
            return

        # All closure rewrites and both saves succeed or fail together.
        with transaction.atomic():
            if new_parent.is_descendant_of(self):
                # Cut the new parent's subtree loose from everything
                # above it (this node included).
                _remove_ancestor_cls(new_parent)
                old_parent = self.get_parent()
                # Cut this node's remaining subtree loose and hang it
                # under the new parent.
                _remove_ancestor_cls(self)
                _create_cls(parent=new_parent, child=self)
                if old_parent is not None:
                    # The new parent takes this node's former place.
                    _create_cls(parent=old_parent, child=new_parent)
                # When this node was the root, old_parent is None and the
                # new parent becomes the root.
                new_parent.parent = old_parent
                new_parent.save()
            else:
                _remove_ancestor_cls(self)
                _create_cls(parent=new_parent, child=self)
            # The closure table now records new_parent as the parent, so
            # save() accepts the changed foreign key.
            self.parent = new_parent
            self.save()