On Fri, Feb 20, 2015 at 3:44 PM, Peter Geoghegan <p...@heroku.com> wrote:
>>> they'd only see a
>>> REORDER_BUFFER_CHANGE_INSERT when that was the definitive outcome of
>>> an UPSERT, or alternatively a REORDER_BUFFER_CHANGE_UPDATE when that
>>> was the definitive outcome. No need for output plugins to consider
>>> REORDER_BUFFER_CHANGE_INTERNAL_SUPERDELETE at all.
>>
>> Yes. It'd be easiest if the only the final insert/update were actually
>> WAL logged as full actions.
>
> Well, that implies that we'd actually know that we'd succeed when WAL
> logging the speculative heap tuple's insertion. We literally have no
> way of knowing if that's the case at that point, though - that's just
> the nature of value locking scheme #2's optimistic approach.

Attached patch does this. This is just a sketch, to provoke a quick
discussion (which is hopefully all this needs). I did things this way
because it seemed more efficient than rolling this into a whole new
revision of the ON CONFLICT patch series.

Ostensibly, this does the right thing: The test_decoding output plugin
only ever reports either an INSERT or an UPDATE within a transaction
(that contains an INSERT ... ON CONFLICT UPDATE). With jjanes_upsert
running, I have not found any problematic cases where this fails to be
true (although I have certainly been less than thorough so far - please
take that into consideration).

I have two basic concerns:

* Do you think that what I've sketched here is roughly the right
approach? Consolidating super deletions and insertions at transaction
reassembly seemed like the right thing to do, but obviously I'm not
best qualified to judge that. Clearly, it would be possible to optimize
this so REORDER_BUFFER_CHANGE_INSERT "peek ahead" happens much more
selectively, which I haven't bothered with yet.

* What are the hazards implied by my abuse of the
ReorderBufferIterTXNNext() interface? It looks like it should be okay
to "peek ahead" like this, because it has a slightly odd convention
around memory management that accidentally works. But honestly, I'm
feeling too lazy to make myself grok the code within
ReorderBufferIterTXNNext() at the moment, and so have not put concerns
about the code being totally broken to rest yet. Could the "current"
ReorderBufferChange be freed before it is sent to a logical decoding
plugin by way of a call to the rb->apply_change() callback (when the
xact is spooled to disk, for example)?

Thanks
-- 
Peter Geoghegan

From 53784c818b39d39c6a1f3588f7823a9010074869 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <peter.geoghega...@gmail.com>
Date: Sun, 22 Feb 2015 17:02:14 -0800
Subject: [PATCH 7/7] Logical decoding support for ON CONFLICT UPDATE

---
 src/backend/replication/logical/decode.c        |  5 +++-
 src/backend/replication/logical/reorderbuffer.c | 35 +++++++++++++++++++++++--
 src/include/replication/reorderbuffer.h         |  3 ++-
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd..a9cd0df 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -682,7 +682,10 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	if (!(xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE))
+		change->action = REORDER_BUFFER_CHANGE_DELETE;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_DELETE;
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 20bb3b7..8f9101e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -401,6 +401,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INSERT:
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
@@ -1313,7 +1314,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 	PG_TRY();
 	{
+		/*
+		 * To merge super delete and insert records, peek-ahead is occasionally
+		 * required
+		 */
 		ReorderBufferChange *change;
+		ReorderBufferChange *nextchange = NULL;
 
 		if (using_subtxn)
 			BeginInternalSubTransaction("replay");
@@ -1323,16 +1329,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = nextchange ? nextchange :
+				ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/* Forget about previous peek ahead */
+			nextchange = NULL;
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INSERT:
 				case REORDER_BUFFER_CHANGE_UPDATE:
 				case REORDER_BUFFER_CHANGE_DELETE:
+				case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 					Assert(snapshot_now);
 
 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
@@ -1374,7 +1385,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					else if (!IsToastRelation(relation))
 					{
 						ReorderBufferToastReplace(rb, txn, relation, change);
-						rb->apply_change(rb, txn, relation, change);
+
+						/*
+						 * Speculative insertion occasionally makes use of
+						 * "super deletion" -- an implementation defined
+						 * delete of a speculatively inserted tuple.
+						 * Neither the super deletion, nor the insertion
+						 * (which must be the prior record type) are
+						 * included in the final assembly.
+						 */
+						if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+							nextchange = ReorderBufferIterTXNNext(rb, iterstate);
+
+						if (change->action !=
+							REORDER_BUFFER_CHANGE_INTERNAL_DELETE &&
+							(!nextchange || nextchange->action !=
+							 REORDER_BUFFER_CHANGE_INTERNAL_DELETE))
+							rb->apply_change(rb, txn, relation, change);
 
 						/*
 						 * Only clear reassembled toast chunks if we're
@@ -2003,6 +2030,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			{
 				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
@@ -2258,6 +2287,8 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				Size		len = offsetof(ReorderBufferTupleBuf, t_data) +
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57..c1a8819 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -51,7 +51,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
-	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
+	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
+	REORDER_BUFFER_CHANGE_INTERNAL_DELETE
 };
 
 /*
-- 
1.9.1
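
For discussion purposes, the consolidation rule can be shown in isolation
from the reorder buffer. What follows is only a minimal standalone sketch,
not code from the patch: the ChangeAction enum and consolidate_changes()
are hypothetical stand-ins, and it walks a plain array rather than going
through ReorderBufferIterTXNNext(), so it says nothing about the memory
management question raised above. It assumes, as the patch does, that a
super deletion always directly follows the insertion it cancels.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical stand-ins for a few ReorderBufferChangeType values.
 * These are illustration-only names, not the real enum.
 */
typedef enum
{
	CHANGE_INSERT,
	CHANGE_UPDATE,
	CHANGE_DELETE,
	CHANGE_INTERNAL_DELETE		/* "super deletion" of a speculative tuple */
} ChangeAction;

/*
 * Copy the changes an output plugin should see from in[0..n) into out,
 * returning how many were copied.  out must have room for n entries.
 *
 * An INSERT immediately followed by an INTERNAL_DELETE is dropped along
 * with that delete.  A stray INTERNAL_DELETE is never passed through.
 */
size_t
consolidate_changes(const ChangeAction *in, size_t n, ChangeAction *out)
{
	size_t		nout = 0;

	assert(in != NULL && out != NULL);

	for (size_t i = 0; i < n; i++)
	{
		/* Internal-only record: plugins never see it */
		if (in[i] == CHANGE_INTERNAL_DELETE)
			continue;

		/* Peek ahead: was this speculative insertion super deleted? */
		if (in[i] == CHANGE_INSERT &&
			i + 1 < n && in[i + 1] == CHANGE_INTERNAL_DELETE)
		{
			i++;				/* consume the super deletion as well */
			continue;
		}

		out[nout++] = in[i];
	}

	return nout;
}
```

Given the sequence INSERT, INTERNAL_DELETE, UPDATE, only the UPDATE is
passed through, which matches the behavior reported above for
test_decoding.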