On Tue, Sep 12, 2017 at 11:42 AM, Philip Martin <philip.martin2...@gmail.com> wrote:
> My goal is to produce a query that would incrementally refresh new data via
> an upsert (insert new data or replace if conflict with primary key) and a
> delete query to remove any primary keys that no longer exist. I am using
> Postgres 9.6 and SQLAlchemy 1.1. For a simple example in SQL I constructed:

You will need the patch at:

https://bitbucket.org/zzzeek/sqlalchemy/issues/4074/on_conflict_do_update-assumes-the-compiler

Then here is a demonstration of your statement:

from sqlalchemy import table, column, select, literal_column, func
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects import postgresql

foo = table("foo", column("foo_id"), column("foo_data"))
refreshed_foo = table("refreshed_foo", column("foo_id"), column("foo_data"))

# the source rows, shared by both data-modifying CTEs
new_data = select([refreshed_foo]).cte("new_data")
new_data_select = select([new_data])

ins = insert(foo).from_select(new_data_select.columns, new_data_select)

# INSERT ... ON CONFLICT DO UPDATE, wrapped as a CTE
upsert = ins.on_conflict_do_update(
    index_elements=[foo.c.foo_id],
    set_={"foo_data": ins.excluded.foo_data}
).returning(literal_column('1')).cte("perform_inserts")

# DELETE of the keys that are no longer present, wrapped as a CTE
updel = foo.delete().where(
    ~foo.c.foo_id.in_(select([new_data.c.foo_id]))
).returning(literal_column('1')).cte("perform_prune")

# outer SELECT that counts the rows returned by each CTE
stmt = select([
    select([func.count(literal_column('*'))]).select_from(upsert).label("inserts"),
    select([func.count(literal_column('*'))]).select_from(updel).label("prunes"),
])

print(stmt.compile(dialect=postgresql.dialect()))

output:

WITH new_data AS
(SELECT refreshed_foo.foo_id AS foo_id, refreshed_foo.foo_data AS foo_data
FROM refreshed_foo),
perform_inserts AS
(INSERT INTO foo (foo_id, foo_data) SELECT new_data.foo_id, new_data.foo_data
FROM new_data ON CONFLICT (foo_id) DO UPDATE SET foo_data = excluded.foo_data RETURNING 1),
perform_prune AS
(DELETE FROM foo WHERE foo.foo_id NOT IN (SELECT new_data.foo_id
FROM new_data) RETURNING 1)
SELECT (SELECT count(*) AS count_1
FROM perform_inserts) AS inserts, (SELECT count(*) AS count_2
FROM perform_prune) AS prunes

>
> CREATE TABLE foo (
>     foo_id VARCHAR(2),
>     foo_data INTEGER,
>     PRIMARY KEY (foo_id));
>
>
> INSERT INTO foo (foo_id, foo_data) VALUES
>     ('A', 1),
>     ('B', 2),
>     ('C', 3),
>     ('D', 4);
>
>
> -- refreshed third-party table
>
> CREATE TABLE refreshed_foo (like foo);
>
> INSERT INTO refreshed_foo VALUES
>     ('X', 23),
>     ('Y', 4),
>     ('A', 0);
>
>
> -- CTE query for incremental refresh
>
> WITH new_data AS (
>     SELECT * from refreshed_foo
> ),
> perform_inserts AS (
>     INSERT INTO foo
>     SELECT * from new_data
>     ON CONFLICT (foo_id) DO UPDATE SET (foo_data) = (EXCLUDED.foo_data)
>     RETURNING 1),
> perform_prune AS (
>     DELETE FROM foo
>     WHERE foo.foo_id not in (select foo_id from new_data)
>     RETURNING 1)
>
> SELECT
>     (SELECT count(*) FROM perform_inserts) inserts,
>     (SELECT count(*) FROM perform_prune) prunes;
>
>
>
> In Python I have so far:
>
> from sqlalchemy.dialects.postgresql import insert
>
> from sqlalchemy import select, text, tuple_
>
>
> def upsert_from_select(table, select_query):
>     ins = insert(table).from_select(select_query.columns.keys(),
>                                     select_query)
>     non_key_columns = set(
>         c.name for c in table.columns.values() if c.primary_key is False
>     )
>     exclude = {k: v for k, v in ins.excluded.items()
>                if k in non_key_columns}
>
>     query = ins.on_conflict_do_update(
>         index_elements=table.primary_key.columns.keys(),
>         set_=exclude
>     )
>
>     return query
>
>
> def incremental_refresh(table, select_query):
>     cte = select_query.cte('new_data_cte')
>     select_cte = select([cte])
>
>     primary_keys = table.primary_key.columns.keys()
>     id_columns = [cte.columns[key] for key in primary_keys]
>
>     upsert_query = upsert_from_select(
>         table, select_cte
>     ).returning(text('1')).cte('perform_upserts')
>
>     delete_query = table.delete(
>         tuple_(*table.primary_key.columns).in_(select(id_columns))
>     ).returning(text('1')).cte('perform_prune')
>
>
>     # Combine cte queries?
>
>
>
> There are a couple of things to point out. I was having some trouble figuring
> out how best to issue a RETURNING 1 in SQLAlchemy. I don't believe using
> text is the best approach?
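
On the RETURNING 1 question: the demonstration above passes literal_column('1') to returning() rather than text('1'). literal_column() produces a column expression that is rendered verbatim, so it can be labeled or used anywhere a column is expected. A minimal sketch of just that piece, reusing the lightweight table() construct from the demonstration (the variable names stmt and compiled are only for illustration):

```python
from sqlalchemy import column, literal_column, table
from sqlalchemy.dialects import postgresql

foo = table("foo", column("foo_id"), column("foo_data"))

# literal_column("1") renders as the bare token 1 but still behaves like
# a column expression, so returning() accepts it like any other column.
stmt = foo.delete().returning(literal_column("1"))

compiled = str(stmt.compile(dialect=postgresql.dialect()))
print(compiled)
```

Compiling against the PostgreSQL dialect needs no database connection, so this is a cheap way to check how a fragment renders before it is combined into the larger statement.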
>
> Additionally, compiling q from
>
> q = upsert_from_select(table, select_cte).returning(text('1'))
>
>
> looks correct as does compiling:
>
> q = table.delete(tuple_(*table.primary_key.columns).in_(select(id_columns))).returning(text('1')).cte('perform_prune')
>
>
> Given a table such as:
>
> table = Table('foo', MetaData(), Column('foo_id', VARCHAR(2),
>               primary_key=True), Column('foo_data', INTEGER))
>
>
> I am having trouble figuring out then how to combine the perform_upserts
> and perform_prune CTEs to run as a single statement as in the SQL code. I
> am open to any alterations if there is a better way to produce the same
> result. Thanks for the help. I appreciate it.
>
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
> description.
> ---
> You received this message because you are subscribed to the Google Groups
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to sqlalchemy+unsubscr...@googlegroups.com.
> To post to this group, send email to sqlalchemy@googlegroups.com.
> Visit this group at https://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.
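
The demonstration hard-codes foo and its single-column key. Below is a hedged sketch of how the same three-CTE statement might be built for the incremental_refresh(table, select_query) signature from the question. Two assumptions to flag: it is written against SQLAlchemy 1.4 or later, where select() takes its columns positionally and scalar_subquery() exists (on 1.1, wrap the columns in a list and call .label() directly, as in the demonstration); and the prune step negates the IN test, as the original SQL does, whereas the delete in the question used in_() without negation. The Table definitions at the bottom exist only to exercise the function.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table, func,
                        literal_column, select, tuple_)
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert


def incremental_refresh(table, select_query):
    """Build one SELECT that upserts from select_query and prunes stale keys."""
    new_data = select_query.cte("new_data")
    pk_names = [c.name for c in table.primary_key.columns]

    # INSERT ... SELECT from the shared CTE
    ins = insert(table).from_select(list(new_data.c.keys()), new_data.select())

    # on conflict, overwrite every non-key column with the incoming value
    set_ = {c.name: ins.excluded[c.name]
            for c in table.columns if not c.primary_key}
    upsert = ins.on_conflict_do_update(
        index_elements=pk_names, set_=set_
    ).returning(literal_column("1")).cte("perform_inserts")

    # delete rows whose key (possibly composite) is absent from new_data
    prune = table.delete().where(
        ~tuple_(*table.primary_key.columns).in_(
            select(*[new_data.c[name] for name in pk_names]))
    ).returning(literal_column("1")).cte("perform_prune")

    # func.count() with no arguments renders as count(*)
    return select(
        select(func.count()).select_from(upsert).scalar_subquery().label("inserts"),
        select(func.count()).select_from(prune).scalar_subquery().label("prunes"),
    )


# Illustrative tables matching the CREATE TABLE statements in the question.
metadata = MetaData()
foo = Table("foo", metadata,
            Column("foo_id", String(2), primary_key=True),
            Column("foo_data", Integer))
refreshed_foo = Table("refreshed_foo", metadata,
                      Column("foo_id", String(2), primary_key=True),
                      Column("foo_data", Integer))

stmt = incremental_refresh(foo, select(refreshed_foo))
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Executing stmt on a connection should then return a single row with the inserts and prunes counts. Since PostgreSQL runs all sub-statements of a WITH query against the same snapshot, the prune step does not see the rows the upsert has just written.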