My goal is to produce a query that incrementally refreshes a table: an upsert (insert new rows, or update the existing row when the primary key conflicts) plus a delete that removes any primary keys that no longer exist in the source. I am using Postgres 9.6 and SQLAlchemy 1.1. As a simple example, I constructed the following in SQL:

    CREATE TABLE foo (
        foo_id VARCHAR(2),
        foo_data INTEGER,
        PRIMARY KEY (foo_id)
    );
    INSERT INTO foo (foo_id, foo_data)
    VALUES ('A', 1), ('B', 2), ('C', 3), ('D', 4);

    -- refreshed third-party table
    CREATE TABLE refreshed_foo (LIKE foo);
    INSERT INTO refreshed_foo VALUES ('X', 23), ('Y', 4), ('A', 0);

    -- CTE query for incremental refresh
    WITH new_data AS (
        SELECT * FROM refreshed_foo
    ),
    perform_inserts AS (
        INSERT INTO foo
        SELECT * FROM new_data
        ON CONFLICT (foo_id)
        DO UPDATE SET (foo_data) = (EXCLUDED.foo_data)
        RETURNING 1
    ),
    perform_prune AS (
        DELETE FROM foo
        WHERE foo.foo_id NOT IN (SELECT foo_id FROM new_data)
        RETURNING 1
    )
    SELECT
        (SELECT count(*) FROM perform_inserts) inserts,
        (SELECT count(*) FROM perform_prune) prunes;

In Python I have so far:

    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy import select, text, tuple_

    def upsert_from_select(table, select_query):
        # INSERT ... SELECT, updating every non-key column on conflict
        ins = insert(table).from_select(select_query.columns.keys(), select_query)
        non_key_columns = set(
            c.name for c in table.columns.values() if not c.primary_key
        )
        exclude = {k: v for k, v in ins.excluded.items() if k in non_key_columns}
        query = ins.on_conflict_do_update(
            index_elements=table.primary_key.columns.keys(),
            set_=exclude
        )
        return query

    def incremental_refresh(table, select_query):
        cte = select_query.cte('new_data_cte')
        select_cte = select([cte])
        primary_keys = table.primary_key.columns.keys()
        id_columns = [cte.columns[key] for key in primary_keys]
        upsert_query = upsert_from_select(
            table, select_cte
        ).returning(text('1')).cte('perform_upserts')
        # negated with ~ so that it matches the NOT IN of the SQL above
        delete_query = table.delete(
            ~tuple_(*table.primary_key.columns).in_(select(id_columns))
        ).returning(text('1')).cte('perform_prune')
        # Combine cte queries?

There are a couple of things to point out. I had some trouble figuring out how best to issue a RETURNING 1 in SQLAlchemy. I suspect that using text('1') is not the best approach?
Additionally, compiling q from

    q = upsert_from_select(table, select_cte).returning(text('1'))

looks correct, as does compiling:

    q = table.delete(
        ~tuple_(*table.primary_key.columns).in_(select(id_columns))
    ).returning(text('1')).cte('perform_prune')

given a table such as:

    from sqlalchemy import Table, MetaData, Column, VARCHAR, INTEGER

    table = Table('foo', MetaData(),
                  Column('foo_id', VARCHAR(2), primary_key=True),
                  Column('foo_data', INTEGER))

What I am having trouble with is how to combine the perform_upserts and perform_prune CTEs so that they run as a single statement, as in the SQL code. I am open to any alterations if there is a better way to produce the same result.

Thanks for the help. I appreciate it.
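
P.S. To make the result I expect concrete, here is a minimal sketch of the same refresh semantics using only Python's standard-library sqlite3 module. This is a stand-in, not the Postgres/SQLAlchemy solution I am asking about. As far as I know SQLite has no data-modifying CTEs, so the sketch runs the upsert and the prune as two statements inside one transaction instead of a single WITH statement. The helper names make_example_db and incremental_refresh are made up for this illustration, and it assumes an SQLite build new enough to support ON CONFLICT (3.24 or later).

```python
import sqlite3


def make_example_db():
    """Build an in-memory database holding the example tables (hypothetical helper)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE foo (foo_id VARCHAR(2) PRIMARY KEY, foo_data INTEGER);
        INSERT INTO foo VALUES ('A', 1), ('B', 2), ('C', 3), ('D', 4);
        CREATE TABLE refreshed_foo (foo_id VARCHAR(2), foo_data INTEGER);
        INSERT INTO refreshed_foo VALUES ('X', 23), ('Y', 4), ('A', 0);
        """
    )
    return conn


def incremental_refresh(conn):
    """Upsert rows from refreshed_foo into foo, then prune keys that are gone.

    Returns (upserted_rows, pruned_rows). Both statements run in a single
    transaction, which stands in for the single Postgres CTE statement.
    """
    with conn:  # commits on success, rolls back on error
        upserts = conn.execute(
            # "WHERE true" avoids a parsing ambiguity SQLite has between
            # INSERT ... SELECT and a following ON CONFLICT clause.
            "INSERT INTO foo (foo_id, foo_data) "
            "SELECT foo_id, foo_data FROM refreshed_foo WHERE true "
            "ON CONFLICT (foo_id) DO UPDATE SET foo_data = excluded.foo_data"
        ).rowcount
        prunes = conn.execute(
            "DELETE FROM foo "
            "WHERE foo_id NOT IN (SELECT foo_id FROM refreshed_foo)"
        ).rowcount
    return upserts, prunes


if __name__ == "__main__":
    db = make_example_db()
    print(incremental_refresh(db))
    print(db.execute("SELECT foo_id, foo_data FROM foo ORDER BY foo_id").fetchall())
```

With the example data, I would expect foo to end up containing only A (updated to 0), X and Y, with B, C and D pruned. That is the same end state the single CTE statement should produce on Postgres.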