On 2020-03-24 16:19:21 -0700, Cary Huang wrote: > Hi > > > > From the PG logical replication documentation, I see that there is a > listed limitation that sequence relations are not replicated > logically. After some examination, I see that retrieving the next > value from a sequence with nextval() emits a WAL update only once > every 32 calls to nextval(). When it does emit a WAL update, it > writes a value 32 increments ahead and maintains an > internal cache for handing out sequence numbers. This is done to > minimize WAL writes, at the risk of losing some > values during a crash. So if we were to replicate the sequence, the > subscriber would receive a value that is 32 nextval() calls ahead of > the current one, which does not reflect the sequence's current state. Sequence > changes caused by other sequence-related SQL functions and commands, such as setval() > or ALTER SEQUENCE xxx, always emit a WAL update, so replicating > changes caused by these should not be a problem. > > > > I have shared a patch that adds support for sequence relations in > logical decoding via an output plugin (test_decoding, for example); it > does not support sequence relations in logical replication between a PG > publisher and a PG subscriber via the pgoutput plugin, as that would require much > more work. For the replication to make sense, the patch disables the > batching of WAL updates every 32 nextval() calls, so that every call to nextval() emits > a WAL update. This is done by setting SEQ_LOG_VALS to > 0 in sequence.c. > > > > I think the question is whether we should minimize WAL update frequency (once every > 32 calls) when fetching the next value of a sequence, at the cost of losing values > during a crash, or replicate sequence relations properly, at the > cost of more WAL updates. > > > > > > Cary Huang > > ------------- > > HighGo Software Inc. (Canada) > > mailto:cary.hu...@highgo.ca > > http://www.highgo.ca
> diff --git a/contrib/test_decoding/test_decoding.c > b/contrib/test_decoding/test_decoding.c > index 93c948856e..7a7e572d6c 100644 > --- a/contrib/test_decoding/test_decoding.c > +++ b/contrib/test_decoding/test_decoding.c > @@ -466,6 +466,15 @@ pg_decode_change(LogicalDecodingContext *ctx, > ReorderBufferTXN *txn, > > &change->data.tp.oldtuple->tuple, > true); > break; > + case REORDER_BUFFER_CHANGE_SEQUENCE: > + appendStringInfoString(ctx->out, " > SEQUENCE:"); > + if (change->data.sequence.newtuple == > NULL) > + > appendStringInfoString(ctx->out, " (no-tuple-data)"); > + else > + tuple_to_stringinfo(ctx->out, > tupdesc, > + > &change->data.sequence.newtuple->tuple, > + > false); > + break; > default: > Assert(false); > } > diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c > index 6aab73bfd4..941015e4aa 100644 > --- a/src/backend/commands/sequence.c > +++ b/src/backend/commands/sequence.c > @@ -49,11 +49,10 @@ > > > /* > - * We don't want to log each fetching of a value from a sequence, > - * so we pre-log a few fetches in advance. In the event of > - * crash we can lose (skip over) as many values as we pre-logged. > + * Sequence replication is now supported and we will now need to log each > sequence > + * update to WAL such that the standby can properly receive the sequence > change > */ > -#define SEQ_LOG_VALS 32 > +#define SEQ_LOG_VALS 0 > > /* > * The "special area" of a sequence's buffer page looks like this. 
> diff --git a/src/backend/replication/logical/decode.c > b/src/backend/replication/logical/decode.c > index c2e5e3abf8..3dc14ead08 100644 > --- a/src/backend/replication/logical/decode.c > +++ b/src/backend/replication/logical/decode.c > @@ -42,6 +42,7 @@ > #include "replication/reorderbuffer.h" > #include "replication/snapbuild.h" > #include "storage/standby.h" > +#include "commands/sequence.h" > > typedef struct XLogRecordBuffer > { > @@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, > XLogRecordBuffer *buf, > xl_xact_parsed_commit *parsed, > TransactionId xid); > static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, > xl_xact_parsed_abort *parsed, > TransactionId xid); > +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer > *buf); > > /* common function to decode tuples */ > static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf > *tup); > +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf > *tuple); > > /* > * Take every XLogReadRecord()ed record and perform the actions required to > @@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext > *ctx, XLogReaderState *recor > DecodeLogicalMsgOp(ctx, &buf); > break; > > + case RM_SEQ_ID: > + DecodeSequence(ctx, &buf); > + break; > + > /* > * Rmgrs irrelevant for logical decoding; they describe > stuff not > * represented in logical decoding. 
Add new rmgrs in > rmgrlist.h's > @@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, > XLogReaderState *recor > case RM_HASH_ID: > case RM_GIN_ID: > case RM_GIST_ID: > - case RM_SEQ_ID: > case RM_SPGIST_ID: > case RM_BRIN_ID: > case RM_COMMIT_TS_ID: > @@ -1052,3 +1058,80 @@ DecodeXLogTuple(char *data, Size len, > ReorderBufferTupleBuf *tuple) > header->t_infomask2 = xlhdr.t_infomask2; > header->t_hoff = xlhdr.t_hoff; > } > + > +/* > + * Decode Sequence Tuple > + */ > +static void > +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) > +{ > + int datalen = len - sizeof(xl_seq_rec) - > SizeofHeapTupleHeader; > + > + Assert(datalen >= 0); > + > + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;; > + > + ItemPointerSetInvalid(&tuple->tuple.t_self); > + > + tuple->tuple.t_tableOid = InvalidOid; > + > + memcpy(((char *) tuple->tuple.t_data), > + data + sizeof(xl_seq_rec), > + SizeofHeapTupleHeader); > + > + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, > + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, > + datalen); > +} > + > +/* > + * Handle sequence decode > + */ > +static void > +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) > +{ > + ReorderBufferChange *change; > + RelFileNode target_node; > + XLogReaderState *r = buf->record; > + char *tupledata = NULL; > + Size tuplelen; > + Size datalen = 0; > + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; > + > + /* only decode changes flagged with XLOG_SEQ_LOG */ > + if (info != XLOG_SEQ_LOG) > + return; > + > + /* only interested in our database */ > + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); > + if (target_node.dbNode != ctx->slot->data.database) > + return; > + > + /* output plugin doesn't look for this origin, no need to queue */ > + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) > + return; > + > + change = ReorderBufferGetChange(ctx->reorder); > + change->action = REORDER_BUFFER_CHANGE_SEQUENCE; > + 
change->origin_id = XLogRecGetOrigin(r); > + > + memcpy(&change->data.sequence.relnode, &target_node, > sizeof(RelFileNode)); > + > + tupledata = XLogRecGetData(r); > + datalen = XLogRecGetDataLen(r); > + > + if(!datalen || !tupledata) > + return; > + > + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); > + > + change->data.sequence.newtuple = > + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); > + > + DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple); > + > + ReorderBufferXidSetCatalogChanges(ctx->reorder, > XLogRecGetXid(buf->record), buf->origptr); > + > + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, > change); > + > +} > diff --git a/src/backend/replication/logical/reorderbuffer.c > b/src/backend/replication/logical/reorderbuffer.c > index 481277a1fd..24f2cdf51d 100644 > --- a/src/backend/replication/logical/reorderbuffer.c > +++ b/src/backend/replication/logical/reorderbuffer.c > @@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, > ReorderBufferChange *change) > case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: > case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: > break; > + case REORDER_BUFFER_CHANGE_SEQUENCE: > + if (change->data.sequence.newtuple) > + { > + ReorderBufferReturnTupleBuf(rb, > change->data.sequence.newtuple); > + change->data.sequence.newtuple = NULL; > + } > + break; > } > > pfree(change); > @@ -1833,6 +1840,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId > xid, > case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: > elog(ERROR, "tuplecid value in > changequeue"); > break; > + case REORDER_BUFFER_CHANGE_SEQUENCE: > + Assert(snapshot_now); > + > + reloid = > RelidByRelfilenode(change->data.sequence.relnode.spcNode, > + > change->data.sequence.relnode.relNode); > + > + if (reloid == InvalidOid && > + change->data.sequence.newtuple > == NULL) > + goto change_done; > + else if (reloid == InvalidOid) > + elog(ERROR, "could not map > filenode \"%s\" to relation OID", > + > 
relpathperm(change->data.tp.relnode, > + > MAIN_FORKNUM)); > + > + relation = > RelationIdGetRelation(reloid); > + > + if (!RelationIsValid(relation)) > + elog(ERROR, "could not open > relation with OID %u (for filenode \"%s\")", > + reloid, > + > relpathperm(change->data.sequence.relnode, > + > MAIN_FORKNUM)); > + > + if > (!RelationIsLogicallyLogged(relation)) > + goto change_done; > + > + /* user-triggered change */ > + if (!IsToastRelation(relation)) > + { > + ReorderBufferToastReplace(rb, > txn, relation, change); > + rb->apply_change(rb, txn, > relation, change); > + } > + break; > } > } > > @@ -2516,15 +2555,23 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, > ReorderBufferTXN *txn, > case REORDER_BUFFER_CHANGE_UPDATE: > case REORDER_BUFFER_CHANGE_DELETE: > case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: > + case REORDER_BUFFER_CHANGE_SEQUENCE: > { > char *data; > ReorderBufferTupleBuf *oldtup, > *newtup; > Size oldlen = 0; > Size newlen = 0; > - > - oldtup = change->data.tp.oldtuple; > - newtup = change->data.tp.newtuple; > + if (change->action == > REORDER_BUFFER_CHANGE_SEQUENCE) > + { > + oldtup = NULL; > + newtup = change->data.sequence.newtuple; > + } > + else > + { > + oldtup = change->data.tp.oldtuple; > + newtup = change->data.tp.newtuple; > + } > > if (oldtup) > { > @@ -2707,14 +2754,23 @@ ReorderBufferChangeSize(ReorderBufferChange *change) > case REORDER_BUFFER_CHANGE_UPDATE: > case REORDER_BUFFER_CHANGE_DELETE: > case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: > + case REORDER_BUFFER_CHANGE_SEQUENCE: > { > ReorderBufferTupleBuf *oldtup, > *newtup; > Size oldlen = 0; > Size newlen = 0; > > - oldtup = change->data.tp.oldtuple; > - newtup = change->data.tp.newtuple; > + if (change->action == > REORDER_BUFFER_CHANGE_SEQUENCE) > + { > + oldtup = NULL; > + newtup = change->data.sequence.newtuple; > + } > + else > + { > + oldtup = change->data.tp.oldtuple; > + newtup = change->data.tp.newtuple; > + } > > if (oldtup) > { > @@ -3048,6 +3104,32 @@ 
ReorderBufferRestoreChange(ReorderBuffer *rb, > ReorderBufferTXN *txn, > case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: > case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: > break; > + case REORDER_BUFFER_CHANGE_SEQUENCE: > + if (change->data.sequence.newtuple) > + { > + /* here, data might not be suitably aligned! */ > + uint32 tuplelen; > + > + memcpy(&tuplelen, data + > offsetof(HeapTupleData, t_len), > + sizeof(uint32)); > + > + change->data.sequence.newtuple = > + ReorderBufferGetTupleBuf(rb, tuplelen - > SizeofHeapTupleHeader); > + > + /* restore ->tuple */ > + memcpy(&change->data.sequence.newtuple->tuple, > data, > + sizeof(HeapTupleData)); > + data += sizeof(HeapTupleData); > + > + /* reset t_data pointer into the new tuplebuf */ > + change->data.sequence.newtuple->tuple.t_data = > + > ReorderBufferTupleBufData(change->data.tp.newtuple); > + > + /* restore tuple data itself */ > + > memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen); > + data += tuplelen; > + } > + break; > } > > dlist_push_tail(&txn->changes, &change->node); > diff --git a/src/include/replication/reorderbuffer.h > b/src/include/replication/reorderbuffer.h > index 626ecf4dc9..cf3fd45c5f 100644 > --- a/src/include/replication/reorderbuffer.h > +++ b/src/include/replication/reorderbuffer.h > @@ -62,7 +62,8 @@ enum ReorderBufferChangeType > REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, > REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, > REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, > - REORDER_BUFFER_CHANGE_TRUNCATE > + REORDER_BUFFER_CHANGE_TRUNCATE, > + REORDER_BUFFER_CHANGE_SEQUENCE, > }; > > /* forward declaration */ > @@ -149,6 +150,15 @@ typedef struct ReorderBufferChange > CommandId cmax; > CommandId combocid; > } tuplecid; > + /* > + * Truncate data for REORDER_BUFFER_CHANGE_SEQUENCE > representing one > + * set of relations to be truncated. 
> + */ > + struct > + { > + RelFileNode relnode; > + ReorderBufferTupleBuf *newtuple; > + } sequence; > } data; > > /* Greetings, Andres Freund