On Tue, Sep 12, 2017 at 11:42 AM, Philip Martin
<philip.martin2...@gmail.com> wrote:
> My goal is to produce a query that would incrementally refresh new data via
> a upsert (insert new data or replace if conflict with primary key) and a
> delete query to remove any primary keys that no longer exist. I am using
> Postgres 9.6 and Sqlalchemy 1.1. For a simple example in SQL I constructed:

you will need the patch at:
https://bitbucket.org/zzzeek/sqlalchemy/issues/4074/on_conflict_do_update-assumes-the-compiler

then here is a demonstration of your statement:

from sqlalchemy import table, column, select, literal_column, func
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects import postgresql

foo = table("foo", column("foo_id"), column("foo_data"))
refreshed_foo = table("refreshed_foo", column("foo_id"), column("foo_data"))

new_data = select([refreshed_foo]).cte("new_data")

new_data_select = select([new_data])
ins = insert(foo).from_select(new_data_select.columns, new_data_select)

upsert = ins.on_conflict_do_update(
    index_elements=[foo.c.foo_id],
    set_={"foo_data": ins.excluded.foo_data}
).returning(literal_column('1')).cte("perform_inserts")

updel = foo.delete().where(
    ~foo.c.foo_id.in_(select([new_data.c.foo_id]))
).returning(literal_column('1')).cte("perform_prune")

new_data = select([refreshed_foo]).cte("new_data")

stmt = select([
    
select([func.count(literal_column('*'))]).select_from(upsert).label("inserts"),
    
select([func.count(literal_column('*'))]).select_from(updel).label("prunes"),

])

print stmt.compile(dialect=postgresql.dialect())



output:

WITH new_data AS
(SELECT refreshed_foo.foo_id AS foo_id, refreshed_foo.foo_data AS foo_data
FROM refreshed_foo),
perform_inserts AS
(INSERT INTO foo (foo_id, foo_data) SELECT new_data.foo_id, new_data.foo_data
FROM new_data ON CONFLICT (foo_id) DO UPDATE SET foo_data =
excluded.foo_data RETURNING 1),
perform_prune AS
(DELETE FROM foo WHERE foo.foo_id NOT IN (SELECT new_data.foo_id
FROM new_data) RETURNING 1)
 SELECT (SELECT count(*) AS count_1
FROM perform_inserts) AS inserts, (SELECT count(*) AS count_2
FROM perform_prune) AS prunes






>
> CREATE TABLE foo (
>   foo_id VARCHAR(2),
>   foo_data INTEGER,
>   PRIMARY KEY (foo_id));
>
>
> INSERT INTO foo (foo_id, foo_data) VALUES
>   ('A', 1),
>   ('B', 2),
>   ('C', 3),
>   ('D', 4);
>
>
> -- refreshed third-party table
>
> CREATE TABLE refreshed_foo (like foo);
>
> INSERT INTO refreshed_foo VALUES
>   ('X', 23),
>   ('Y', 4),
>   ('A', 0);
>
>
> -- CTE query for incremental refresh
>
> WITH new_data AS (
>   SELECT * from refreshed_foo
> ),
> perform_inserts AS (
>     INSERT INTO foo
>       SELECT * from new_data
>     ON CONFLICT (foo_id) DO UPDATE SET (foo_data) = (EXCLUDED.foo_data)
> RETURNING 1),
> perform_prune AS (
> DELETE FROM foo
>   WHERE foo.foo_id not in (select foo_id from new_data)
>   RETURNING 1)
>
>     SELECT
>         (SELECT count(*) FROM perform_inserts) inserts,
>         (SELECT count(*) FROM perform_prune) prunes;
>
>
>
> In Python I have so far:
>
> from sqlalchemy.dialects.postgresql import insert
>
> from sqlalchemy import select, text, tuple_
>
>
> def upsert_from_select(table, select_query):
>     ins = insert(table).from_select(select_query.columns.keys(),
> select_query)
>     non_key_columns = set(
>         c.name for c in table.columns.values() if c.primary_key is False
>     )
>     exclude = {k: v for k, v in ins.excluded.items() if k in
> non_key_columns}
>
>     query = ins.on_conflict_do_update(
>         index_elements=table.primary_key.columns.keys(),
>         set_=exclude
>     )
>
>     return query
>
>
> def incremental_refresh(table, select_query):
>     cte = select_query.cte('new_data_cte')
>     select_cte = select([cte])
>
>     primary_keys = table.primary_key.columns.keys()
>     id_columns = [cte.columns[key] for key in primary_keys]
>
>     upsert_query = upsert_from_select(
>         table, select_cte
>     ).returning(text('1')).cte('perform_upserts')
>
>     delete_query = table.delete(
>       tuple_(*table.primary_key.columns).in_(select(id_columns))
>     ).returning(text('1')).cte('perform_prune')
>
>
>     # Combine cte queies?
>
>
>
> There are a couple things to point out. I was having some trouble figuring
> out how best to issue a Returning 1 in Sqlalchemy. I don't believe using
> text is the best approach?
>
> Additionally, compiling q from
>
> q = upsert_from_select(table, select_cte).returning(text('1'))
>
>
> looks correct as does compiling:
>
> q =
> table.delete(tuple_(*table.primary_key.columns).in_(select(id_columns))).returning(text('1')).cte('perform_prune')
>
>
> Given a table such as:
>
> table = Table('foo', MetaData(), Column('foo_id', VARCHAR(2),
> primary_key=True), Column('foo_data', INTEGER))
>
>
> I am having trouble figuring out then how to combine the performs_upserts
> and performs_prune CTE's to run as a single statement as in the SQL code. I
> am open to any alterations if there is a better way to produce the same
> result. Thanks for the help. I appreciate it.
>
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
> description.
> ---
> You received this message because you are subscribed to the Google Groups
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to sqlalchemy+unsubscr...@googlegroups.com.
> To post to this group, send email to sqlalchemy@googlegroups.com.
> Visit this group at https://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to