On 2020-03-24 16:19:21 -0700, Cary Huang wrote:
> Hi
>
>
>
> From the PG logical replication documentation, I see that there is a
> listed limitation that sequence relations are not replicated
> logically. After some examination, I see that retrieving the next
> value from a sequence using the nextval() call emits a WAL update
> only once every 32 calls to nextval(). In fact, when it emits a WAL update, it
> writes a future value 32 increments ahead and maintains an
> internal cache for delivering sequence numbers. It is done this way to
> minimize WAL writes, at the risk of losing (skipping) some
> values during a crash. So if we were to replicate the sequence, the
> subscriber would receive a value up to 32 nextval() calls ahead of the
> current one, which obviously does not reflect the current state. Sequence
> changes caused by other sequence-related SQL functions like setval()
> or ALTER SEQUENCE xxx always emit a WAL update, so replicating
> changes caused by these should not be a problem.
>
>
>
> I have shared a patch that allows sequence relations to be supported in
> logical replication via a decoding plugin (test_decoding, for example). It
> does not support sequence relations in logical replication between a PG
> publisher and a PG subscriber via the pgoutput plugin, as that would require much
> more work. For the replication to make sense, the patch disables the
> batching of WAL updates over 32 nextval() calls, so every call to nextval() emits
> a WAL update for proper replication. This is done by setting SEQ_LOG_VALS to
> 0 in sequence.c.
>
>
>
> I think the question is: should we minimize the WAL update frequency (once every
> 32 calls) when getting the next value of a sequence, at the cost of losing values
> during a crash, or should we be able to replicate sequence relations properly at the
> cost of more WAL updates?
>
>
>
>
>
> Cary Huang
>
> -------------
>
> HighGo Software Inc. (Canada)
>
> mailto:[email protected]
>
> http://www.highgo.ca
> diff --git a/contrib/test_decoding/test_decoding.c
> b/contrib/test_decoding/test_decoding.c
> index 93c948856e..7a7e572d6c 100644
> --- a/contrib/test_decoding/test_decoding.c
> +++ b/contrib/test_decoding/test_decoding.c
> @@ -466,6 +466,15 @@ pg_decode_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
> &change->data.tp.oldtuple->tuple,
> true);
> break;
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> + appendStringInfoString(ctx->out, "
> SEQUENCE:");
> + if (change->data.sequence.newtuple ==
> NULL)
> +
> appendStringInfoString(ctx->out, " (no-tuple-data)");
> + else
> + tuple_to_stringinfo(ctx->out,
> tupdesc,
> +
> &change->data.sequence.newtuple->tuple,
> +
> false);
> + break;
> default:
> Assert(false);
> }
> diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
> index 6aab73bfd4..941015e4aa 100644
> --- a/src/backend/commands/sequence.c
> +++ b/src/backend/commands/sequence.c
> @@ -49,11 +49,10 @@
>
>
> /*
> - * We don't want to log each fetching of a value from a sequence,
> - * so we pre-log a few fetches in advance. In the event of
> - * crash we can lose (skip over) as many values as we pre-logged.
> + * Sequence replication is now supported and we will now need to log each
> sequence
> + * update to WAL such that the standby can properly receive the sequence
> change
> */
> -#define SEQ_LOG_VALS 32
> +#define SEQ_LOG_VALS 0
>
> /*
> * The "special area" of a sequence's buffer page looks like this.
> diff --git a/src/backend/replication/logical/decode.c
> b/src/backend/replication/logical/decode.c
> index c2e5e3abf8..3dc14ead08 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -42,6 +42,7 @@
> #include "replication/reorderbuffer.h"
> #include "replication/snapbuild.h"
> #include "storage/standby.h"
> +#include "commands/sequence.h"
>
> typedef struct XLogRecordBuffer
> {
> @@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx,
> XLogRecordBuffer *buf,
> xl_xact_parsed_commit *parsed,
> TransactionId xid);
> static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> xl_xact_parsed_abort *parsed,
> TransactionId xid);
> +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer
> *buf);
>
> /* common function to decode tuples */
> static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf
> *tup);
> +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf
> *tuple);
>
> /*
> * Take every XLogReadRecord()ed record and perform the actions required to
> @@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext
> *ctx, XLogReaderState *recor
> DecodeLogicalMsgOp(ctx, &buf);
> break;
>
> + case RM_SEQ_ID:
> + DecodeSequence(ctx, &buf);
> + break;
> +
> /*
> * Rmgrs irrelevant for logical decoding; they describe
> stuff not
> * represented in logical decoding. Add new rmgrs in
> rmgrlist.h's
> @@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
> XLogReaderState *recor
> case RM_HASH_ID:
> case RM_GIN_ID:
> case RM_GIST_ID:
> - case RM_SEQ_ID:
> case RM_SPGIST_ID:
> case RM_BRIN_ID:
> case RM_COMMIT_TS_ID:
> @@ -1052,3 +1058,80 @@ DecodeXLogTuple(char *data, Size len,
> ReorderBufferTupleBuf *tuple)
> header->t_infomask2 = xlhdr.t_infomask2;
> header->t_hoff = xlhdr.t_hoff;
> }
> +
> +/*
> + * Decode Sequence Tuple
> + */
> +static void
> +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
> +{
> + int datalen = len - sizeof(xl_seq_rec) -
> SizeofHeapTupleHeader;
> +
> + Assert(datalen >= 0);
> +
> + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;;
> +
> + ItemPointerSetInvalid(&tuple->tuple.t_self);
> +
> + tuple->tuple.t_tableOid = InvalidOid;
> +
> + memcpy(((char *) tuple->tuple.t_data),
> + data + sizeof(xl_seq_rec),
> + SizeofHeapTupleHeader);
> +
> + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
> + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
> + datalen);
> +}
> +
> +/*
> + * Handle sequence decode
> + */
> +static void
> +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> + ReorderBufferChange *change;
> + RelFileNode target_node;
> + XLogReaderState *r = buf->record;
> + char *tupledata = NULL;
> + Size tuplelen;
> + Size datalen = 0;
> + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
> +
> + /* only decode changes flagged with XLOG_SEQ_LOG */
> + if (info != XLOG_SEQ_LOG)
> + return;
> +
> + /* only interested in our database */
> + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
> + if (target_node.dbNode != ctx->slot->data.database)
> + return;
> +
> + /* output plugin doesn't look for this origin, no need to queue */
> + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
> + return;
> +
> + change = ReorderBufferGetChange(ctx->reorder);
> + change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
> + change->origin_id = XLogRecGetOrigin(r);
> +
> + memcpy(&change->data.sequence.relnode, &target_node,
> sizeof(RelFileNode));
> +
> + tupledata = XLogRecGetData(r);
> + datalen = XLogRecGetDataLen(r);
> +
> + if(!datalen || !tupledata)
> + return;
> +
> + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
> +
> + change->data.sequence.newtuple =
> + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
> +
> + DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple);
> +
> + ReorderBufferXidSetCatalogChanges(ctx->reorder,
> XLogRecGetXid(buf->record), buf->origptr);
> +
> + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
> change);
> +
> +}
> diff --git a/src/backend/replication/logical/reorderbuffer.c
> b/src/backend/replication/logical/reorderbuffer.c
> index 481277a1fd..24f2cdf51d 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb,
> ReorderBufferChange *change)
> case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
> case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
> break;
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> + if (change->data.sequence.newtuple)
> + {
> + ReorderBufferReturnTupleBuf(rb,
> change->data.sequence.newtuple);
> + change->data.sequence.newtuple = NULL;
> + }
> + break;
> }
>
> pfree(change);
> @@ -1833,6 +1840,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId
> xid,
> case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
> elog(ERROR, "tuplecid value in
> changequeue");
> break;
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> + Assert(snapshot_now);
> +
> + reloid =
> RelidByRelfilenode(change->data.sequence.relnode.spcNode,
> +
> change->data.sequence.relnode.relNode);
> +
> + if (reloid == InvalidOid &&
> + change->data.sequence.newtuple
> == NULL)
> + goto change_done;
> + else if (reloid == InvalidOid)
> + elog(ERROR, "could not map
> filenode \"%s\" to relation OID",
> +
> relpathperm(change->data.tp.relnode,
> +
> MAIN_FORKNUM));
> +
> + relation =
> RelationIdGetRelation(reloid);
> +
> + if (!RelationIsValid(relation))
> + elog(ERROR, "could not open
> relation with OID %u (for filenode \"%s\")",
> + reloid,
> +
> relpathperm(change->data.sequence.relnode,
> +
> MAIN_FORKNUM));
> +
> + if
> (!RelationIsLogicallyLogged(relation))
> + goto change_done;
> +
> + /* user-triggered change */
> + if (!IsToastRelation(relation))
> + {
> + ReorderBufferToastReplace(rb,
> txn, relation, change);
> + rb->apply_change(rb, txn,
> relation, change);
> + }
> + break;
> }
> }
>
> @@ -2516,15 +2555,23 @@ ReorderBufferSerializeChange(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
> case REORDER_BUFFER_CHANGE_UPDATE:
> case REORDER_BUFFER_CHANGE_DELETE:
> case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> {
> char *data;
> ReorderBufferTupleBuf *oldtup,
> *newtup;
> Size oldlen = 0;
> Size newlen = 0;
> -
> - oldtup = change->data.tp.oldtuple;
> - newtup = change->data.tp.newtuple;
> + if (change->action ==
> REORDER_BUFFER_CHANGE_SEQUENCE)
> + {
> + oldtup = NULL;
> + newtup = change->data.sequence.newtuple;
> + }
> + else
> + {
> + oldtup = change->data.tp.oldtuple;
> + newtup = change->data.tp.newtuple;
> + }
>
> if (oldtup)
> {
> @@ -2707,14 +2754,23 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
> case REORDER_BUFFER_CHANGE_UPDATE:
> case REORDER_BUFFER_CHANGE_DELETE:
> case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> {
> ReorderBufferTupleBuf *oldtup,
> *newtup;
> Size oldlen = 0;
> Size newlen = 0;
>
> - oldtup = change->data.tp.oldtuple;
> - newtup = change->data.tp.newtuple;
> + if (change->action ==
> REORDER_BUFFER_CHANGE_SEQUENCE)
> + {
> + oldtup = NULL;
> + newtup = change->data.sequence.newtuple;
> + }
> + else
> + {
> + oldtup = change->data.tp.oldtuple;
> + newtup = change->data.tp.newtuple;
> + }
>
> if (oldtup)
> {
> @@ -3048,6 +3104,32 @@ ReorderBufferRestoreChange(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
> case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
> case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
> break;
> + case REORDER_BUFFER_CHANGE_SEQUENCE:
> + if (change->data.sequence.newtuple)
> + {
> + /* here, data might not be suitably aligned! */
> + uint32 tuplelen;
> +
> + memcpy(&tuplelen, data +
> offsetof(HeapTupleData, t_len),
> + sizeof(uint32));
> +
> + change->data.sequence.newtuple =
> + ReorderBufferGetTupleBuf(rb, tuplelen -
> SizeofHeapTupleHeader);
> +
> + /* restore ->tuple */
> + memcpy(&change->data.sequence.newtuple->tuple,
> data,
> + sizeof(HeapTupleData));
> + data += sizeof(HeapTupleData);
> +
> + /* reset t_data pointer into the new tuplebuf */
> + change->data.sequence.newtuple->tuple.t_data =
> +
> ReorderBufferTupleBufData(change->data.tp.newtuple);
> +
> + /* restore tuple data itself */
> +
> memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen);
> + data += tuplelen;
> + }
> + break;
> }
>
> dlist_push_tail(&txn->changes, &change->node);
> diff --git a/src/include/replication/reorderbuffer.h
> b/src/include/replication/reorderbuffer.h
> index 626ecf4dc9..cf3fd45c5f 100644
> --- a/src/include/replication/reorderbuffer.h
> +++ b/src/include/replication/reorderbuffer.h
> @@ -62,7 +62,8 @@ enum ReorderBufferChangeType
> REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
> REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
> REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
> - REORDER_BUFFER_CHANGE_TRUNCATE
> + REORDER_BUFFER_CHANGE_TRUNCATE,
> + REORDER_BUFFER_CHANGE_SEQUENCE,
> };
>
> /* forward declaration */
> @@ -149,6 +150,15 @@ typedef struct ReorderBufferChange
> CommandId cmax;
> CommandId combocid;
> } tuplecid;
> + /*
> + * Truncate data for REORDER_BUFFER_CHANGE_SEQUENCE
> representing one
> + * set of relations to be truncated.
> + */
> + struct
> + {
> + RelFileNode relnode;
> + ReorderBufferTupleBuf *newtuple;
> + } sequence;
> } data;
>
> /*
Greetings,
Andres Freund