There is no relation() involved here. All relation()-related code is
currently commented out.
(see attachment).
Andreas
On Tue, Dec 16, 2008 at 4:48 PM, Michael Bayer <[email protected]>wrote:
>
> your mappers are failing to compile (i.e. if you were to call
> compile_mappers() manually), and the stack trace is specific to a
> relation(), which you haven't illustrated in your setup.
>
>
> On Dec 16, 2008, at 7:31 AM, Andreas Jung wrote:
>
> > Hi,
> >
> > I have a fairly complex model using the declarative layer.
> > For a simple table 'fassung'
> >
> > Toolbox2=# \d fassung
> > Table "public.fassung"
> > Column | Type | Modifiers
> >
> > -------------+-----------------------
> > +----------------------------------------------------
> > id | bigint | not null default
> > nextval('"fassung_id_key"'::text)
> > nr | integer |
> > bezeichnung | character varying(50) |
> > Indexes:
> > "fassung_pkey" primary key, btree (id)
> >
> >
> > I have the following definition
> >
> > 178 class Fassung(Base):$
> > 179 __tablename__ = 'fassung'
> > 180 __table_args__ = (
> > 181 { 'autoload' : True, }
> > 182 )
> >
> > A simple query fails badly from within a unittest. I have checked
> > the whole model definition and could not find any further reference
> > to the 'Fassung' mapper or 'fassung' table.
> >
> > Bug or feature?
> >
> > Andreas
> >
> > -------
> >
> >
> >>
> > /local/HRS2/Devel/ajung/tb-dev/parts/modules-svn/toolbox/database/
> > tests/test_database.py(61)testSimpleSelects()
> > -> rows = session.query(mapper).limit(50).all()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/session.py(908)query()
> > -> return self._query_cls(entities, self, **kwargs)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/query.py(95)__init__()
> > -> self.__setup_aliasizers(self._entities)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/query.py(109)__setup_aliasizers()
> > -> mapper, selectable, is_aliased_class = _entity_info(entity)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/util.py(466)_entity_info()
> > -> mapper = class_mapper(entity, compile)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/util.py(543)class_mapper()
> > -> mapper = mapper.compile()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/mapper.py(679)compile()
> > -> mapper._post_configure_properties()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/mapper.py(701)_post_configure_properties()
> > -> prop.init(key, self)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/interfaces.py(404)init()
> > -> self.do_init()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/properties.py(579)do_init()
> > -> self._post_init()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/properties.py(838)_post_init()
> > -> self.backref.compile(self)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/properties.py(989)compile()
> > -> mapper._compile_property(self.key, relation);
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/mapper.py(643)_compile_property()
> > -> prop.init(key, self)
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/interfaces.py(404)init()
> > -> self.do_init()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/properties.py(578)do_init()
> > -> self._determine_local_remote_pairs()
> >
> > /home/ajung/.buildout/eggs/SQLAlchemy-0.5.0rc4-py2.4.egg/sqlalchemy/
> > orm/properties.py(814)_determine_local_remote_pairs()
> > -> self.local_side, self.remote_side = [util.OrderedSet(x) for x in
> > zip(*list(self.local_remote_pairs))]
> >
> > >
> > begin:vcard
> > fn:Andreas Jung
> > n:Jung;Andreas
> > org:ZOPYX Ltd. & Co. KG
> > adr;quoted-printable:;;Charlottenstr. 37/1;T=C3=BCbingen;;
> > 72070;Germany
> > email;internet:[email protected] <email%3binternet%[email protected]>
> > title:CEO
> > tel;work:+49-7071-793376
> > tel;fax:+49-7071-7936840
> > tel;home:+49-7071-793257
> > x-mozilla-html:FALSE
> > url:www.zopyx.com
> > version:2.1
> > end:vcard
> >
>
>
> >
>
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/sqlalchemy?hl=en
-~----------~----~----~----~------~----~------~--~---
##########################################################################
# HaufeCMS Toolbox
# (C) 2004-2008, Haufe Mediengruppe
##########################################################################
"""
The Toolbox2 database based on the declarative layer
implementation of SQLAlchemy >= 0.4.
Also requires z3c.sqlalchemy >= 1.3.7 + zope.sqlalchemy >= 0.3
"""
import time
import logging
from datetime import datetime
from dm.reuse import rebindFunction
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relation, synonym, sessionmaker, scoped_session
from sqlalchemy.orm import backref, object_session, MapperExtension
from sqlalchemy.exc import InvalidRequestError
from z3c.sqlalchemy import Model
def normalize(s):
    """ German string normalizer.

        Replaces every German umlaut and the sharp s in ``s`` by its
        ASCII transliteration (e.g. 'ä' -> 'ae', 'ß' -> 'ss') and returns
        the resulting string. All other characters are left untouched.
    """
    # The lower-case 'ö' and 'ü' must be listed explicitly; handling only
    # the upper-case variants leaves lower-case umlauts untranslated.
    replacements = (
        ('Ä', 'Ae'),
        ('Ö', 'Oe'),
        ('Ü', 'Ue'),
        ('ä', 'ae'),
        ('ö', 'oe'),
        ('ü', 'ue'),
        ('ß', 'ss'),
    )
    for umlaut, ascii_form in replacements:
        s = s.replace(umlaut, ascii_form)
    return s
# asDict() will be attached to all mapper classes
# within getModel()
class MyDict(dict):
    """ A dict supporting attribute-style access """

    def __getattr__(self, name, default=None):
        """ Return the value stored under key ``name``.

            Only called when normal attribute lookup fails. ``default`` is
            kept for signature compatibility and is not used: a missing key
            always raises AttributeError (carrying the requested name), so
            hasattr()/getattr() keep working as expected.
        """
        try:
            return self[name]
        except KeyError:
            # dict defines no __getattr__ to delegate to, so report the
            # missing attribute directly.
            raise AttributeError(name)
def asDict(self):
    """ Return the instance attributes of ``self`` as a MyDict, leaving
        out every attribute whose name starts with '_sa'.
    """
    result = MyDict()
    for key, value in self.__dict__.iteritems():
        if key.startswith('_sa'):
            continue
        result[key] = value
    return result
class AccessChecker(MapperExtension):
    """ Mapper extension that rejects writes to read-only instances and
        serializes ``instance.tools`` before INSERT and UPDATE.
    """

    def before_insert(self, mapper, connection, instance):
        """ Check the read-only flag, then serialize the tools. """
        self._check_readonly(instance)
        self._fix_tools(instance)

    def before_update(self, mapper, connection, instance):
        """ Check the read-only flag, then serialize the tools. """
        self._check_readonly(instance)
        self._fix_tools(instance)

    def before_delete(self, mapper, connection, instance):
        """ Check the read-only flag only. """
        self._check_readonly(instance)

    def _check_readonly(self, instance):
        """ Raise ValueError if ``instance._readonly`` is true. An instance
            without a '_readonly' attribute is treated as read-only.
        """
        if getattr(instance, '_readonly', True):
            raise ValueError('Trying to update a read-only instance')

    def _fix_tools(self, instance):
        """ Convert ``instance.tools`` from a list of 3-element tuples into
            a '{{...},{...}}' string (SA workaround). None is treated as an
            empty list.
        """
        # This hook may run more than once for the same instance; a string
        # value means the conversion has already happened.
        if isinstance(instance.tools, str):
            return

        if instance.tools is None:
            instance.tools = []
        tools = instance.tools
        assert isinstance(tools, list), 'instance.tools must be a list'

        parts = []
        for tp in tools:
            # double quotes are stripped from the second element only
            parts.append('{"%s", "%s", "%s"}' % (tp[0],
                                                 tp[1].replace('"', ''),
                                                 tp[2]))
        instance.tools = '{' + ','.join(parts) + '}'
def modelProduktRelated(Base, model):
    """ Declare the product related mapper classes on ``Base`` and register
        each of them with ``model``.
    """

    class ProduktView(Base):
        __tablename__ = 'produkt'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class ProduktGruppe(Base):
        __tablename__ = 'produktgruppe'
        __table_args__ = {'autoload': True}

    class ProduktHistorie(Base):
        __tablename__ = 'historie_produkt'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    # registration order is kept as declared above
    registrations = (
        ('produkt', ProduktView),
        ('produktgruppe', ProduktGruppe),
        ('historie_produkt', ProduktHistorie),
    )
    for name, mapper_class in registrations:
        model.add(name, mapper_class=mapper_class)
def modelCommon(Base, model):
    """ Declare the common lookup-table mapper classes on ``Base`` and
        register each of them with ``model``.
    """

    class Land(Base):
        __tablename__ = 'land'
        __table_args__ = {'autoload': True}

    class Zielgruppen(Base):
        __tablename__ = 'zielgruppen'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class Sachgebiet(Base):
        __tablename__ = 'sachgebiet'
        __table_args__ = {'autoload': True}

    class Format(Base):
        __tablename__ = 'format'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class DTD(Base):
        __tablename__ = 'dtd'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class MetaGattung(Base):
        __tablename__ = 'metagattung'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class Gattung(Base):
        __tablename__ = 'gattung'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['idmetagattung'], ['metagattung.id']),
            ForeignKeyConstraint(['dtd'], ['dtd.kuerzel']),
            {'autoload': True},
        )

    class Fassung(Base):
        __tablename__ = 'fassung'
        __table_args__ = {'autoload': True}

    class Status(Base):
        __tablename__ = 'status'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class Systemvoraussetzungen(Base):
        __tablename__ = 'systemvoraussetzungen'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            {'autoload': True},
        )

    class StyleguideCompliant(Base):
        __tablename__ = 'styleguide_compliant'
        __table_args__ = {'autoload': True}

    class Automail(Base):
        __tablename__ = 'automail'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    class DownloadReason(Base):
        __tablename__ = 'download_reason'
        __table_args__ = {'autoload': True}

    # registration order is kept exactly as in the original sequence
    registrations = (
        ('land', Land),
        ('zielgruppen', Zielgruppen),
        ('sachgebiet', Sachgebiet),
        ('dtd', DTD),
        ('gattung', Gattung),
        ('metagattung', MetaGattung),
        ('format', Format),
        ('fassung', Fassung),
        ('status', Status),
        # NOTE(review): this is the only key that is capitalised and thus
        # differs from its __tablename__ - confirm whether that is intended.
        ('Systemvoraussetzungen', Systemvoraussetzungen),
        ('styleguide_compliant', StyleguideCompliant),
        ('automail', Automail),
        ('download_reason', DownloadReason),
    )
    for name, mapper_class in registrations:
        model.add(name, mapper_class=mapper_class)
def modelAMDependent(Base, model):
    """ Declare the mapper classes of the tables that reference
        'arbeitsmittel' and register each of them with ``model``.
    """

    class ArbeitsmittelHistorie(Base):
        __tablename__ = 'historie'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    class ArbeitsmittelDownloadHistorie(Base):
        __tablename__ = 'download_historie'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['reason'], ['download_reason.id']),
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    class ArbeitsmittelSystemvoraussetzungen(Base):
        __tablename__ = 'arbeitsmittel_systemvoraussetzungen'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['arbeitsmittel_hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    class RevisedArbeitsmittelVersions(Base):
        __tablename__ = 'revised_arbeitsmittel_versions'
        __table_args__ = (
            PrimaryKeyConstraint('hidx', 'version'),
            {'autoload': True},
        )

    class VisitedTools(Base):
        __tablename__ = 'visited_tools'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    class BookmarkedTools(Base):
        __tablename__ = 'bookmarked_tools'
        __table_args__ = (
            PrimaryKeyConstraint('id'),
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

    # registration order is kept exactly as in the original sequence
    registrations = (
        ('bookmarked_tools', BookmarkedTools),
        ('visited_tools', VisitedTools),
        ('revised_arbeitsmittel_versions', RevisedArbeitsmittelVersions),
        ('historie', ArbeitsmittelHistorie),
        ('download_historie', ArbeitsmittelDownloadHistorie),
        ('arbeitsmittel_systemvoraussetzungen', ArbeitsmittelSystemvoraussetzungen),
    )
    for name, mapper_class in registrations:
        model.add(name, mapper_class=mapper_class)
def modelArbeitsmittel(Base, model):
    """ Declare the ``Arbeitsmittel`` mapper class on ``Base`` and register
        it with ``model`` as 'arbeitsmittel'.
    """

    class Arbeitsmittel(Base):
        __tablename__ = 'arbeitsmittel'
        __table_args__ = (
            ForeignKeyConstraint(['hidx'], ['arbeitsmittel.hidx']),
            {'autoload': True},
        )

        # Columns are redeclared with explicit foreign keys; they are
        # needed by the (currently disabled) relation()s below.
        idfassung = Column(Integer, ForeignKey('fassung.id'))
        format = Column(String, ForeignKey('format.format'))
        status = Column(Integer, ForeignKey('status.id'))
        idsachgebiet = Column(Integer, ForeignKey('sachgebiet.id'))
        idzielgruppe = Column(Integer, ForeignKey('zielgruppen.id'))
        idgattung = Column(Integer, ForeignKey('gattung.id'))
        styleguide_compliant = Column(Integer, ForeignKey('styleguide_compliant.id'))

#        historie = relation(ArbeitsmittelHistorie,
#                            backref='arbeitsmittel')
#        download_historie = relation(ArbeitsmittelDownloadHistorie,
#                                     backref='arbeitsmittel')
#        sachgebiet2 = relation(Sachgebiet,
#                               primaryjoin=Sachgebiet.id==idsachgebiet,
#                               foreign_keys=[Sachgebiet.id])
#        gattung2 = relation(ArbeitsmittelGattung,
#                            primaryjoin=ArbeitsmittelGattung.id==idgattung,
#                            foreign_keys=[ArbeitsmittelGattung.id])
#        status2 = relation(ArbeitsmittelStatus,
#                           primaryjoin=status==ArbeitsmittelStatus.status)
#        fassung2 = relation(ArbeitsmittelFassung,
#                            primaryjoin=idfassung==ArbeitsmittelFassung.id)
#        format2 = relation(ArbeitsmittelFormat,
#                           primaryjoin=format==ArbeitsmittelFormat.format)
#        zielgruppe2 = relation(ArbeitsmittelZielgruppen,
#                               primaryjoin=idzielgruppe==ArbeitsmittelZielgruppen.id)
#
#
#        systemvoraussetzungen2 = relation(ArbeitsmittelSystemvoraussetzungen)
#        styleguide_compliant2 = relation(ArbeitsmittelStyleguideCompliant)
#

        @property
        def gattung_gattung(self):
            """ Return the 'gattung' attribute of the related gattung2
                object.
            """
            # NOTE(review): 'gattung2' is only defined by the relation()
            # that is commented out above - confirm it is provided
            # elsewhere before relying on this property.
            related = self.gattung2
            return related.gattung

    model.add('arbeitsmittel', mapper_class=Arbeitsmittel)
def modelHierarchies(Base, model):
    """ Declare the ``Hierarchies`` mapper class on ``Base`` and register
        it with ``model`` as 'hierarchies'.
    """

    class Hierarchies(Base):
        # A single node of a hierarchy tree. Writes are guarded by
        # AccessChecker, which also serializes 'tools' on insert/update.
        __tablename__ = 'hierarchies'
        __table_args__ = {'autoload': True}
        __mapper_args__ = {'extension': AccessChecker()}

        # NOTE(review): the methods below use self._children and
        # self.parent, which are only defined by the relation()s that are
        # commented out here - confirm they are provided before use.
#        id = Column(Integer, primary_key=True)
#        parent_id = Column(Integer, ForeignKey('hierarchies.id'))
#        hierarchyshare_id = Column(Integer, ForeignKey('hierarchies.id'))
#        pos = Column(Integer)
#
#        _children = relation('Hierarchies',primaryjoin=parent_id==id,
#                             order_by=pos,
#                             cascade="all",
#                             backref=backref("parent_node", remote_side='Hierarchies.id')
#                             )
#
#        parent = relation('Hierarchies',
#                          primaryjoin='Hierarchies.parent_id==Hierarchies.id',
#                          # do we need the backref here?
#                          backref=backref("child_node", remote_side='Hierarchies.id'),
#                          uselist=False,
#                          )
#
#        subscribed_by = relation('Hierarchies',
#                                 primaryjoin='Hierarchies.hierarchyshare_id==Hierarchies.id',
#                                 backref=backref("subscriber", remote_side='Hierarchies.hierarchyshare_id'),
#                                 uselist=True,
#                                 )

        # instances are writable unless this flag is overridden
        _readonly = False

        def __str__(self):
            return "<%s %s>" % (self.__class__, self.asDict())

        __repr__ = __str__

        @property
        def children(self):
            """ Return all children hierarchies (respecting shares) """
            lst = list()
            for n in self._children:
                # pointer (id) of a shared node back to its
                # original node within the current tree
                n.referenced_by_id = None
                if n.hierarchyshare_id is None:
                    # node does not reference another node (no share)
                    lst.append(n)
                else:
                    # node points to another node -> obtain the referenced
                    # node and return it as current child instead of the
                    # original node
                    session = object_session(self)
                    try:
                        shared_node = session.query(Hierarchies).filter_by(id=n.hierarchyshare_id).one()
                    except InvalidRequestError:
                        # silently(!) ignore reference to non-existing nodes
                        continue
                    shared_node.referenced_by_id = n.id
                    lst.append(shared_node)
            return lst

        @property
        def referencing_parent(self):
            """ Returns the parent node of the node referencing the current
                node if 'referenced_by_id' is set != None.
                Raises ValueError if the node is not shared or if
                'referenced_by_id' is not set.
            """
            if not self.is_shared:
                raise ValueError('referencing_parent() is only applicable on shared nodes (id=%d)' % self.id)
            referenced_by_id = getattr(self, 'referenced_by_id', None)
            if referenced_by_id is not None:
                session = object_session(self)
                # query the mapper class itself (same as in children())
                other_node = session.query(Hierarchies).filter_by(id=referenced_by_id).one()
                return other_node.parent
            raise ValueError('This node is being referenced (id=%d)' % self.id)

        @property
        def sorted_tools(self):
            """ Returns all tools of the current hierarchies node
                (no tools of sub-hierarchies included) as list of
                their metadata as dicts. The order depends on
                self.sortierung: 'Given' keeps the stored order, 'SortA'
                sorts by normalized 'bezeichnung'; any other value raises
                ValueError.
            """
            tools = self.tools or []

            # get hold of the HIDX of all tools
            all_hidxs = [hidx for hidx, bez, aedat_str in tools]

            # first obtain the metadata of all tools from the
            # 'arbeitsmittel' table and store the results as a dict
            # <HIDX> -> <dict of metadata>
            tools_metadata = dict()
            if all_hidxs:
                session = object_session(self)
                ## ATT: switching to arbeitsmittel_view here (ajung, 17.10.2008)
                ## THIS MIGHT CAUSE SOME TROUBLE (must be tested)
                #clause = [ArbeitsmittelRaw.c.hidx==hidx for hidx in all_hidxs]
                #rows = session.query(ArbeitsmittelRaw).filter(or_(*clause)).all()
                # NOTE(review): 'Arbeitsmittel' is declared locally inside
                # modelArbeitsmittel() and is not visible from here -
                # confirm how this name is supposed to be resolved.
                clause = [Arbeitsmittel.hidx == hidx for hidx in all_hidxs]
                rows = session.query(Arbeitsmittel).filter(or_(*clause)).all()
                for r in rows:
                    tools_metadata[r.hidx] = r.asDict()

            # now iterate over all tools
            lst = list()
            for hidx, bez, aedat_str in tools:
                # get hold of the original metadata and create a copy
                # in order to modify it without side-effects.
                metadata = tools_metadata.get(hidx, None)
                # Silently discard references to (deleted) non-existing tools
                if metadata is None:
                    continue
                d = MyDict(metadata)

                # 'custom_bezeichnung_equal_tool_bezeichnung' only tells
                # that a custom title is set and equal to the tool's title.
                d['custom_bezeichnung_equal_tool_bezeichnung'] = False
                if len(bez) > 0 and bez == metadata['bezeichnung']:
                    d['custom_bezeichnung_equal_tool_bezeichnung'] = True

                # 'uses_custom_bezeichnung' indicates that the user once
                # changed the 'bezeichnung'. It does _not_ indicate
                # that the 'bezeichnung' of the tool within a hierarchy is
                # different from the one of the tool's metadata.
                d['uses_custom_bezeichnung'] = False
                if bez != '':
                    # mix-in custom bezeichnung if different
                    d['bezeichnung'] = bez
                    d['uses_custom_bezeichnung'] = True
                lst.append(d)

            if self.sortierung == 'Given':
                return lst
            elif self.sortierung == 'SortA':
                # key-based sort: same (stable) ordering as the former
                # cmp-based comparison of the normalized titles
                lst.sort(key=lambda d: normalize(d['bezeichnung']))
                return lst
            else:
                raise ValueError('Unkown value for sortierung (%s)' % self.sortierung)

        @property
        def is_shared(self):
            """ Return True if the current node or one of its ancestors
                (the root node itself is not inspected) carries a
                'referenced_by_id' that is not None. False otherwise.
            """
            current = self
            while current.parent is not None:
                if getattr(current, 'referenced_by_id', None) is not None:
                    return True
                current = current.parent
            return False

        @property
        def all_tools(self):
            """ Return the sorted_tools entries of the current node and of
                all nodes below it as one flat list.
            """
            tools = []
            self._iterateTools(tools)
            return tools

        def extended_tool_search(self, gattung, last_tb_change, last_tb_change_end, tbstate, existsindlc):
            """ Extended search for tools having the specified property.

                gattung, tbstate: converted with int(); -1 disables the
                    respective filter.
                last_tb_change, last_tb_change_end: optional dates given
                    as 'DD.MM.YYYY'. If only the start is given, the end
                    defaults to the current time.
                existsindlc: if true, the DLC is asked per tool; value 1
                    drops tools for which the check returned False, value
                    2 drops tools for which it returned True.

                Returns the list of matching tool dicts.
            """

            def parse_date(value):
                # 'DD.MM.YYYY' -> datetime
                day, month, year = [int(val) for val in value.split('.')][:3]
                return datetime(year, month, day)

            # we have all tools so that we can perform a one time iteration
            # to filter desired ones
            result = []

            # format date and time
            start = end = None
            if last_tb_change:
                start = parse_date(last_tb_change)
            if last_tb_change_end:
                end = parse_date(last_tb_change_end)
            elif last_tb_change:
                end = datetime.now()

            # iterate thru all tools - could be faster
            for tool in self.all_tools:
                toolmatch = False
                # NOTE(review): TBDLCSync is neither defined nor imported
                # in this module - confirm where it is supposed to come
                # from.
                wrappedtool = TBDLCSync(None, tool.hidx)
                if int(gattung) == wrappedtool.tb.getValue('gattung').id or int(gattung) == -1:
                    if int(tbstate) == wrappedtool.tb.getValue('status') or int(tbstate) == -1:
                        if start and end:
                            toolmatch = start < wrappedtool.tb.getValue('aedat') < end
                        else:
                            toolmatch = True

                tool['exists_in_dlc'] = False
                if existsindlc and toolmatch:
                    match = tool['exists_in_dlc'] = wrappedtool.tbDocumentExistsInDLC()
                    if int(existsindlc) == 1 and match is False:
                        toolmatch = False
                    elif int(existsindlc) == 2 and match is True:
                        toolmatch = False

                if toolmatch:
                    if tool['exists_in_dlc']:
                        tool['ToolboxLastSync'] = wrappedtool.getLastSyncDate()
                    result.append(tool)
            return result

        def _iterateTools(self, tools):
            """ helper method to iterate over all tools in the subtree """
            tools.extend(self.sorted_tools)
            for child in self.children:
                child._iterateTools(tools)

        @property
        def all_hierarchies(self):
            """ Return all hierarchies within the current subtree as a
                flat list (children are taken from self.children).
            """
            hierarchies = []
            self._iterateHierarchies(self, hierarchies)
            return hierarchies

        def _iterateHierarchies(self, node, hierarchies):
            """ Append ``node`` and recurse into self.children. """
            hierarchies.append(node)
            for child in self.children:
                child._iterateHierarchies(child, hierarchies)

        @property
        def all_hierarchies_with_depth(self):
            """ Return all hierarchies within the current subtree as a
                flat list of (node, level) tuples; the current node has
                level 0.
            """
            hierarchies = []
            self._iterateHierarchiesWithDepth(self, hierarchies, 0)
            return hierarchies

        def _iterateHierarchiesWithDepth(self, node, hierarchies, level):
            """ Append (node, level) and recurse into self.children. """
            hierarchies.append((node, level))
            for child in self.children:
                child._iterateHierarchiesWithDepth(child, hierarchies, level + 1)

        @property
        def all_hierarchies_unaliased(self):
            """ Return all hierarchies as a flat list, following
                self._children directly instead of self.children.
            """
            hierarchies = []
            self._iterateHierarchiesUnaliased(self, hierarchies)
            return hierarchies

        def _iterateHierarchiesUnaliased(self, node, hierarchies):
            """ Append ``node`` and recurse into self._children. """
            hierarchies.append(node)
            for child in self._children:
                child._iterateHierarchiesUnaliased(child, hierarchies)

        def getHierarchyById(self, id):
            """ return a hierarchy node from the subtree given by its id.
                The unaliased tree is searched first, then the tree as
                seen through shares; IndexError is raised if neither
                contains the id.
            """
            try:
                return [n for n in self.all_hierarchies_unaliased if n.id == id][0]
            except IndexError:
                return [n for n in self.all_hierarchies if n.id == id][0]

        @property
        def root(self):
            """ return the root node of the tree """
            parents = list(self.parents)
            if not parents:
                return self
            return parents[-1]

        @property
        def parents(self):
            """ an iterator over all parents (nearest parent first) """
            current = self
            while current.parent is not None:
                current = current.parent
                yield current

        @property
        def parent_hierarchies(self):
            """ return a list of the 'bezeichnung' values of all parent
                nodes (outermost first) followed by the one of the current
                node.
            """
            return list(reversed([p.bezeichnung for p in self.parents])) + [self.bezeichnung]

        def tools_clear(self):
            """ Remove all tools """
            self.tools = list()

        def addTool(self, hidx, bezeichnung='', aedat=''):
            """ Add a new tool as tuple (hidx, bezeichnung, aedat).
                A datetime passed as ``aedat`` is converted to a
                'DD.MM.YYYY HH:MM:SS' string first.
            """
            assert isinstance(self.tools, list), 'self.tools must be a list'
            if isinstance(aedat, datetime):
                aedat = aedat.strftime('%d.%m.%Y %H:%M:%S')
            assert isinstance(aedat, str), 'aedat must be a string'
            assert hidx.startswith('HI'), 'not a valid HIDX (%s)' % hidx
            tools = self.tools
            tools.append((hidx, bezeichnung, aedat))
            # the attribute is assigned again after the in-place append
            self.tools = tools

        def removeTool(self, hidx):
            """ Remove tool(s) by its HIDX """
            assert isinstance(self.tools, list), 'self.tools must be a list'
            self.tools = [tp for tp in self.tools if tp[0] != hidx]

        @property
        def lidx_breadcrumbs(self):
            """ Return hierarchies as string '<LIDX...>/<LIDX...>/...' """
            return '/'.join([d['lidx'] for d in self.parent_ids])

        @property
        def parent_ids(self):
            """ Returns a sorted list of dicts of the hierarchy nodes down to
                the current node. Each dict has keys 'lidx', 'name' and 'pid'.
                The main purpose of this method is to be used together with
                the query builder where we need unique traversal path for each
                hierarchy node (this is required if a product subscribes to the
                _same_ hierarchy of another product. In such cases all subscribed
                nodes have the same LIDX but different traversal paths.
            """
            session = object_session(self)
            Hierarchies = self.__class__   # we can't use getMapper()
            lst = list()
            current = self
            while current is not None:
                lst.append(dict(lidx=current.linkindex,
                                name=current.bezeichnung,
                                pid=current.pid))
                if current.is_shared:
                    if current.parent.is_shared:
                        parent = current.parent
                    else:
                        # For a share we must get hold of the node within
                        # the original tree referencing the share. The 'id'
                        # of the related hierarchies node is set within
                        # the children() implementation.
                        referenced_by_id = current.referenced_by_id
                        # now get hold of the referencing node
                        h = session.query(Hierarchies).filter_by(id=referenced_by_id).one()
                        # and use its parent
                        parent = h.parent
                else:
                    # not shared
                    parent = current.parent
                current = parent
            lst.reverse()
            return lst

    model.add('hierarchies', mapper_class=Hierarchies)
def getModel(metadata):
    """ Create a Model and a declarative base bound to ``metadata``,
        attach asDict() to the base class, declare all mapper classes and
        return the populated model.
    """
    model = Model()
    Base = declarative_base(metadata=metadata)
    Base.asDict = rebindFunction(asDict)

    # the declaration order is significant and kept as is
    declarators = (
        modelProduktRelated,
        modelCommon,
        modelAMDependent,
        modelHierarchies,
        modelArbeitsmittel,
    )
    for declare in declarators:
        declare(Base, model)
    return model