My goal is to produce a query that would incrementally refresh new data via 
a upsert (insert new data or replace if conflict with primary key) and a 
delete query to remove any primary keys that no longer exist. I am using 
Postgres 9.6 and Sqlalchemy 1.1. For a simple example in SQL I constructed:

CREATE TABLE foo (
  foo_id VARCHAR(2),
  foo_data INTEGER,
  PRIMARY KEY (foo_id));


INSERT INTO foo (foo_id, foo_data) VALUES
  ('A', 1),
  ('B', 2),
  ('C', 3),
  ('D', 4);


-- refreshed third-party table

CREATE TABLE refreshed_foo (like foo);

INSERT INTO refreshed_foo VALUES
  ('X', 23),
  ('Y', 4),
  ('A', 0);


-- CTE query for incremental refresh

WITH new_data AS (
  SELECT * from refreshed_foo
),
perform_inserts AS (
    INSERT INTO foo
      SELECT * from new_data
    ON CONFLICT (foo_id) DO UPDATE SET (foo_data) = (EXCLUDED.foo_data)
RETURNING 1),
perform_prune AS (
DELETE FROM foo
  WHERE foo.foo_id not in (select foo_id from new_data)
  RETURNING 1)

    SELECT
        (SELECT count(*) FROM perform_inserts) inserts,
        (SELECT count(*) FROM perform_prune) prunes;



In Python I have so far:

from sqlalchemy.dialects.postgresql import insert

from sqlalchemy import select, text, tuple_


def upsert_from_select(table, select_query):
    ins = insert(table).from_select(select_query.columns.keys(), select_query)
    non_key_columns = set(
        c.name for c in table.columns.values() if c.primary_key is False
    )
    exclude = {k: v for k, v in ins.excluded.items() if k in non_key_columns}

    query = ins.on_conflict_do_update(
        index_elements=table.primary_key.columns.keys(),
        set_=exclude
    )

    return query


def incremental_refresh(table, select_query):
    cte = select_query.cte('new_data_cte')
    select_cte = select([cte])

    primary_keys = table.primary_key.columns.keys()
    id_columns = [cte.columns[key] for key in primary_keys]

    upsert_query = upsert_from_select(
        table, select_cte
    ).returning(text('1')).cte('perform_upserts')

    delete_query = table.delete(
      tuple_(*table.primary_key.columns).in_(select(id_columns))
    ).returning(text('1')).cte('perform_prune')


    # Combine cte queies?



There are a couple things to point out. I was having some trouble figuring 
out how best to issue a Returning 1 in Sqlalchemy. I don't believe using 
text is the best approach?

Additionally, compiling q from

q = upsert_from_select(table, select_cte).returning(text('1'))


looks correct as does compiling:

q = 
table.delete(tuple_(*table.primary_key.columns).in_(select(id_columns))).returning(text('1')).cte('perform_prune')


Given a table such as:

table = Table('foo', MetaData(), Column('foo_id', VARCHAR(2), 
primary_key=True), Column('foo_data', INTEGER))


I am having trouble figuring out then how to combine the performs_upserts 
and performs_prune CTE's to run as a single statement as in the SQL code. I 
am open to any alterations if there is a better way to produce the same 
result. Thanks for the help. I appreciate it.

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to