Dear SQLAlchemy experts, I'm having difficulty generating an SQL query with SQLAlchemy.
I'm trying to query a list of messages with their originator and recipients. Each message is sent by one host to one or more hosts. Hosts are all stored in one table (host). There are two types of messages stored in two different tables (dep, arr) which have their own association table for the many-to-many relationship (dst_dep, arr_dep). I attached a working code sample to this email (just modify the DB_CONNECTION variable to your needs and create a DB if you want to run it). I am running SQLAlchemy version 1.2.7 and PostgreSQL 10.4. Here is the SQL request I would like to have: SELECT host.address AS src, (SELECT string_agg(host.address, ', ' ORDER BY host.address) FROM dst_dep, *host* WHERE dep.id = dst_dep.msg AND host.id = dst_dep.host) AS dst, dep.msg AS msg FROM host, dep WHERE dep.src = host.id UNION ALL SELECT host.address AS src, (SELECT string_agg(host.address, ', ' ORDER BY host.address) FROM dst_arr, *host* WHERE arr.id = dst_arr.msg AND host.id = dst_arr.host) AS dst, arr.msg AS msg FROM host, arr WHERE arr.src = host.id; This is the SQL query I am currently generating with SQLAlchemy: SELECT host.address AS src, (SELECT string_agg(host.address, ', ' ORDER BY host.address) AS string_agg_1 FROM dst_dep WHERE dep.id = dst_dep.msg AND host.id = dst_dep.host) AS dst, dep.msg AS msg FROM host, dep WHERE dep.src = host.id UNION ALL SELECT host.address AS src, (SELECT string_agg(host.address, ', ' ORDER BY host.address) AS string_agg_2 FROM dst_arr WHERE arr.id = dst_arr.msg AND host.id = dst_arr.host) AS dst, arr.msg AS msg FROM host, arr WHERE arr.src = host.id; This query generates the following SQL error: ERROR: column "host.address" must appear in the GROUP BY clause or be used in an aggregate function LINE 2: host.address AS src, The only difference between the working SQL query and the non working one is the missing table "*host*" in the FROM clause of the "string_agg" SELECT. Code leading to this query: def get_dst_subq(t, t_dst): subq = select([ func.string_agg(Host.address, aggregate_order_by(literal_column("', '"), Host.address).label('dst')) ]).where( (t.id == t_dst.c.msg) & (Host.id == t_dst.c.host) ).as_scalar() return subq def get_bs_subq(t, t_dst): subq = select([ Host.address.label('src'), get_dst_subq(t, t_dst).label('dst'), t.msg.label('msg'), ]).where(t.src == Host.id) return subq tables = ((DEP, dst_dep), (ARR, dst_arr)) q = union_all(*[get_bs_subq(t[0], t[1]) for t in tables]).alias('res') How can I generate the proper, working SQL query with SQLAlchemy? Thank you. Best regards, Kevin -- SQLAlchemy - The Python SQL Toolkit and Object Relational Mapper http://www.sqlalchemy.org/ To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description. --- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com. To post to this group, send email to sqlalchemy@googlegroups.com. Visit this group at https://groups.google.com/group/sqlalchemy. For more options, visit https://groups.google.com/d/optout.
from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, Table from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import relationship, backref from sqlalchemy import union_all, select, literal_column, func from sqlalchemy.dialects import postgresql from sqlalchemy.dialects.postgresql import aggregate_order_by DB_CONNECTION = 'postgresql://usr:pwd@localhost:5432/issue' engine = create_engine(DB_CONNECTION, echo=False) db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine)) Base = declarative_base() Base.query = db_session.query_property() ############ # DB model # ############ class Table_Name: @declared_attr def __tablename__(cls): return cls.__name__.lower() class Host(Table_Name, Base): id = Column(Integer, primary_key=True) address = Column(String) class Message(Base): __abstract__ = True msg = Column(String) @declared_attr def __tablename__(cls): return cls.__name__.lower() @declared_attr def src(cls): return Column(Integer, ForeignKey('host.id')) dst_dep = Table('dst_dep', Base.metadata, Column('host', Integer, ForeignKey('host.id')), Column('msg', Integer, ForeignKey('dep.id')) ) class DEP(Message, Base): id = Column(Integer, primary_key=True) dsts = relationship('Host', secondary='dst_dep', backref='deps') dst_arr = Table('dst_arr', Base.metadata, Column('host', Integer, ForeignKey('host.id')), Column('msg', Integer, ForeignKey('arr.id')) ) class ARR(Message, Base): id = Column(Integer, primary_key=True) dsts = relationship('Host', secondary='dst_arr', backref='arrs') ##################### # DB initialization # ##################### def drop_tables(): if engine.dialect.has_table(engine, 'dst_dep'): dst_dep.drop(engine) if engine.dialect.has_table(engine, 'dst_arr'): dst_arr.drop(engine) if engine.dialect.has_table(engine, 'dep'): DEP.__table__.drop(engine) if engine.dialect.has_table(engine, 'arr'): ARR.__table__.drop(engine) if engine.dialect.has_table(engine, 'host'): Host.__table__.drop(engine) def init_db(): drop_tables() Base.metadata.create_all(bind=engine) db_session.commit() ################## # Adding records # ################## def add_records(): deps = [ [1, 1, [2, 3], 'msg_dep_1'], [2, 2, [3], 'msg_dep_2'], [3, 4, [5], 'msg_dep_3'], [4, 7, [4, 8], 'msg_dep_4'], [5, 9, [1, 2, 3], 'msg_dep_5'] ] arrs = [ [1, 4, [2, 4, 5], 'msg_arr_1'], [2, 2, [6], 'msg_arr_2'], [3, 4, [1, 7, 8], 'msg_arr_3'], [4, 6, [2, 3, 4, 5, 8], 'msg_arr_4'], [5, 8, [7, 9], 'msg_arr_5'] ] for h in range(1, 10): host = Host(id=h, address='address_'+str(h)) db_session.add(host) db_session.commit() for d in deps: dep = DEP(id=d[0], src= d[1], msg=d[3]) for dst in d[2]: dep.dsts.append(db_session.query(Host).get(dst)) db_session.add(dep) for a in arrs: arr = ARR(id=a[0], src= a[1], msg=a[3]) for dst in a[2]: arr.dsts.append(db_session.query(Host).get(dst)) db_session.add(arr) db_session.commit() ############ # DB query # ############ def get_dst_subq(t, t_dst): subq = select([ func.string_agg(Host.address, aggregate_order_by(literal_column("', '"), Host.address).label('dst')) ]).where( (t.id == t_dst.c.msg) & (Host.id == t_dst.c.host) ).as_scalar() #print(subq.compile(dialect=postgresql.dialect())) return subq def get_bs_subq(t, t_dst): subq = select([ Host.address.label('src'), get_dst_subq(t, t_dst).label('dst'), t.msg.label('msg'), ]).where(t.src == Host.id) return subq if __name__ == "__main__": init_db() add_records() tables = ((DEP, dst_dep), (ARR, dst_arr)) q = union_all(*[get_bs_subq(t[0], t[1]) for t in tables]).alias('res') print(q.compile(dialect=postgresql.dialect())) messages = db_session.query(q).all() print(messages)