This turned out to be pleasantly painless. Posted here for the sake of "the next guy."
First, I removed all of the "import transaction" statements.

Next, I changed get_tm_session in models.__init__ so that it no longer registers the session with zope.sqlalchemy:

    def get_tm_session(session_factory, transaction_manager):
        """
        Get a ``sqlalchemy.orm.Session`` instance backed by a transaction.

        This function will hook the session to the transaction manager which
        will take care of committing any changes.

        - When using pyramid_tm it will automatically be committed or aborted
          depending on whether an exception is raised.

        - When using scripts you should wrap the session in a manager yourself.
          For example::

              import transaction

              engine = get_engine(settings)
              session_factory = get_session_factory(engine)
              with transaction.manager:
                  dbsession = get_tm_session(session_factory, transaction.manager)

        """
        dbsession = session_factory()
        #zope.sqlalchemy.register(
        #    dbsession, transaction_manager=transaction_manager)
        return dbsession

Then the relevant view code, which now commits on the native SQLA session:

    if request.params.get("event_type", None) == "ssm_source":
        this_time = time.time()
        REG = request.registry
        b_interval = float(REG.settings.get('batch_interval', .25))
        group, port = request.params.get("group", ":").split(":")
        source = request.params.get("source", None)
        ##############################################
        ## POSTGRES goes here, route and write for all
        ## subscribers in tile_sub
        ##############################################
        sess = request.dbsession
        ##upsert for tile updates
        subsq = sess.query(TileSub).filter(TileSub.groupip==group).filter(TileSub.portnum==port)
        updl = []
        updstable = TileUpd.__table__
        updtime = datetime.datetime.now()
        #sess.begin_nested()
        for tsub in subsq.all():
            instmt = UPSERT(updstable).values(
                hostname=tsub.hostname,
                groupip=group,
                portnum=int(port),
                sourceip=source
            ).on_conflict_do_update(
                constraint = updstable.primary_key,
                set_ = dict(sourceip=source, timecreated=updtime, timesent = None)
            )
            sess.execute(instmt)
        sess.commit()

It now works like a charm. Many thanks for the prompt and thoughtful replies.

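For "the next guy" who wants to try the pattern without a PostgreSQL server, here is a minimal sketch of the same idea: run the upserts, then commit once, explicitly. It is not the code from this app. It assumes Python's bundled sqlite3 module (with SQLite 3.24 or newer, which added ON CONFLICT ... DO UPDATE) as a stand-in for PostgreSQL and SQLAlchemy, and the table definition and the helper names upsert_tiles and source_for are hypothetical, made up for illustration.

```python
import sqlite3

# Hypothetical, simplified stand-in for the tile_upd table.
# SQLite replaces PostgreSQL here only so the sketch runs without a server.
SCHEMA = (
    "CREATE TABLE tile_upd ("
    " hostname TEXT NOT NULL,"
    " groupip TEXT NOT NULL,"
    " portnum INTEGER NOT NULL,"
    " sourceip TEXT NOT NULL,"
    " timesent TEXT,"
    " PRIMARY KEY (hostname, groupip, portnum))"
)

# Insert a row, or update it in place when the primary key already exists.
UPSERT_SQL = (
    "INSERT INTO tile_upd (hostname, groupip, portnum, sourceip) "
    "VALUES (?, ?, ?, ?) "
    "ON CONFLICT (hostname, groupip, portnum) "
    "DO UPDATE SET sourceip = excluded.sourceip, timesent = NULL"
)


def upsert_tiles(conn, rows, commit=True):
    """Upsert every (hostname, groupip, portnum, sourceip) row, then end the
    transaction once: commit to keep the work, roll back to discard it."""
    for row in rows:
        conn.execute(UPSERT_SQL, row)
    if commit:
        conn.commit()
    else:
        # Roughly what the ROLLBACK entries in the SQLA log amount to:
        # the statements ran, but nothing was kept.
        conn.rollback()


def source_for(conn, hostname, groupip, portnum):
    """Return the stored sourceip for a key, or None if there is no row."""
    cur = conn.execute(
        "SELECT sourceip FROM tile_upd "
        "WHERE hostname = ? AND groupip = ? AND portnum = ?",
        (hostname, groupip, portnum),
    )
    row = cur.fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.commit()

key = ("localhost:8080", "239.129.237.126", 55002)
upsert_tiles(conn, [key + ("172.28.190.54",)])           # insert path
upsert_tiles(conn, [key + ("172.28.190.64",)])           # conflict, so update path
upsert_tiles(conn, [key + ("10.0.0.1",)], commit=False)  # rolled back, no effect
print(source_for(conn, *key))
```

The last call shows the failure mode from the original post in miniature: the upsert executes without error, but because the transaction ends in a rollback instead of a commit, the stored sourceip is still the one from the second call.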
On Wednesday, June 28, 2017 at 1:16:52 PM UTC-6, Richard Rosenberg wrote:
>
> Hello:
>
> I've run into a problem with SQLA's implementation of postgresql's upsert.
> The equivalent statement works fine when run as a straight up query (thru
> pgadmin).
>
> The model(s) in question:
>
> class TileUpd(Base):
>     __tablename__ = 'tile_upd'
>     __table_args__ = TARGS
>     hostname = Column(PgD.TEXT, ForeignKey('edgenode.hostname'),
>                       primary_key=True)
>     groupip = Column(PgD.INET, primary_key=True)
>     portnum = Column(PgD.INTEGER, primary_key=True)
>     sourceip = Column(PgD.INET, nullable=False)
>     timecreated = Column(PgD.TIMESTAMP, nullable=False,
>                          server_default=text('current_timestamp'))
>     timesent = Column(PgD.TIMESTAMP, nullable=True)
>
> class TileSub(Base):
>     __tablename__ = 'tile_sub'
>     __table_args__ = TARGS
>     groupip = Column(PgD.INET, primary_key=True)
>     portnum = Column(PgD.INTEGER, primary_key=True)
>     sourceip = Column(PgD.INET, nullable=False)
>     hostname = Column(PgD.TEXT, ForeignKey('edgenode.hostname'),
>                       nullable=False)
>     timecreated = Column(PgD.TIMESTAMP, nullable=False,
>                          server_default=text('current_timestamp'))
>
> This is a pyramid app, so I (so far) am using the canned transaction
> manager, though I prefer to control commits explicitly.
>
> from sqlalchemy.dialects.postgresql import insert as UPSERT
>
> #scaler change loader/endpoint - no json, uses POST
> @view_config(route_name='loader', renderer='json')
> def loader_view(request: pyramid.request):
>     """Waits for scaled tile information from scaler cluster and loads changes
>     into postgresql"""
>     logger.debug("loader view")
>     if request.params.get("event_type", None) == "ssm_source":
>         this_time = time.time()
>         REG = request.registry
>         b_interval = float(REG.settings.get('batch_interval', .25))
>         group, port = request.params.get("group", ":").split(":")
>         source = request.params.get("source", None)
>         ##############################################
>         ## POSTGRES goes here, route and write for all
>         ## subscribers in tile_sub
>         ##############################################
>         sess = request.dbsession
>         ##upsert for tile updates
>         subsq = sess.query(TileSub).filter(TileSub.groupip==group).filter(TileSub.portnum==port)
>         updl = []
>         updstable = TileUpd.__table__
>         updtime = datetime.datetime.now()
>         for tsub in subsq.all():
>             instmt = UPSERT(updstable).values(
>                 hostname=tsub.hostname,
>                 groupip=group,
>                 portnum=int(port),
>                 sourceip=source
>             ).on_conflict_do_update(
>                 constraint = updstable.primary_key,
>                 set_ = dict(sourceip=source, timecreated=updtime)
>             )
>             sess.execute(instmt)
>         transaction.commit()
>
> The statements appear to be well formed, but SQLA logs show a rollback for
> every call to execute:
>
> 2017-06-28 12:55:12,136 INFO [sqlalchemy.engine.base.Engine:679][MainThread] BEGIN (implicit)
> 2017-06-28 12:55:12,137 INFO [sqlalchemy.engine.base.Engine:1140][MainThread] SELECT tile_sub.groupip AS tile_sub_groupip, tile_sub.portnum AS tile_sub_portnum, tile_sub.sourceip AS tile_sub_sourceip, tile_sub.hostname AS tile_sub_hostname, tile_sub.timecreated AS tile_sub_timecreated
> FROM tile_sub
> WHERE tile_sub.groupip = %(groupip_1)s AND tile_sub.portnum = %(portnum_1)s
> 2017-06-28 12:55:12,137 INFO [sqlalchemy.engine.base.Engine:1143][MainThread] {'groupip_1': '239.129.237.126', 'portnum_1': '55002'}
> 2017-06-28 12:55:12,139 INFO [sqlalchemy.engine.base.Engine:1140][MainThread] INSERT INTO tile_upd (hostname, groupip, portnum, sourceip) VALUES (%(hostname)s, %(groupip)s, %(portnum)s, %(sourceip)s) ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE SET sourceip = %(param_1)s, timecreated = %(param_2)s
> 2017-06-28 12:55:12,139 INFO [sqlalchemy.engine.base.Engine:1143][MainThread] {'hostname': 'localhost:8080', 'groupip': '239.129.237.126', 'portnum': 55002, 'sourceip': '172.28.190.54', 'param_1': '172.28.190.54', 'param_2': datetime.datetime(2017, 6, 28, 12, 55, 12, 135541)}
> 2017-06-28 12:55:12,140 INFO [sqlalchemy.engine.base.Engine:699][MainThread] ROLLBACK
> 0.0.0.0 - - [28/Jun/2017 12:55:12] "POST /loader HTTP/1.1" 200 17
> 2017-06-28 12:55:12,148 DEBUG [stempqm.views.qmviews:42][MainThread] loader view
> 2017-06-28 12:55:12,149 INFO [sqlalchemy.engine.base.Engine:679][MainThread] BEGIN (implicit)
> 2017-06-28 12:55:12,150 INFO [sqlalchemy.engine.base.Engine:1140][MainThread] SELECT tile_sub.groupip AS tile_sub_groupip, tile_sub.portnum AS tile_sub_portnum, tile_sub.sourceip AS tile_sub_sourceip, tile_sub.hostname AS tile_sub_hostname, tile_sub.timecreated AS tile_sub_timecreated
> FROM tile_sub
> WHERE tile_sub.groupip = %(groupip_1)s AND tile_sub.portnum = %(portnum_1)s
> 2017-06-28 12:55:12,151 INFO [sqlalchemy.engine.base.Engine:1143][MainThread] {'groupip_1': '239.129.237.128', 'portnum_1': '55002'}
> 2017-06-28 12:55:12,153 INFO [sqlalchemy.engine.base.Engine:1140][MainThread] INSERT INTO tile_upd (hostname, groupip, portnum, sourceip) VALUES (%(hostname)s, %(groupip)s, %(portnum)s, %(sourceip)s) ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE SET sourceip = %(param_1)s, timecreated = %(param_2)s
> 2017-06-28 12:55:12,153 INFO [sqlalchemy.engine.base.Engine:1143][MainThread] {'hostname': 'localhost:8080', 'groupip': '239.129.237.128', 'portnum': 55002, 'sourceip': '172.28.190.64', 'param_1': '172.28.190.64', 'param_2': datetime.datetime(2017, 6, 28, 12, 55, 12, 149604)}
> 2017-06-28 12:55:12,154 INFO [sqlalchemy.engine.base.Engine:699][MainThread] ROLLBACK
>
> All of the log entries look OK, with the exception of the failure to
> commit. However, the postgresql logs show the following:
>
> 2017-06-27 23:48:42.308 MDT [2913] richard@stemp STATEMENT: INSERT INTO tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES ('localhost:8080', '239.129.235.37', '55002', '172.28.190.42', NULL)
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp ERROR: duplicate key value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp DETAIL: Key (hostname, groupip, portnum)=(localhost:8080, 239.129.236.128, 55002) already exists.
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp STATEMENT: INSERT INTO tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES ('localhost:8080', '239.129.236.128', '55002', '172.28.190.69', NULL)
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp ERROR: duplicate key value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp DETAIL: Key (hostname, groupip, portnum)=(localhost:8080, 239.129.236.201, 55002) already exists.
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp STATEMENT: INSERT INTO tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES ('localhost:8080', '239.129.236.201', '55002', '172.28.190.75', NULL)
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp ERROR: duplicate key value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp DETAIL: Key (hostname, groupip, portnum)=(localhost:8080, 239.129.237.19, 55002) already exists.
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp STATEMENT: INSERT INTO tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES ('localhost:8080', '239.129.237.19', '55002', '172.28.190.65', NULL)
> 2017-06-27 23:48:42.468 MDT [2912] richard@stemp ERROR: duplicate key value violates unique constraint "pk_tile_upd"
>
> Additionally, I can run the equivalent statement from pgadmin just fine:
>
> INSERT INTO tile_upd
> (hostname, groupip, portnum, sourceip)
> VALUES ('localhost:8080', '239.129.135.40', 55002, '172.28.190.50')
> ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE SET timecreated = current_timestamp, timesent = NULL, sourceip = '172.28.190.50';
>
> Query returned successfully: one row affected, 24 msec execution time.
>
> I am absolutely puzzled, but it seems likely that pyramid_tm is in the way
> somehow. It always wants to do its own thing, and calling commit explicitly
> is something it seems to abhor. My next step is to wrap this in:
>
> with transaction.manager as tx:
>
> But this is really not what I want. I'm tempted to rip out all of the
> zopish stuff and go with SQLA's session, but before I try that, I thought
> it might be worthwhile to get some further information.
>
> This is python3.6, pg 9.6, pyramid 1.9b, and pyramid_tm 2.1 running on
> stock debian.
>
> Thanks for any insights. I'd hate to start over due to something like
> this. . .
>
> Richard