Attached the whole file
On Thursday, June 7, 2018 at 7:01:21 PM UTC-5, HP3 wrote:
>
> Hello
>
>
> I am having difficulty with a CTE and @hybrid_property.expression on an
> adjacency list model.
>
>
> class P2B(Base):
> __tablename__ = 'p2bases'
> id = Column(Integer, primary_key=True)
> classname = Column(String)
> parent_id = Column(
> Integer,
> ForeignKey('p2bases.id', ondelete='CASCADE'),
> index=True
> )
>
> parent = relationship(
> 'P2B',
> primaryjoin='P2B.parent_id == P2B.id',
> foreign_keys='P2B.id',
> uselist=False
> )
>
> __mapper_args__ = {
> 'polymorphic_identity': 'P2B',
> 'polymorphic_on': classname
> }
>
> @hybrid_property
> def ancestors(self):
> _ancestors = []
> parent = self.parent
> while parent is not None:
> _ancestors.append(parent)
> parent = parent.parent
> return _ancestors
>
> @ancestors.expression
> def ancestors(cls):
> cte = select([P2B.id, P2B.parent_id]) \
> .where(P2B.id == cls.id) \ # <<<<<< Based on Example1
> .cte(name='cte', recursive=True)
>
> cte = cte.union(
> select([P2B.id, P2B.parent_id])
> .where(P2B.id == cte.c.parent_id)
>
> )
> return cte
>
>
>
>
>
>
> The issue I am facing is that the SQL statement for the ancestors expression
> becomes this:
>
>
>
>
> WITH RECURSIVE cte(id, parent_id) AS
>
> (SELECT p2bases.id AS id, p2bases.parent_id AS parent_id
>
> FROM p2bases
>
> WHERE p2bases.id = p2bases.id UNION SELECT p2bases.id AS id,
> p2bases.parent_id
> AS parent_id
>
> FROM p2bases, cte
>
> WHERE p2bases.id = cte.parent_id)
>
> SELECT p2bases.id AS p2bases_id, p2bases.classname AS p2bases_classname,
> p2bases.position AS p2bases_position, p2bases.parent_id AS
> p2bases_parent_id,
> cte.id AS cte_id, cte.parent_id AS cte_parent_id
>
> FROM p2bases, cte
>
> WHERE p2bases.id = %(id_1)s
>
> {'id_1': 1}
>
>
>
> Notice that `P2B.id == cls.id` in the CTE becomes
> `p2bases.id = p2bases.id`.
>
>
> What am I missing in my @hybrid_property.expression declaration?
>
>
> How do I use my ancestors @hybrid_property.expression using
> session.query(...)?
>
> ancestors = session.query(P2B.ancestors).get(a_child_id)
>
>
> Example1:
> http://docs.sqlalchemy.org/en/latest/orm/extensions/hybrid.html#correlated-subquery-relationship-hybrid
>
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
from sqlalchemy import (
asc,
BigInteger,
Boolean,
Column,
DateTime,
desc,
event,
ForeignKey,
# ForeignKeyConstraint,
func,
Index,
inspect,
Integer,
literal,
select,
String,
Table,
Text,
util
)
from sqlalchemy.orm import (
relationship,
backref,
remote
)
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import zope.sqlalchemy
import transaction
import time
import logging
import pytz
import datetime
# UTC tzinfo shared by current_time() and DEFAULT_SERVER_TIME.
UTC = pytz.utc
# Timezone-aware timestamp used as the column default of P2B.time.
DEFAULT_SERVER_TIME = datetime.datetime(2001, 1, 1, 0, 0, 0, tzinfo=UTC)
# Declarative base class that every mapped class in this module derives from.
Base = declarative_base()
# Module-level placeholder; init_sqlalchemy_pyramid() creates its own local
# engine and does not assign this name.
engine = None
# dbname='sqlite:///sqlalchemy.db'
# Default database URL (PostgreSQL through the psycopg2 driver).
DBNAME = 'postgresql+psycopg2:///sqlalch'
def current_time():
    """Return the current time as a UTC-aware datetime truncated to seconds.

    Returns:
        datetime.datetime: the present instant with ``tzinfo=UTC`` and
        ``microsecond`` set to 0.
    """
    # Ask for an aware value directly instead of building a naive one with
    # the deprecated ``datetime.utcnow()`` and attaching tzinfo afterwards.
    return datetime.datetime.now(tz=UTC).replace(microsecond=0)
def drop_create(engine):
    """Drop every table registered on ``Base.metadata``, then recreate them.

    Args:
        engine: the bind passed to ``drop_all`` and ``create_all``.
    """
    metadata = Base.metadata
    metadata.drop_all(engine)
    metadata.create_all(engine)
def init_sqlalchemy_pyramid(dbname=DBNAME, drop=True):
    """Create an engine and a session registered with ``transaction.manager``.

    Args:
        dbname: database URL handed to ``create_engine``.
        drop: when true, drop and recreate all tables via ``drop_create``.

    Returns:
        The newly created session.
    """
    logging.error("PYRAMID DBNAME %s", dbname)
    db_engine = create_engine(dbname, echo=True, use_batch_mode=True)

    # A plain sessionmaker is used on purpose (no scoped_session); autoflush
    # and expire-on-commit are both switched off.
    session_factory = sessionmaker(
        bind=db_engine,
        autoflush=False,
        expire_on_commit=False,
    )
    dbsession = session_factory()
    zope.sqlalchemy.register(
        dbsession, transaction_manager=transaction.manager)

    if drop:
        drop_create(db_engine)
    return dbsession
class P2B(Base):
    """Polymorphic root of the ``p2bases`` adjacency list.

    Each row references its parent row through ``parent_id``; ``classname``
    is the polymorphic discriminator shared with the joined-table subclasses.
    """

    __tablename__ = 'p2bases'

    id = Column(Integer, primary_key=True)
    uuid = Column(String)
    classname = Column(String)
    position = Column(Integer)
    extras = Column(String)
    val = Column(Integer)
    time = Column(DateTime(timezone=True), default=DEFAULT_SERVER_TIME)
    parent_id = Column(
        Integer,
        ForeignKey('p2bases.id', ondelete='CASCADE'),
        index=True
    )

    __mapper_args__ = {
        'polymorphic_identity': 'P2Base',
        'polymorphic_on': classname
    }

    # Many-to-one link from a node to its parent row. ``remote_side`` marks
    # ``id`` as the column that lives on the parent's side of the
    # self-referential join, which is the documented adjacency-list form.
    #
    # The previous declaration used ``foreign_keys='P2B.id'``; the ORM then
    # handled the relationship as a one-to-many, as seen in the flush-time
    # error that was recorded next to it:
    #
    #   sqlalchemy.exc.CircularDependencyError: Circular dependency detected.
    #   (ProcessState(OneToManyDP(P2Base.parent), <P2Page ...>, delete=False),
    #    SaveUpdateState(<P2Page ...>), SaveUpdateState(<P2Document ...>),
    #    ProcessState(ManyToOneDP(P2Page.document), <P2Page ...>,
    #    delete=False))
    parent = relationship(
        'P2B',
        primaryjoin='P2B.parent_id == P2B.id',
        remote_side='P2B.id',
        uselist=False
    )

    @classmethod
    def _ancestors_cte(cls):
        """Build the recursive CTE shared by both hybrid SQL expressions.

        NOTE(review): when this is reached through ``P2B.ancestors``, ``cls``
        is ``P2B`` itself, so the anchor predicate ``P2B.id == cls.id``
        compares a column with itself and does not select a single starting
        row. The expression has no way to receive the starting id; callers
        that need the ancestors of one node should build the CTE with an
        explicit id instead.
        """
        cte = select([P2B.id, P2B.parent_id]) \
            .where(P2B.id == cls.id) \
            .cte(name='cte', recursive=True)
        # Recursive step: fetch the row whose id is the parent_id found so far.
        return cte.union(
            select([P2B.id, P2B.parent_id])
            .where(P2B.id == cte.c.parent_id)
        )

    @hybrid_property
    def ancestors(self):
        """Return the chain of parents, nearest first, via ``parent``."""
        chain = []
        node = self.parent  # Using relationship
        while node is not None:
            chain.append(node)
            node = node.parent
        return chain

    @ancestors.expression
    def ancestors(cls):
        """Class-level form: the recursive CTE over ``id``/``parent_id``."""
        return cls._ancestors_cte()

    @hybrid_property
    def myparent(self):
        """Load the parent row with an explicit query on the owning session.

        Returns:
            The row whose ``id`` equals this row's ``parent_id``, or ``None``
            when there is no such row.
        """
        # Look the parent up by *its* primary key. The former filter also
        # required ``P2B.id == self.id``, which can only match this very row.
        return inspect(self).session.query(P2B) \
            .filter(P2B.id == self.parent_id) \
            .one_or_none()

    @hybrid_property
    def myancestors(self):
        """Return the chain of parents, nearest first, via ``myparent``."""
        chain = []
        node = self.myparent  # Using @hybrid_prop
        while node is not None:
            chain.append(node)
            node = node.myparent
        return chain

    @myancestors.expression
    def myancestors(cls):
        """Class-level form: the recursive CTE over ``id``/``parent_id``."""
        return cls._ancestors_cte()
class P2KVP(P2B):
    """Key/value pair node stored in ``p2kvps`` (joined to ``p2bases`` by id).

    ``parent_id`` is joined against three different owner classes; each
    relationship below installs a ``position``-ordered collection on its
    owner through the backref.
    """

    __tablename__ = 'p2kvps'
    __mapper_args__ = {'polymorphic_identity': 'P2KeyValuePair'}

    id = Column(
        Integer,
        ForeignKey('p2bases.id', ondelete='CASCADE'),
        primary_key=True,
    )

    # Owner is a graphical element; exposed there as ``attributes``.
    ge = relationship(
        'P2GE',
        primaryjoin='P2KVP.parent_id == P2GE.id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'attributes',
            collection_class=ordering_list('position'),
            order_by='P2KVP.position',
        ),
    )

    # Owner is an annotation; exposed there as ``datakvps``.
    annotation = relationship(
        'P2An',
        primaryjoin='P2KVP.parent_id == P2An.id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'datakvps',
            collection_class=ordering_list('position'),
            order_by='P2KVP.position',
        ),
    )

    # Owner is a base entity; exposed there as ``kvps``.
    baseentity = relationship(
        'P2BE',
        primaryjoin='P2KVP.parent_id == P2BE.id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'kvps',
            collection_class=ordering_list('position'),
            order_by='P2KVP.position',
        ),
    )
class P2BE(P2B):
    """Base-entity node stored in ``p2baseentities`` (joined to ``p2bases``)."""

    __tablename__ = 'p2baseentities'
    __mapper_args__ = {'polymorphic_identity': 'P2BaseEntity'}

    id = Column(
        Integer,
        ForeignKey('p2bases.id', ondelete='CASCADE'),
        primary_key=True,
    )
    uti = Column(String)
class P2Doc(P2BE):
    """Document node stored in ``p2documents`` (joined to ``p2baseentities``)."""

    __tablename__ = 'p2documents'
    __mapper_args__ = {'polymorphic_identity': 'P2Document'}

    id = Column(
        Integer,
        ForeignKey('p2baseentities.id', ondelete='CASCADE'),
        primary_key=True,
    )
class P2Pg(P2BE):
    """Page node stored in ``p2pages`` (joined to ``p2baseentities``)."""

    __tablename__ = 'p2pages'
    __mapper_args__ = {'polymorphic_identity': 'P2Page'}

    id = Column(
        Integer,
        ForeignKey('p2baseentities.id', ondelete='CASCADE'),
        primary_key=True,
    )
    number = Column(String)

    # Owning document, matched on ``parent_id``; the document side receives
    # a ``position``-ordered ``pages`` collection through the backref.
    document = relationship(
        'P2Doc',
        primaryjoin='P2Doc.id == P2Pg.parent_id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'pages',
            collection_class=ordering_list('position'),
            order_by='P2B.position',
        ),
    )
class P2An(P2BE):
    """Annotation node stored in ``p2annotations`` (joined to ``p2baseentities``)."""

    __tablename__ = 'p2annotations'
    __mapper_args__ = {'polymorphic_identity': 'P2Annotation'}

    id = Column(
        Integer,
        ForeignKey('p2baseentities.id', ondelete='CASCADE'),
        primary_key=True,
    )

    # Owning page, matched on ``parent_id``; the page side receives a
    # ``position``-ordered ``annotations`` collection through the backref.
    page = relationship(
        'P2Pg',
        primaryjoin='P2An.parent_id == P2Pg.id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'annotations',
            collection_class=ordering_list('position'),
            order_by='P2An.position',
        ),
    )
class P2GE(P2B):
    """Graphical-element node stored in ``p2graphicalelements``."""

    __tablename__ = 'p2graphicalelements'
    __mapper_args__ = {'polymorphic_identity': 'P2GraphicalElement'}

    id = Column(
        Integer,
        ForeignKey('p2bases.id', ondelete='CASCADE'),
        primary_key=True,
    )

    # Owning annotation, matched on ``parent_id``; the annotation side gets a
    # scalar (``uselist=False``) ``graphicalelement`` attribute via backref.
    annotation = relationship(
        'P2An',
        primaryjoin='P2GE.parent_id == P2An.id',
        foreign_keys='P2B.parent_id',
        backref=backref(
            'graphicalelement',
            uselist=False,
        ),
    )
def _report_ancestors(label, ancestors, check=True):
    """Print every row under *label*; optionally assert the expected depth."""
    for a in ancestors:
        print(label, a)
    if check:
        assert 3 <= len(ancestors) <= 5, "Found %d" % (len(ancestors))


def test_ancestors():
    """Build a sample tree and list a leaf's ancestors in several ways.

    The approaches exercised, in order: an ORM recursive CTE, a Core
    recursive CTE, the instance-level ``ancestors`` / ``myancestors``
    hybrids, and finally the class-level hybrid expressions.
    """
    dbsession = init_sqlalchemy_pyramid()

    # create a tree rooted at P2Doc
    with transaction.manager:
        dbsession.add(
            P2Doc(pages=[
                P2Pg(annotations=[
                    P2An(
                        datakvps=[P2KVP() for _ in range(10)],
                        graphicalelement=P2GE(
                            attributes=[P2KVP() for _ in range(8)]))
                    for _ in range(5)])
                for _ in range(3)]))

    # pick one leaf (whichever key/value pair the database returns first)
    with transaction.manager:
        child_id = dbsession.query(P2B.id).filter(
            P2B.classname == 'P2KeyValuePair').first()[0]

    print("CTE", "-" * 40)
    with transaction.manager:
        cte = dbsession.query(P2B.id, P2B.parent_id) \
            .filter(P2B.id == child_id) \
            .cte(name='cte', recursive=True)
        cte = cte.union(
            dbsession.query(P2B.id, P2B.parent_id).filter(
                P2B.id == cte.c.parent_id))
        cteids = dbsession.query(cte.c.id)
        # Exclude the starting node by id; slicing off the first row would
        # depend on the row order of a query that has no ORDER BY.
        ancestors = dbsession.query(P2B) \
            .filter(P2B.id.in_(cteids), P2B.id != child_id) \
            .all()
        _report_ancestors("CTE ANCESTORS", ancestors)

    print("S-CTE", "-" * 40)
    with transaction.manager:
        scte = select([P2B.id, P2B.parent_id]) \
            .where(P2B.id == child_id) \
            .cte(name='scte', recursive=True)
        scte = scte.union(
            select([P2B.id, P2B.parent_id]).where(P2B.id == scte.c.parent_id))
        # Same exclusion as above so only ancestors are reported.
        s = select([scte.c.id]).where(scte.c.id != child_id)
        ancestors = dbsession.execute(s).fetchall()
        _report_ancestors("S-CTE ANCESTORS", ancestors)

    print("@hybrid_property ANCESTORS SELF", "-" * 40)
    with transaction.manager:
        k = dbsession.query(P2KVP).get(child_id)
        _report_ancestors("@hybrid_property ANCESTORS SELF", k.ancestors)

    print("@hybrid_property MYANCESTORS SELF", "-" * 40)
    with transaction.manager:
        k = dbsession.query(P2KVP).get(child_id)
        _report_ancestors("@hybrid_property MYANCESTORS SELF", k.myancestors)

    print("@hybrid_property.expression ANCESTORS CLS", "-" * 40)
    with transaction.manager:
        # <<<< child_id IS NEVER USED IN EXPRESSION
        ancestors = dbsession.query(
            P2B.ancestors).filter(P2B.id == child_id).all()
        # The depth assertion is intentionally disabled for this case.
        _report_ancestors(
            "@hybrid_property.expression ANCESTORS CLS", ancestors,
            check=False)

    print("@hybrid_property.expression MYANCESTORS CLS", "-" * 40)
    with transaction.manager:
        # <<<< child_id IS NEVER USED IN EXPRESSION
        # NOTE(review): presumably this assertion fails for the same reason
        # the one above is disabled -- confirm before relying on it.
        ancestors = dbsession.query(
            P2B.myancestors).filter(P2B.id == child_id).all()
        _report_ancestors(
            "@hybrid_property.expression MYANCESTORS CLS", ancestors)
# Script entry point: run the ancestor comparison when executed directly.
if __name__ == "__main__":
    test_ancestors()