This turned out to be pleasantly painless. Posted here for the sake of "the 
next guy."

First, I removed all of the import transaction statements.

Next, I changed the models.__init__:

def get_tm_session(session_factory, transaction_manager):
    """
    Get a ``sqlalchemy.orm.Session`` instance backed by a transaction.

    This function will hook the session to the transaction manager which
    will take care of committing any changes.

    - When using pyramid_tm it will automatically be committed or aborted
      depending on whether an exception is raised.

    - When using scripts you should wrap the session in a manager yourself.
      For example::

          import transaction

          engine = get_engine(settings)
          session_factory = get_session_factory(engine)
          with transaction.manager:
              dbsession = get_tm_session(session_factory, 
transaction.manager)

    """
    dbsession = session_factory()
    #zope.sqlalchemy.register(
    #    dbsession, transaction_manager=transaction_manager)
    return dbsession

Then the relevant code, so that the native SQLA session is back:

if request.params.get("event_type", None) == "ssm_source":
        this_time = time.time()
        REG = request.registry
        b_interval = float(REG.settings.get('batch_interval', .25))
        group, port = request.params.get("group", ":").split(":")
        source = request.params.get("source", None)
        ##############################################
        ## POSTGRES goes here, route and write for all
        ## subscribers in tile_sub
        ##############################################
        sess = request.dbsession
        ##upsert for tile updates
        subsq = 
sess.query(TileSub).filter(TileSub.groupip==group).filter(TileSub.portnum==port)
        updl = []
        updstable = TileUpd.__table__
        updtime = datetime.datetime.now()
        #sess.begin_nested()
        for tsub in subsq.all():
            instmt = UPSERT(updstable).values(
                hostname=tsub.hostname, 
                groupip=group, 
                portnum=int(port), 
                sourceip=source 
                ).on_conflict_do_update(
                constraint = updstable.primary_key,
                set_ = dict(sourceip=source, timecreated=updtime, timesent 
= None)
            )
            sess.execute(instmt)
       * sess.commit()*


It now works like a charm. 

Many thanks for the prompt and thoughtful replies. 

On Wednesday, June 28, 2017 at 1:16:52 PM UTC-6, Richard Rosenberg wrote:
>
>
> Hello:
>
> I've run into a problem with SQLA's implementation of postgresql's upsert. 
> The equivalent statement works fine when run as a straight up query (thru 
> pgadmin).
>
> The model(s) in question:
>
> class TileUpd(Base):
>     __tablename__ = 'tile_upd'
>     __table_args__ = TARGS
>     hostname = Column(PgD.TEXT, ForeignKey('edgenode.hostname'), 
> primary_key=True)
>     groupip = Column(PgD.INET, primary_key=True)
>     portnum = Column(PgD.INTEGER, primary_key=True) 
>     sourceip = Column(PgD.INET, nullable=False)
>     timecreated = Column(PgD.TIMESTAMP, nullable=False, 
> server_default=text('current_timestamp'))
>     timesent = Column(PgD.TIMESTAMP, nullable=True)
>
> class TileSub(Base):
>     __tablename__ = 'tile_sub'
>     __table_args__ = TARGS
>     groupip = Column(PgD.INET, primary_key=True)
>     portnum = Column(PgD.INTEGER, primary_key=True)    
>     sourceip = Column(PgD.INET, nullable=False)
>     hostname = Column(PgD.TEXT, ForeignKey('edgenode.hostname'), 
> nullable=False)
>     timecreated = Column(PgD.TIMESTAMP, nullable=False, 
> server_default=text('current_timestamp'))
>
> This is a pyramid app, so I (so far) am using the canned transaction 
> manager, though I prefer to control commits explicitly.
>
>
> from sqlalchemy.dialects.postgresql import insert as UPSERT
>
> #scaler change loader/endpoint - no json, uses POST    
> @view_config(route_name='loader', renderer='json')
> def loader_view(request: pyramid.request):
>     """Waits for scaled tile information from scaler cluster and loads 
> changes
>     into postgresql"""
>     logger.debug("loader view")
>     if request.params.get("event_type", None) == "ssm_source":
>         this_time = time.time()
>         REG = request.registry
>         b_interval = float(REG.settings.get('batch_interval', .25))
>         group, port = request.params.get("group", ":").split(":")
>         source = request.params.get("source", None)
>         ##############################################
>         ## POSTGRES goes here, route and write for all
>         ## subscribers in tile_sub
>         ##############################################
>         sess = request.dbsession
>         ##upsert for tile updates
>         subsq = 
> sess.query(TileSub).filter(TileSub.groupip==group).filter(TileSub.portnum==port)
>         updl = []
>         updstable = TileUpd.__table__
>         updtime = datetime.datetime.now()
>         for tsub in subsq.all():
>             instmt = UPSERT(updstable).values(
>                 hostname=tsub.hostname, 
>                 groupip=group, 
>                 portnum=int(port), 
>                 sourceip=source 
>                 ).on_conflict_do_update(
>                 constraint = updstable.primary_key,
>                 set_ = dict(sourceip=source, timecreated=updtime)
>             )
>             sess.execute(instmt)
>         transaction.commit()
>
> The statements appear to be well formed, but SQLA logs show a rollback for 
> every call to execute:
>
> 2017-06-28 12:55:12,136 INFO  
> [sqlalchemy.engine.base.Engine:679][MainThread] BEGIN (implicit)
> 2017-06-28 12:55:12,137 INFO  
> [sqlalchemy.engine.base.Engine:1140][MainThread] SELECT tile_sub.groupip AS 
> tile_sub_groupip, tile_sub.portnum AS tile_sub_portnum, tile_sub.sourceip 
> AS tile_sub_sourceip, tile_sub.hostname AS tile_sub_hostname, 
> tile_sub.timecreated AS tile_sub_timecreated 
> FROM tile_sub 
> WHERE tile_sub.groupip = %(groupip_1)s AND tile_sub.portnum = %(portnum_1)s
> 2017-06-28 12:55:12,137 INFO  
> [sqlalchemy.engine.base.Engine:1143][MainThread] {'groupip_1': 
> '239.129.237.126', 'portnum_1': '55002'}
> 2017-06-28 12:55:12,139 INFO  
> [sqlalchemy.engine.base.Engine:1140][MainThread] INSERT INTO tile_upd 
> (hostname, groupip, portnum, sourceip) VALUES (%(hostname)s, %(groupip)s, 
> %(portnum)s, %(sourceip)s) ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE 
> SET sourceip = %(param_1)s, timecreated = %(param_2)s
> 2017-06-28 12:55:12,139 INFO  
> [sqlalchemy.engine.base.Engine:1143][MainThread] {'hostname': 
> 'localhost:8080', 'groupip': '239.129.237.126', 'portnum': 55002, 
> 'sourceip': '172.28.190.54', 'param_1': '172.28.190.54', 'param_2': 
> datetime.datetime(2017, 6, 28, 12, 55, 12, 135541)}
> 2017-06-28 12:55:12,140 INFO  
> [sqlalchemy.engine.base.Engine:699][MainThread] ROLLBACK
> 0.0.0.0 - - [28/Jun/2017 12:55:12] "POST /loader HTTP/1.1" 200 17
> 2017-06-28 12:55:12,148 DEBUG [stempqm.views.qmviews:42][MainThread] 
> loader view
> 2017-06-28 12:55:12,149 INFO  
> [sqlalchemy.engine.base.Engine:679][MainThread] BEGIN (implicit)
> 2017-06-28 12:55:12,150 INFO  
> [sqlalchemy.engine.base.Engine:1140][MainThread] SELECT tile_sub.groupip AS 
> tile_sub_groupip, tile_sub.portnum AS tile_sub_portnum, tile_sub.sourceip 
> AS tile_sub_sourceip, tile_sub.hostname AS tile_sub_hostname, 
> tile_sub.timecreated AS tile_sub_timecreated 
> FROM tile_sub 
> WHERE tile_sub.groupip = %(groupip_1)s AND tile_sub.portnum = %(portnum_1)s
> 2017-06-28 12:55:12,151 INFO  
> [sqlalchemy.engine.base.Engine:1143][MainThread] {'groupip_1': 
> '239.129.237.128', 'portnum_1': '55002'}
> 2017-06-28 12:55:12,153 INFO  
> [sqlalchemy.engine.base.Engine:1140][MainThread] INSERT INTO tile_upd 
> (hostname, groupip, portnum, sourceip) VALUES (%(hostname)s, %(groupip)s, 
> %(portnum)s, %(sourceip)s) ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE 
> SET sourceip = %(param_1)s, timecreated = %(param_2)s
> 2017-06-28 12:55:12,153 INFO  
> [sqlalchemy.engine.base.Engine:1143][MainThread] {'hostname': 
> 'localhost:8080', 'groupip': '239.129.237.128', 'portnum': 55002, 
> 'sourceip': '172.28.190.64', 'param_1': '172.28.190.64', 'param_2': 
> datetime.datetime(2017, 6, 28, 12, 55, 12, 149604)}
> 2017-06-28 12:55:12,154 INFO  
> [sqlalchemy.engine.base.Engine:699][MainThread] ROLLBACK
>
> All of the log entries look OK, with the exception of the failure to 
> commit. However, the postgresql logs show the following:
>
> 2017-06-27 23:48:42.308 MDT [2913] richard@stemp STATEMENT:  INSERT INTO 
> tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES 
> ('localhost:8080', '239.129.235.37', '55002', '172.28.190.42', NULL)
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp ERROR:  duplicate key 
> value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp DETAIL:  Key (hostname, 
> groupip, portnum)=(localhost:8080, 239.129.236.128, 55002) already exists.
> 2017-06-27 23:48:42.331 MDT [2914] richard@stemp STATEMENT:  INSERT INTO 
> tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES 
> ('localhost:8080', '239.129.236.128', '55002', '172.28.190.69', NULL)
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp ERROR:  duplicate key 
> value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp DETAIL:  Key (hostname, 
> groupip, portnum)=(localhost:8080, 239.129.236.201, 55002) already exists.
> 2017-06-27 23:48:42.346 MDT [2912] richard@stemp STATEMENT:  INSERT INTO 
> tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES 
> ('localhost:8080', '239.129.236.201', '55002', '172.28.190.75', NULL)
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp ERROR:  duplicate key 
> value violates unique constraint "pk_tile_upd"
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp DETAIL:  Key (hostname, 
> groupip, portnum)=(localhost:8080, 239.129.237.19, 55002) already exists.
> 2017-06-27 23:48:42.403 MDT [2912] richard@stemp STATEMENT:  INSERT INTO 
> tile_upd (hostname, groupip, portnum, sourceip, timesent) VALUES 
> ('localhost:8080', '239.129.237.19', '55002', '172.28.190.65', NULL)
> 2017-06-27 23:48:42.468 MDT [2912] richard@stemp ERROR:  duplicate key 
> value violates unique constraint "pk_tile_upd"
>
>
> Additionally, I can run the equivalent statement from pgadmin just fine:
>
> INSERT INTO tile_upd 
> (hostname, groupip, portnum, sourceip) 
> VALUES ('localhost:8080', '239.129.135.40', 55002, '172.28.190.50') 
> ON CONFLICT ON CONSTRAINT pk_tile_upd DO UPDATE SET timecreated = 
> current_timestamp, timesent = NULL, sourceip = '172.28.190.50';
>
> Query returned successfully: one row affected, 24 msec execution time.
>
> I am absolutely puzzled, but it seems likely that pyramid_tm is in the way 
> somehow. It always wants to do its own thing, and calling commit explicitly 
> is something it seems to abhor. My next step is to wrap this in:
>
> with transaction.manager as tx:
>
> But this is really not what I want. I'm tempted to rip out all of the 
> zopish stuff and go with SQLA's session, but before I try that, I thought 
> it might be worthwhile to get some further information.
>
> This is python3.6, pg 9.6, pyramid 1.9b, and pyramid_tm 2.1 running on 
> stock debian.
>
> Thanks for any insights. I'd hate to start over due to something like 
> this. . .
>
> Richard
>

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to