On 2020-03-24 16:19:21 -0700, Cary Huang wrote:
> Hi
> 
> 
> 
> From the PG logical replication documentation, I see that there is a
> listed limitation that sequence relations are not replicated
> logically. After some examination, I see that retrieving the next
> value from a sequence using the nextval() call emits a WAL update
> every 32 calls to nextval(). In fact, when it emits a WAL update, it
> writes a future value 32 increments ahead, and maintains an
> internal cache for delivering sequence numbers. It is done this way to
> minimize WAL writes, at the risk of losing (skipping over) some
> values during a crash. So if we were to replicate the sequence, the
> subscriber would receive a future value (32 calls to nextval() ahead of
> the current one), which does not reflect the current state. Sequence
> changes caused by other sequence-related SQL functions like setval()
> or ALTER SEQUENCE xxx, will always emit a WAL update, so replicating
> changes caused by these should not be a problem. 
> 
> 
> 
> I have shared a patch that allows sequence relations to be supported in
> logical replication via a decoding plugin (test_decoding, for example); it
> does not support sequence relations in logical replication between a PG
> publisher and a PG subscriber via the pgoutput plugin, as that would require
> much more work. For the replication to make sense, the patch disables the
> batching of WAL updates (one update per 32 nextval() calls), so every call to
> nextval() emits a WAL update for proper replication. This is done by setting
> SEQ_LOG_VALS to 0 in sequence.c.
> 
> 
> 
> I think the question is: should we keep WAL update frequency low (one update
> per 32 calls) when getting the next value of a sequence, at the cost of
> losing values during a crash, or replicate sequence relations properly, at
> the cost of more WAL updates?
> 
> 
> 
> 
> 
> Cary Huang
> 
> -------------
> 
> HighGo Software Inc. (Canada)
> 
> mailto:cary.hu...@highgo.ca
> 
> http://www.highgo.ca

> diff --git a/contrib/test_decoding/test_decoding.c 
> b/contrib/test_decoding/test_decoding.c
> index 93c948856e..7a7e572d6c 100644
> --- a/contrib/test_decoding/test_decoding.c
> +++ b/contrib/test_decoding/test_decoding.c
> @@ -466,6 +466,15 @@ pg_decode_change(LogicalDecodingContext *ctx, 
> ReorderBufferTXN *txn,
>                                                                       
> &change->data.tp.oldtuple->tuple,
>                                                                       true);
>                       break;
> +             case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                                     appendStringInfoString(ctx->out, " 
> SEQUENCE:");
> +                                     if (change->data.sequence.newtuple == 
> NULL)
> +                                             
> appendStringInfoString(ctx->out, " (no-tuple-data)");
> +                                     else
> +                                             tuple_to_stringinfo(ctx->out, 
> tupdesc,
> +                                                                             
>         &change->data.sequence.newtuple->tuple,
> +                                                                             
>         false);
> +                                     break;
>               default:
>                       Assert(false);
>       }
> diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
> index 6aab73bfd4..941015e4aa 100644
> --- a/src/backend/commands/sequence.c
> +++ b/src/backend/commands/sequence.c
> @@ -49,11 +49,10 @@
>  
>  
>  /*
> - * We don't want to log each fetching of a value from a sequence,
> - * so we pre-log a few fetches in advance. In the event of
> - * crash we can lose (skip over) as many values as we pre-logged.
> + * Log every fetch of a value from a sequence, rather than pre-logging
> + * several fetches in advance, so that logical decoding receives each
> + * sequence change as it happens instead of a future value.
>   */
> -#define SEQ_LOG_VALS 32
> +#define SEQ_LOG_VALS 0
>  
>  /*
>   * The "special area" of a sequence's buffer page looks like this.
> diff --git a/src/backend/replication/logical/decode.c 
> b/src/backend/replication/logical/decode.c
> index c2e5e3abf8..3dc14ead08 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -42,6 +42,7 @@
>  #include "replication/reorderbuffer.h"
>  #include "replication/snapbuild.h"
>  #include "storage/standby.h"
> +#include "commands/sequence.h"
>  
>  typedef struct XLogRecordBuffer
>  {
> @@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, 
> XLogRecordBuffer *buf,
>                                                xl_xact_parsed_commit *parsed, 
> TransactionId xid);
>  static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>                                               xl_xact_parsed_abort *parsed, 
> TransactionId xid);
> +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer 
> *buf);
>  
>  /* common function to decode tuples */
>  static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf 
> *tup);
> +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf 
> *tuple);
>  
>  /*
>   * Take every XLogReadRecord()ed record and perform the actions required to
> @@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext 
> *ctx, XLogReaderState *recor
>                       DecodeLogicalMsgOp(ctx, &buf);
>                       break;
>  
> +             case RM_SEQ_ID:
> +                     DecodeSequence(ctx, &buf);
> +                     break;
> +
>                       /*
>                        * Rmgrs irrelevant for logical decoding; they describe 
> stuff not
>                        * represented in logical decoding. Add new rmgrs in 
> rmgrlist.h's
> @@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, 
> XLogReaderState *recor
>               case RM_HASH_ID:
>               case RM_GIN_ID:
>               case RM_GIST_ID:
> -             case RM_SEQ_ID:
>               case RM_SPGIST_ID:
>               case RM_BRIN_ID:
>               case RM_COMMIT_TS_ID:
> @@ -1052,3 +1058,80 @@ DecodeXLogTuple(char *data, Size len, 
> ReorderBufferTupleBuf *tuple)
>       header->t_infomask2 = xlhdr.t_infomask2;
>       header->t_hoff = xlhdr.t_hoff;
>  }
> +
> +/*
> + * DecodeSeqTuple
> + *     Copy the sequence tuple carried by an XLOG_SEQ_LOG record into "tuple".
> + *
> + * "data"/"len" are the record's main data: an xl_seq_rec immediately
> + * followed by the complete heap tuple (tuple header, then tuple data).
> + * tuple->tuple.t_data must have room for len - sizeof(xl_seq_rec) bytes.
> + */
> +static void
> +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
> +{
> +     Size            tuplen;
> +
> +     /*
> +      * Compare before subtracting: Size is unsigned, so a short record would
> +      * otherwise wrap around instead of going negative.
> +      */
> +     Assert(len >= sizeof(xl_seq_rec) + SizeofHeapTupleHeader);
> +
> +     /* full on-disk tuple length, header included */
> +     tuplen = len - sizeof(xl_seq_rec);
> +
> +     tuple->tuple.t_len = tuplen;
> +     ItemPointerSetInvalid(&tuple->tuple.t_self);
> +     tuple->tuple.t_tableOid = InvalidOid;
> +
> +     /* header and data are contiguous in the record, so copy them at once */
> +     memcpy(tuple->tuple.t_data, data + sizeof(xl_seq_rec), tuplen);
> +}
> +
> +/*
> + * DecodeSequence
> + *     Queue an XLOG_SEQ_LOG record as a REORDER_BUFFER_CHANGE_SEQUENCE change.
> + *
> + * Other sequence record types, records for other databases, and records
> + * from origins the output plugin filters out are ignored.  A record without
> + * data is skipped; a record too short to hold a tuple header is an error.
> + */
> +static void
> +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +     ReorderBufferChange *change;
> +     RelFileNode target_node;
> +     XLogReaderState *r = buf->record;
> +     char       *tupledata;
> +     Size            datalen;
> +     Size            tuplelen;
> +     uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> +
> +     /* only decode changes flagged with XLOG_SEQ_LOG */
> +     if (info != XLOG_SEQ_LOG)
> +             return;
> +
> +     /* only interested in our database */
> +     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
> +     if (target_node.dbNode != ctx->slot->data.database)
> +             return;
> +
> +     /* output plugin doesn't look for this origin, no need to queue */
> +     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
> +             return;
> +
> +     tupledata = XLogRecGetData(r);
> +     datalen = XLogRecGetDataLen(r);
> +
> +     /*
> +      * Nothing to decode without record data.  Check this before acquiring
> +      * a change from the reorder buffer so that it is not leaked.
> +      */
> +     if (datalen == 0 || tupledata == NULL)
> +             return;
> +
> +     /*
> +      * The data is an xl_seq_rec followed by a full heap tuple.  Reject
> +      * anything shorter than that, since the unsigned arithmetic below
> +      * would otherwise wrap around.
> +      */
> +     if (datalen < sizeof(xl_seq_rec) + SizeofHeapTupleHeader)
> +             elog(ERROR, "unexpected XLOG_SEQ_LOG record data length %zu",
> +                      datalen);
> +
> +     /*
> +      * Tuple data length excluding the tuple header.  The record stores a
> +      * complete HeapTupleHeader (SizeofHeapTupleHeader), not the compact
> +      * xl_heap_header used by heap records.
> +      */
> +     tuplelen = datalen - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
> +
> +     change = ReorderBufferGetChange(ctx->reorder);
> +     change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
> +     change->origin_id = XLogRecGetOrigin(r);
> +
> +     memcpy(&change->data.sequence.relnode, &target_node, sizeof(RelFileNode));
> +
> +     change->data.sequence.newtuple =
> +             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
> +
> +     DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple);
> +
> +     /*
> +      * NOTE(review): this marks every transaction that calls nextval() as
> +      * catalog-changing; confirm that this is actually required.
> +      */
> +     ReorderBufferXidSetCatalogChanges(ctx->reorder, XLogRecGetXid(r),
> +                                                                       buf->origptr);
> +
> +     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
> +                                                      change);
> +}
> diff --git a/src/backend/replication/logical/reorderbuffer.c 
> b/src/backend/replication/logical/reorderbuffer.c
> index 481277a1fd..24f2cdf51d 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, 
> ReorderBufferChange *change)
>               case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
>               case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>                       break;
> +             case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                     if (change->data.sequence.newtuple)
> +                     {
> +                             ReorderBufferReturnTupleBuf(rb, 
> change->data.sequence.newtuple);
> +                             change->data.sequence.newtuple = NULL;
> +                     }
> +                     break;
>       }
>  
>       pfree(change);
> @@ -1833,6 +1840,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId 
> xid,
>                               case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>                                       elog(ERROR, "tuplecid value in 
> changequeue");
>                                       break;
> +                             case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                                     Assert(snapshot_now);
> +
> +                                     reloid = 
> RelidByRelfilenode(change->data.sequence.relnode.spcNode,
> +                                                                             
>                 change->data.sequence.relnode.relNode);
> +
> +                                     if (reloid == InvalidOid &&
> +                                             change->data.sequence.newtuple 
> == NULL)
> +                                             goto change_done;
> +                                     else if (reloid == InvalidOid)
> +                                             elog(ERROR, "could not map 
> filenode \"%s\" to relation OID",
> +                                                      
> relpathperm(change->data.tp.relnode,
> +                                                                             
>  MAIN_FORKNUM));
> +
> +                                     relation = 
> RelationIdGetRelation(reloid);
> +
> +                                     if (!RelationIsValid(relation))
> +                                             elog(ERROR, "could not open 
> relation with OID %u (for filenode \"%s\")",
> +                                                      reloid,
> +                                                      
> relpathperm(change->data.sequence.relnode,
> +                                                                             
>  MAIN_FORKNUM));
> +
> +                                     if 
> (!RelationIsLogicallyLogged(relation))
> +                                             goto change_done;
> +
> +                                     /* user-triggered change */
> +                                     if (!IsToastRelation(relation))
> +                                     {
> +                                             ReorderBufferToastReplace(rb, 
> txn, relation, change);
> +                                             rb->apply_change(rb, txn, 
> relation, change);
> +                                     }
> +                                     break;
>                       }
>               }
>  
> @@ -2516,15 +2555,23 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, 
> ReorderBufferTXN *txn,
>               case REORDER_BUFFER_CHANGE_UPDATE:
>               case REORDER_BUFFER_CHANGE_DELETE:
>               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> +             case REORDER_BUFFER_CHANGE_SEQUENCE:
>                       {
>                               char       *data;
>                               ReorderBufferTupleBuf *oldtup,
>                                                  *newtup;
>                               Size            oldlen = 0;
>                               Size            newlen = 0;
> -
> -                             oldtup = change->data.tp.oldtuple;
> -                             newtup = change->data.tp.newtuple;
> +                             if (change->action == 
> REORDER_BUFFER_CHANGE_SEQUENCE)
> +                             {
> +                                     oldtup = NULL;
> +                                     newtup = change->data.sequence.newtuple;
> +                             }
> +                             else
> +                             {
> +                                     oldtup = change->data.tp.oldtuple;
> +                                     newtup = change->data.tp.newtuple;
> +                             }
>  
>                               if (oldtup)
>                               {
> @@ -2707,14 +2754,23 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
>               case REORDER_BUFFER_CHANGE_UPDATE:
>               case REORDER_BUFFER_CHANGE_DELETE:
>               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> +             case REORDER_BUFFER_CHANGE_SEQUENCE:
>                       {
>                               ReorderBufferTupleBuf *oldtup,
>                                                  *newtup;
>                               Size            oldlen = 0;
>                               Size            newlen = 0;
>  
> -                             oldtup = change->data.tp.oldtuple;
> -                             newtup = change->data.tp.newtuple;
> +                             if (change->action == 
> REORDER_BUFFER_CHANGE_SEQUENCE)
> +                             {
> +                                     oldtup = NULL;
> +                                     newtup = change->data.sequence.newtuple;
> +                             }
> +                             else
> +                             {
> +                                     oldtup = change->data.tp.oldtuple;
> +                                     newtup = change->data.tp.newtuple;
> +                             }
>  
>                               if (oldtup)
>                               {
> @@ -3048,6 +3104,32 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, 
> ReorderBufferTXN *txn,
>               case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
>               case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>                       break;
> +             case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                     if (change->data.sequence.newtuple)
> +                     {
> +                             /* here, data might not be suitably aligned! */
> +                             uint32          tuplelen;
> +
> +                             memcpy(&tuplelen, data + 
> offsetof(HeapTupleData, t_len),
> +                                        sizeof(uint32));
> +
> +                             change->data.sequence.newtuple =
> +                                     ReorderBufferGetTupleBuf(rb, tuplelen - 
> SizeofHeapTupleHeader);
> +
> +                             /* restore ->tuple */
> +                             memcpy(&change->data.sequence.newtuple->tuple, 
> data,
> +                                        sizeof(HeapTupleData));
> +                             data += sizeof(HeapTupleData);
> +
> +                             /* reset t_data pointer into the new tuplebuf */
> +                             change->data.sequence.newtuple->tuple.t_data =
> +                                     
> ReorderBufferTupleBufData(change->data.tp.newtuple);
> +
> +                             /* restore tuple data itself */
> +                             
> memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen);
> +                             data += tuplelen;
> +                     }
> +                     break;
>       }
>  
>       dlist_push_tail(&txn->changes, &change->node);
> diff --git a/src/include/replication/reorderbuffer.h 
> b/src/include/replication/reorderbuffer.h
> index 626ecf4dc9..cf3fd45c5f 100644
> --- a/src/include/replication/reorderbuffer.h
> +++ b/src/include/replication/reorderbuffer.h
> @@ -62,7 +62,8 @@ enum ReorderBufferChangeType
>       REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
>       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
>       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
> -     REORDER_BUFFER_CHANGE_TRUNCATE
> +     REORDER_BUFFER_CHANGE_TRUNCATE,
> +     REORDER_BUFFER_CHANGE_SEQUENCE,
>  };
>  
>  /* forward declaration */
> @@ -149,6 +150,15 @@ typedef struct ReorderBufferChange
>                       CommandId       cmax;
>                       CommandId       combocid;
>               }                       tuplecid;
> +             /*
> +              * Sequence data for REORDER_BUFFER_CHANGE_SEQUENCE: the
> +              * sequence's relfilenode and its new sequence tuple.
> +              */
> +             struct
> +             {
> +                     RelFileNode relnode;
> +                     ReorderBufferTupleBuf *newtuple;
> +             }                       sequence;
>       }                       data;
>  
>       /*

Greetings,

Andres Freund


Reply via email to