Dear SQLAlchemy experts,

I'm having difficulty generating an SQL query with SQLAlchemy.

I'm trying to query a list of messages with their originator and 
recipients. Each message is sent by one host to one or more hosts. Hosts 
are all stored in one table (host). There are two types of messages stored 
in two different tables (dep, arr) which have their own association table 
for the many-to-many relationship (dst_dep, dst_arr).

I attached a working code sample to this email (just modify the 
DB_CONNECTION variable to your needs and create a DB if you want to run it).

I am running SQLAlchemy version 1.2.7 and PostgreSQL 10.4.

Here is the SQL query I would like to generate:

SELECT
    host.address AS src,
    (SELECT string_agg(host.address, ', ' ORDER BY host.address)
    FROM dst_dep, *host*
    WHERE dep.id = dst_dep.msg AND host.id = dst_dep.host) AS dst,
    dep.msg AS msg
FROM host, dep
WHERE dep.src = host.id
UNION ALL
SELECT
    host.address AS src,
    (SELECT string_agg(host.address, ', ' ORDER BY host.address)
    FROM dst_arr, *host*
    WHERE arr.id = dst_arr.msg AND host.id = dst_arr.host) AS dst,
    arr.msg AS msg
FROM host, arr
WHERE arr.src = host.id;

This is the SQL query I am currently generating with SQLAlchemy:

SELECT
    host.address AS src,
    (SELECT string_agg(host.address, ', ' ORDER BY host.address) AS 
string_agg_1
    FROM dst_dep
    WHERE dep.id = dst_dep.msg AND host.id = dst_dep.host) AS dst,
    dep.msg AS msg
FROM host, dep
WHERE dep.src = host.id
UNION ALL
SELECT
    host.address AS src,
    (SELECT string_agg(host.address, ', ' ORDER BY host.address) AS 
string_agg_2
    FROM dst_arr
    WHERE arr.id = dst_arr.msg AND host.id = dst_arr.host) AS dst,
    arr.msg AS msg
FROM host, arr
WHERE arr.src = host.id;

This query generates the following SQL error:

ERROR:  column "host.address" must appear in the GROUP BY clause or be used 
in an aggregate function
LINE 2:     host.address AS src,

The only difference between the working SQL query and the non-working one 
is the missing table "*host*" in the FROM clause of the "string_agg" SELECT.

Code leading to this query:

def get_dst_subq(t, t_dst):

    subq =  select([
        func.string_agg(Host.address, aggregate_order_by(literal_column("', '"), Host.address).label('dst'))
    ]).where(
        (t.id == t_dst.c.msg)
        & (Host.id == t_dst.c.host)
    ).as_scalar()

    return subq


def get_bs_subq(t, t_dst):

    subq = select([
        Host.address.label('src'),
        get_dst_subq(t, t_dst).label('dst'),
        t.msg.label('msg'),
    ]).where(t.src == Host.id)

    return subq

tables = ((DEP, dst_dep), (ARR, dst_arr))

q = union_all(*[get_bs_subq(t[0], t[1]) for t in tables]).alias('res')

How can I generate the proper, working SQL query with SQLAlchemy? Thank you.

Best regards,
Kevin

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship, backref
from sqlalchemy import union_all, select, literal_column, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import aggregate_order_by


# Connection URL of the PostgreSQL database used by this example; adjust the
# credentials and database name to your environment before running.
DB_CONNECTION = 'postgresql://usr:pwd@localhost:5432/issue'

engine = create_engine(DB_CONNECTION, echo=False)

# Session registry shared by the whole script. Autoflush and autocommit are
# both disabled, so pending changes reach the database only on an explicit
# commit().
_session_factory = sessionmaker(bind=engine,
                                autoflush=False,
                                autocommit=False)
db_session = scoped_session(_session_factory)

Base = declarative_base()
# Expose a ``query`` attribute on every mapped class, bound to ``db_session``.
Base.query = db_session.query_property()


############
# DB model #
############

class Table_Name:
    """Mixin that names a mapped class's table after the class itself."""

    @declared_attr
    def __tablename__(cls):
        # The table name is the lower-cased class name, e.g. Host -> 'host'.
        class_name = cls.__name__
        return class_name.lower()


class Host(Table_Name, Base):
    """Mapped class for a host that sends and receives messages.

    The table name ('host') is derived from the class name by the
    ``Table_Name`` mixin.
    """

    # Primary key; referenced by the ``src`` column of the message tables and
    # by the ``host`` column of the association tables.
    id = Column(Integer, primary_key=True)
    # Address string of the host; this is what the example query aggregates.
    address = Column(String)


class Message(Table_Name, Base):
    """Abstract base for the message tables (``DEP`` and ``ARR``).

    The class is marked ``__abstract__`` so no table is created for it; each
    concrete subclass gets its own table carrying the ``msg`` and ``src``
    columns declared here. The table name is supplied by the ``Table_Name``
    mixin (lower-cased class name), the same mechanism ``Host`` uses, instead
    of a duplicated ``__tablename__`` definition.
    """

    __abstract__ = True

    # Text payload of the message.
    msg = Column(String)

    @declared_attr
    def src(cls):
        """Return the column holding the id of the originating host.

        Declared through ``declared_attr`` so that every concrete subclass
        receives its own ``Column`` object with its own foreign key to
        ``host.id``.
        """
        return Column(Integer, ForeignKey('host.id'))


# Association table for the many-to-many link between DEP messages and their
# destination hosts: one row per (destination host, message) pair.
dst_dep = Table(
    'dst_dep',
    Base.metadata,
    Column('host', Integer, ForeignKey('host.id')),
    Column('msg', Integer, ForeignKey('dep.id')),
)


class DEP(Message, Base):
    """Mapped class for messages stored in the 'dep' table."""

    id = Column(Integer, primary_key=True)
    # Destination hosts of the message, linked through the ``dst_dep``
    # association table; the backref exposes the reverse collection on
    # ``Host`` as ``deps``.
    dsts = relationship('Host', secondary='dst_dep', backref='deps')


# Association table for the many-to-many link between ARR messages and their
# destination hosts: one row per (destination host, message) pair.
dst_arr = Table(
    'dst_arr',
    Base.metadata,
    Column('host', Integer, ForeignKey('host.id')),
    Column('msg', Integer, ForeignKey('arr.id')),
)


class ARR(Message, Base):
    """Mapped class for messages stored in the 'arr' table."""

    id = Column(Integer, primary_key=True)
    # Destination hosts of the message, linked through the ``dst_arr``
    # association table; the backref exposes the reverse collection on
    # ``Host`` as ``arrs``.
    dsts = relationship('Host', secondary='dst_arr', backref='arrs')


#####################
# DB initialization #
#####################

def drop_tables():
    """Drop every table of this example's schema that currently exists.

    All tables of the model (``dst_dep``, ``dst_arr``, ``dep``, ``arr`` and
    ``host``) are registered on ``Base.metadata``, so a single ``drop_all``
    replaces the hand-written sequence of existence checks and drops.
    ``drop_all`` checks for each table's existence by default and issues the
    DROP statements in reverse foreign-key dependency order, so the
    association tables go before the message tables and ``host`` goes last.
    """
    Base.metadata.drop_all(bind=engine)
 

def init_db():
    """Recreate the schema from scratch: drop existing tables, then create all."""
    drop_tables()

    metadata = Base.metadata
    metadata.create_all(bind=engine)

    db_session.commit()


##################
# Adding records #
##################

def add_records():
    """Insert the sample hosts and messages used by the example query.

    Nine hosts (ids 1-9) are added and committed first. Then five DEP and
    five ARR messages are added, each described by a row of the form
    ``[message id, source host id, destination host ids, message text]``,
    and committed together.
    """
    dep_rows = [
        [1, 1, [2, 3], 'msg_dep_1'],
        [2, 2, [3], 'msg_dep_2'],
        [3, 4, [5], 'msg_dep_3'],
        [4, 7, [4, 8], 'msg_dep_4'],
        [5, 9, [1, 2, 3], 'msg_dep_5'],
    ]

    arr_rows = [
        [1, 4, [2, 4, 5], 'msg_arr_1'],
        [2, 2, [6], 'msg_arr_2'],
        [3, 4, [1, 7, 8], 'msg_arr_3'],
        [4, 6, [2, 3, 4, 5, 8], 'msg_arr_4'],
        [5, 8, [7, 9], 'msg_arr_5'],
    ]

    for host_id in range(1, 10):
        db_session.add(Host(id=host_id, address='address_' + str(host_id)))

    # Hosts must be persisted before the messages look them up by primary key.
    db_session.commit()

    # DEP rows are processed before ARR rows; both follow the same recipe.
    for model, rows in ((DEP, dep_rows), (ARR, arr_rows)):
        for msg_id, src_id, dst_ids, text in rows:
            message = model(id=msg_id, src=src_id, msg=text)
            for dst_id in dst_ids:
                message.dsts.append(db_session.query(Host).get(dst_id))
            db_session.add(message)

    db_session.commit()


############
# DB query #
############

def get_dst_subq(t, t_dst):
    """Build the scalar subquery listing a message's destination addresses.

    The subquery aggregates the addresses of all hosts linked to a message
    through the association table into one string, separated by ``', '`` and
    ordered by address (PostgreSQL ``string_agg(... ORDER BY ...)``).

    :param t: mapped message class (``DEP`` or ``ARR``) of the enclosing
        query; its ``id`` is matched against ``t_dst.c.msg``.
    :param t_dst: association ``Table`` with ``msg`` and ``host`` columns.
    :return: a scalar subquery usable as a column of the enclosing SELECT.
    """
    # string_agg(host.address, ', ' ORDER BY host.address)
    ordered_separator = aggregate_order_by(
        literal_column("', '"), Host.address)

    subq = select([
        func.string_agg(Host.address, ordered_separator)
    ]).where(
        (t.id == t_dst.c.msg)
        & (Host.id == t_dst.c.host)
    )

    # Correlate ONLY the message table to the enclosing query. With the
    # default auto-correlation, "host" is also dropped from this subquery's
    # FROM clause because the enclosing SELECT selects from "host" too; the
    # aggregate then refers to the outer host row and PostgreSQL rejects the
    # statement ('column "host.address" must appear in the GROUP BY clause').
    # Naming the correlated table explicitly keeps "host" and the association
    # table in the subquery's own FROM list.
    return subq.correlate(t).as_scalar()


def get_bs_subq(t, t_dst):
    """Build the SELECT returning (src, dst, msg) for one message table.

    :param t: mapped message class (``DEP`` or ``ARR``).
    :param t_dst: association ``Table`` linking ``t`` to its destination hosts.
    :return: a SELECT with the source host address as ``src``, the aggregated
        destination addresses as ``dst`` and the message text as ``msg``.
    """
    columns = [
        Host.address.label('src'),
        get_dst_subq(t, t_dst).label('dst'),
        t.msg.label('msg'),
    ]

    # Join each message to its originating host.
    stmt = select(columns)
    return stmt.where(t.src == Host.id)


if __name__ == "__main__":

    # Start from a clean schema filled with the sample data.
    init_db()
    add_records()

    # One (message class, association table) pair per message kind.
    tables = ((DEP, dst_dep), (ARR, dst_arr))

    # One SELECT per message kind, combined with UNION ALL.
    selects = [get_bs_subq(model, assoc) for model, assoc in tables]
    q = union_all(*selects).alias('res')

    # Show the generated SQL, then run it and show the rows.
    print(q.compile(dialect=postgresql.dialect()))
    messages = db_session.query(q).all()
    print(messages)

Reply via email to