On 12/7/21 15:38, Peter Eisentraut wrote:
> I have checked the 0001 and 0003 patches.  (I think we have dismissed
> the approach in 0002 for now.  And let's talk about 0004 later.)
> 

Right, I think that's correct.

> I have attached a few fixup patches, described further below.
> 
> # 0001
> 
> The argument "create" for fill_seq_with_data() is always true (and
> patch 0002 removes it).  So I'm not sure if it's needed.  If it is, it
> should be documented somewhere.
> 

Good point. I think it could be removed, but IIRC it'll be needed when
adding sequence decoding to the built-in replication, and that patch is
meant to be just an implementation of the API, without changing WAL.

OTOH I don't see it in the last version of that patch (in the ResetSequence2
call), so maybe it's not really needed. I've kept it for now, but I'll
investigate.

> About the comment you added in nextval_internal(): It's a good
> explanation, so I would leave it in.  I also agree with the
> conclusion of the comment that the current solution is reasonable.  We
> probably don't need the same comment again in fill_seq_with_data() and
> again in do_setval().  Note that both of the latter functions already
> point to nextval_internal() for other comments, so the same can be
> relied upon here as well.
> 

True. I moved it a bit in nextval_internal() and removed the other
copies. The existing references should be enough.

> The order of the new fields sequence_cb and stream_sequence_cb is a
> bit inconsistent compared to truncate_cb and stream_truncate_cb.
> Maybe take another look to make the order more uniform.
> 

You mean in OutputPluginCallbacks? I'd actually argue it's the truncate
callbacks that are inconsistent: in the regular section truncate_cb is
right before commit_cb, while in the streaming section it's at the end.

Or what order do you think would be better? I'm fine with changing it.

> Some documentation needs to be added to the Logical Decoding chapter.
> I have attached a patch that I think catches all the places that need
> to be updated, with some details left for you to fill in. ;-) The
> ordering of some of the documentation chunks reflects the order in
> which the callbacks appear in the header files, which might not be
> optimal; see above.
> 

Thanks. I added a bit about the callbacks being optional and what the
parameters mean (only for sequence_cb, as the stream_ callbacks
generally don't copy that bit).

> In reorderbuffer.c, you left a comment about how to access a sequence
> tuple.  There is an easier way, using Form_pg_sequence_data, which is
> how sequence.c also does it.  See attached patch.
> 

Yeah, that looks much nicer.

> # 0003
> 
> The tests added in 0003 don't pass for me.  It appears that you might
> have forgotten to update the expected files after you added some tests
> or something.  The cfbot shows the same.  See attached patch for a
> correction, but do check what your intent was.
> 

Yeah. I suspect I removed the expected results during the experimental
rework. I didn't notice, because some of the tests for the built-in
replication are expected to fail.


Attached is an updated version of the first two parts (infrastructure
and test_decoding), with all your fixes merged.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 999514b9f802db9b888543532355e9c36646da30 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 24 Sep 2021 00:41:33 +0200
Subject: [PATCH 1/2] Logical decoding of sequences

This adds the infrastructure for logical decoding of sequences. Support
for built-in logical replication and test_decoding is added separately.

When decoding sequences, we differentiate between sequences created in
a (running) transaction, and sequences created in other (already
committed) transactions. Changes for sequences created in the same
top-level transaction are treated as "transactional", i.e. just like any
other change from that transaction (and discarded in case of a
rollback). Changes for sequences created earlier are treated as
non-transactional: they are processed immediately, as if performed
outside any transaction (and thus not rolled back).

This mixed behavior is necessary. Sequences are non-transactional (e.g.
ROLLBACK does not undo sequence increments), but new sequences need to
be handled transactionally, because if we ever get some DDL support,
the sequence won't exist until the transaction gets applied. So we need
to ensure the increments are not applied before the sequence gets
created.
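To illustrate the routing rule above, here is a minimal, self-contained C
sketch. It is not code from this patch. SeqChange, TxnQueue, emit(),
decode_seq_change() and finish_txn() are hypothetical stand-ins for what
ReorderBufferQueueSequence() and the commit/abort paths do, and printf()
stands in for the output plugin.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Hypothetical, simplified model: a decoded sequence change carries only
 * the new last_value and the "transactional" flag computed at decode time.
 */
typedef struct SeqChange
{
    long long   last_value;
    bool        transactional;
} SeqChange;

#define MAX_QUEUED 16

/* changes queued for one in-progress transaction */
typedef struct TxnQueue
{
    SeqChange   changes[MAX_QUEUED];
    int         nchanges;
} TxnQueue;

/* number of changes handed to the (imaginary) output plugin so far */
static int  emitted = 0;

/* stand-in for invoking the plugin's sequence callback */
static void
emit(const SeqChange *change)
{
    printf("sequence last_value=%lld transactional=%d\n",
           change->last_value, (int) change->transactional);
    emitted++;
}

/* non-transactional changes go out right away, others wait for commit */
static void
decode_seq_change(TxnQueue *txn, SeqChange change)
{
    if (!change.transactional)
    {
        emit(&change);
        return;
    }

    assert(txn->nchanges < MAX_QUEUED);
    txn->changes[txn->nchanges++] = change;
}

/* at commit replay the queued changes, at abort simply drop them */
static void
finish_txn(TxnQueue *txn, bool committed)
{
    if (committed)
    {
        for (int i = 0; i < txn->nchanges; i++)
            emit(&txn->changes[i]);
    }
    txn->nchanges = 0;
}
```

An abort drops only the queued (transactional) changes. The increment that
was sent immediately is not undone, which matches the non-transactional
semantics of sequences.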

To differentiate which sequences are "old" and which were created in a
still-running transaction, we track sequences created in running
transactions in a hash table. Each sequence is identified by its
relfilenode, and at transaction commit we discard all sequences created
in that particular transaction. For each sequence we track the XID of
the (sub)transaction that created it, and we clean up the sequences for
each subtransaction when it completes (commit/rollback).

We don't use the XID to check if it's the same top-level transaction.
It's enough to know it was created in an in-progress transaction, and we
know it must be the current one because otherwise it wouldn't see the
sequence object.
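Here is a rough sketch of the tracking described above, again under
simplifying assumptions. RelNode and Xid are hypothetical stand-ins for
RelFileNode and TransactionId, and a fixed-size array replaces the
dynahash table keyed by relfilenode. seq_is_transactional() mirrors the
decision made by ReorderBufferSequenceIsTransactional(): a record flagged
as "created", or a relfilenode found in the table, is treated as
transactional.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

/* hypothetical stand-in for RelFileNode: (tablespace, database, relation) */
typedef struct RelNode
{
    uint32_t    spc;
    uint32_t    db;
    uint32_t    rel;
} RelNode;

/* one entry per sequence created by a still-running (sub)transaction */
typedef struct SeqEnt
{
    RelNode     rnode;
    Xid         xid;            /* (sub)transaction that created it */
    bool        used;
} SeqEnt;

#define MAX_SEQS 64

static SeqEnt seqs[MAX_SEQS];

static bool
rnode_equal(RelNode a, RelNode b)
{
    return a.spc == b.spc && a.db == b.db && a.rel == b.rel;
}

/* remember a sequence created (or reset to a new relfilenode) by xid */
static void
seq_created(RelNode rnode, Xid xid)
{
    for (int i = 0; i < MAX_SEQS; i++)
    {
        if (!seqs[i].used)
        {
            seqs[i] = (SeqEnt) {rnode, xid, true};
            return;
        }
    }
    assert(!"sequence table full");
}

/* "created" records, or a known relfilenode, mean transactional */
static bool
seq_is_transactional(RelNode rnode, bool created)
{
    if (created)
        return true;

    for (int i = 0; i < MAX_SEQS; i++)
    {
        if (seqs[i].used && rnode_equal(seqs[i].rnode, rnode))
            return true;
    }
    return false;
}

/* forget sequences created by the given (sub)transaction on completion */
static void
seq_cleanup(Xid xid)
{
    for (int i = 0; i < MAX_SEQS; i++)
    {
        if (seqs[i].used && seqs[i].xid == xid)
            seqs[i].used = false;
    }
}
```

As in the patch, the lookup ignores the stored XID. The XID only decides
which entries to drop when a (sub)transaction completes.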

The main changes in this patch are:

1) ensure WAL-logging of all necessary info for sequence advances

We need to be able to associate each sequence advance with an XID, but
until now a sequence advance might have XID 0 if it was the first thing
the transaction did. So we ensure the transaction has an XID.

Note: This is needed because of subxacts. A subxact with XID 0 might
still advance a sequence created in a different subxact of the same
top-level xact.

There is also a new "created" flag in xl_seq_rec, which is necessary to
distinguish WAL records for created/reset sequences.

2) decoding / queueing of sequences into ReorderBuffer

This is mostly copy-paste of existing code for other decoded events.

3) tracking sequences created in in-progress transactions

We use a simple hash table, indexed by relfilenode.
---
 doc/src/sgml/logicaldecoding.sgml             |  65 ++-
 src/backend/commands/sequence.c               |  40 +-
 src/backend/replication/logical/decode.c      | 131 +++++-
 src/backend/replication/logical/logical.c     |  89 ++++
 .../replication/logical/reorderbuffer.c       | 403 ++++++++++++++++++
 src/include/commands/sequence.h               |   1 +
 src/include/replication/output_plugin.h       |  29 ++
 src/include/replication/reorderbuffer.h       |  44 +-
 8 files changed, 791 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index b6353c7a12..4ede56678c 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -458,6 +458,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeSequenceCB sequence_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
@@ -472,6 +473,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
+    LogicalDecodeStreamSequenceCB stream_sequence_cb;
     LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
@@ -481,9 +483,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
      <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
+     <function>sequence_cb</function>,
      and <function>shutdown_cb</function> are optional.
      If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
+     Similarly, if <function>sequence_cb</function> is not set and a sequence
+     change is to be decoded, the action will be ignored.
     </para>
 
     <para>
@@ -492,7 +497,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
      <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
      and <function>stream_prepare_cb</function>
-     are required, while <function>stream_message_cb</function> and
+     are required, while <function>stream_message_cb</function>,
+     <function>stream_sequence_cb</function>, and
      <function>stream_truncate_cb</function> are optional.
     </para>
 
@@ -808,6 +814,37 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-sequence">
+     <title>Sequence Callback</title>
+
+     <para>
+      The optional <function>sequence_cb</function> callback is called for
+      actions that update a sequence value.
+<programlisting>
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         XLogRecPtr sequence_lsn,
+                                         Relation rel,
+                                         bool transactional,
+                                         bool created,
+                                         int64 last_value,
+                                         int64 log_cnt,
+                                         bool is_called);
+</programlisting>
+      The <parameter>txn</parameter> parameter contains meta information about
+      the transaction the sequence change is part of. Note however that it can
+      be NULL when the sequence change is non-transactional and the XID was not
+      assigned yet in the transaction which updated the sequence value.
+      The <parameter>sequence_lsn</parameter> has the WAL location of the
+      sequence update. The <parameter>transactional</parameter> parameter says
+      whether the change has to be replayed as part of the transaction or directly.
+
+      The <parameter>created</parameter>, <parameter>last_value</parameter>,
+      <parameter>log_cnt</parameter> and <parameter>is_called</parameter> parameters
+      describe the sequence change.
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
      <title>Prepare Filter Callback</title>
 
@@ -1017,6 +1054,27 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-sequence">
+     <title>Stream Sequence Callback</title>
+     <para>
+      The optional <function>stream_sequence_cb</function> callback is called
+      for actions that change a sequence in a block of streamed changes
+      (demarcated by <function>stream_start_cb</function> and
+      <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr sequence_lsn,
+                                               Relation rel,
+                                               bool transactional,
+                                               bool created,
+                                               int64 last_value,
+                                               int64 log_cnt,
+                                               bool is_called);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-truncate">
      <title>Stream Truncate Callback</title>
      <para>
@@ -1197,8 +1255,9 @@ OutputPluginWrite(ctx, true);
     in-progress transactions. There are multiple required streaming callbacks
     (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
-    and <function>stream_change_cb</function>) and two optional callbacks
-    (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+    and <function>stream_change_cb</function>) and multiple optional callbacks
+    (<function>stream_message_cb</function>, <function>stream_sequence_cb</function>,
+    and <function>stream_truncate_cb</function>).
     Also, if streaming of two-phase commands is to be supported, then additional
     callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
     for details).
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 72bfdc07a4..c47c09a94e 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
  */
 static SeqTableData *last_used_seq = NULL;
 
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
 static Relation lock_and_open_sequence(SeqTable seq);
 static void create_seq_hashtable(void);
 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 
 	/* now initialize the sequence's data */
 	tuple = heap_form_tuple(tupDesc, value, null);
-	fill_seq_with_data(rel, tuple);
+	fill_seq_with_data(rel, tuple, true);
 
 	/* process OWNED BY if given */
 	if (owned_by)
@@ -327,7 +327,7 @@ ResetSequence(Oid seq_relid)
 	/*
 	 * Insert the modified tuple into the new storage file.
 	 */
-	fill_seq_with_data(seq_rel, tuple);
+	fill_seq_with_data(seq_rel, tuple, true);
 
 	/* Clear local cache so that we don't think we have cached numbers */
 	/* Note that we do not change the currval() state */
@@ -340,7 +340,7 @@ ResetSequence(Oid seq_relid)
  * Initialize a sequence's relation with the specified tuple as content
  */
 static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
 {
 	Buffer		buf;
 	Page		page;
@@ -378,7 +378,10 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(rel))
+	{
 		GetTopTransactionId();
+		GetCurrentTransactionId();
+	}
 
 	START_CRIT_SECTION();
 
@@ -399,6 +402,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = rel->rd_node;
+		xlrec.created = create;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +506,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		/*
 		 * Insert the modified tuple into the new storage file.
 		 */
-		fill_seq_with_data(seqrel, newdatatuple);
+		fill_seq_with_data(seqrel, newdatatuple, true);
 	}
 
 	/* process OWNED BY if given */
@@ -764,9 +768,29 @@ nextval_internal(Oid relid, bool check_permissions)
 	 * It's sufficient to ensure the toplevel transaction has an xid, no need
 	 * to assign xids subxacts, that'll already trigger an appropriate wait.
 	 * (Have to do that here, so we're outside the critical section)
+	 *
+	 * We have to ensure we have a proper XID, which will be included in
+	 * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
+	 * in a subxact (without any preceding changes) would get XID 0, and it
+	 * would then be impossible to decide which top xact it belongs to.
+	 * It'd also trigger assert in DecodeSequence.
+	 *
+	 * XXX This might seem unnecessary, because if there's no XID the xact
+	 * couldn't have done anything important yet, e.g. it could not have
+	 * created a sequence. But that's incorrect, as it ignores subxacts. The
+	 * current subtransaction might not have done anything yet (thus no XID),
+	 * but an earlier one might have created the sequence.
+	 *
+	 * XXX Not sure if this is the best solution. Maybe do this only with
+	 * wal_level=logical to minimize the overhead. OTOH advancing the
+	 * sequence is likely followed by using the value(s) for some other
+	 * activity, which assigns the XID.
 	 */
 	if (logit && RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
+		GetCurrentTransactionId();
+	}
 
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
@@ -803,6 +827,7 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,7 +1002,10 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
+		GetCurrentTransactionId();
+	}
 
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
@@ -999,6 +1027,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
+
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index a2b69511b4..b61f2e264a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeLogicalMsgOp(ctx, &buf);
 			break;
 
+		case RM_SEQ_ID:
+			DecodeSequence(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
-		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
@@ -1313,3 +1318,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	memcpy(((char *) tuple->tuple.t_data),
+		   data + sizeof(xl_seq_rec),
+		   SizeofHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+		   datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	ReorderBufferTupleBuf *tuplebuf;
+	RelFileNode target_node;
+	XLogReaderState *r = buf->record;
+	char	   *tupledata = NULL;
+	Size		tuplelen;
+	Size		datalen = 0;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	xl_seq_rec *xlrec;
+	Snapshot	snapshot;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	bool		transactional;
+
+	/* only decode changes flagged with XLOG_SEQ_LOG */
+	if (info != XLOG_SEQ_LOG)
+		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding messages.
+	 * point in decoding sequence changes.
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetData(r);
+	datalen = XLogRecGetDataLen(r);
+	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+	/* extract the WAL record, with "created" flag */
+	xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+	/* XXX how could we have sequence change without data? */
+	if (!datalen || !tupledata)
+		return;
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+	DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+	/*
+	 * Should we handle the sequence increment as transactional or not?
+	 *
+	 * If the sequence was created in a still-running transaction, treat
+	 * it as transactional and queue the increments. Otherwise it needs
+	 * to be treated as non-transactional, in which case we send it to
+	 * the plugin right away.
+	 */
+	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+														 target_node,
+														 xlrec->created);
+
+	/* Skip the change if already processed (per the snapshot). */
+	if (transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+	else if (!transactional &&
+			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	/* Queue the increment (or send immediately if not transactional). */
+	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+							   origin_id, target_node, transactional,
+							   xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 10cbdea124..582e8c2f7d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								XLogRecPtr sequence_lsn, Relation rel,
+								bool transactional, bool created,
+								int64 last_value, int64 log_cnt, bool is_called);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									  XLogRecPtr message_lsn, bool transactional,
 									  const char *prefix, Size message_size, const char *message);
+static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									   XLogRecPtr sequence_lsn, Relation rel,
+									   bool transactional, bool created,
+									   int64 last_value, int64 log_cnt, bool is_called);
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
@@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->sequence = sequence_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_commit_cb != NULL) ||
 		(ctx->callbacks.stream_change_cb != NULL) ||
 		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
 	/*
@@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
 	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
 
@@ -1203,6 +1214,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					XLogRecPtr sequence_lsn, Relation rel,
+					bool transactional, bool created,
+					int64 last_value, int64 log_cnt, bool is_called)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+							   created, last_value, log_cnt, is_called);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
@@ -1508,6 +1556,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr sequence_lsn, Relation rel,
+						   bool transactional, bool created,
+						   int64 last_value, int64 log_cnt, bool is_called)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.stream_sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+									  created, last_value, log_cnt, is_called);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 46e66608cf..b4fa16bebe 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,6 +77,35 @@
  *	  a bit more memory to the oldest subtransactions, because it's likely
  *	  they are the source for the next sequence of changes.
  *
+ *	  When decoding sequences, we differentiate between sequences created
+ *	  in a (running) transaction, and sequences created in other (already
+ *	  committed) transactions. Changes for sequences created in the same
+ *	  top-level transaction are treated as "transactional" i.e. just like
+ *	  any other change from that transaction (and discarded in case of a
+ *	  rollback). Changes for sequences created earlier are treated as not
+ *	  transactional: they are processed immediately, as if performed
+ *	  outside any transaction (and thus not rolled back).
+ *
+ *	  This mixed behavior is necessary - sequences are non-transactional
+ *	  (e.g. ROLLBACK does not undo the sequence increments). But for new
+ *	  sequences, we need to handle them in a transactional way, because if
+ *	  we ever get some DDL support, the sequence won't exist until the
+ *	  transaction gets applied. So we need to ensure the increments don't
+ *	  happen until the sequence gets created.
+ *
+ *	  To differentiate which sequences are "old" and which were created
+ *	  in a still-running transaction, we track sequences created in running
+ *	  transactions in a hash table. Each sequence is identified by its
+ *	  relfilenode, and at transaction commit we discard all sequences
+ *	  created in that particular transaction. For each sequence we track
+ *	  the XID of the (sub)transaction that created it, and we clean up the
+ *	  sequences for each subtransaction when it completes (commit/rollback).
+ *
+ *	  We don't use the XID to check if it's the same top-level transaction.
+ *	  It's enough to know it was created in an in-progress transaction,
+ *	  and we know it must be the current one because otherwise it wouldn't
+ *	  see the sequence object.
+ *
  * -------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -91,6 +120,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "commands/sequence.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -116,6 +146,13 @@ typedef struct ReorderBufferTXNByIdEnt
 	ReorderBufferTXN *txn;
 } ReorderBufferTXNByIdEnt;
 
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+	RelFileNode		rnode;
+	TransactionId	xid;
+} ReorderBufferSequenceEnt;
+
 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
 typedef struct ReorderBufferTupleCidKey
 {
@@ -339,6 +376,14 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/* hash table of sequences, mapping relfilenode to XID of transaction */
+	hash_ctl.keysize = sizeof(RelFileNode);
+	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -525,6 +570,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				change->data.truncate.relids = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+				change->data.sequence.tuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -859,6 +911,231 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Treat the sequence increment as transactional?
+ *
+ * The hash table tracks all sequences created in in-progress transactions,
+ * so we simply do a lookup (the sequence is identified by relfilenode). If
+ * we find a match, the increment should be handled as transactional.
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+									 RelFileNode rnode, bool created)
+{
+	bool	found = false;
+
+	if (created)
+		return true;
+
+	hash_search(rb->sequences,
+				(void *) &rnode,
+				HASH_FIND,
+				&found);
+
+	return found;
+}
+
+/*
+ * Cleanup sequences created in in-progress transactions.
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple of
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+static void
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
+{
+	HASH_SEQ_STATUS scan_status;
+	ReorderBufferSequenceEnt *ent;
+
+	hash_seq_init(&scan_status, rb->sequences);
+	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+	{
+		/* skip sequences not from this transaction */
+		if (ent->xid != xid)
+			continue;
+
+		(void) hash_search(rb->sequences,
+					   (void *) &(ent->rnode),
+					   HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be both transactional and non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileNode rnode, bool transactional, bool created,
+						   ReorderBufferTupleBuf *tuplebuf)
+{
+	/*
+	 * Change needs to be handled as transactional, because the sequence was
+	 * created in a transaction that is still running. In that case all the
+	 * changes need to be queued in that transaction, we must not send them
+	 * to the downstream until the transaction commits.
+	 *
+	 * Subtransactions need a bit of care: we can't queue the change into
+	 * the current subxact, because it might be rolled back and we'd lose
+	 * the increment. We need to queue it into the same (sub)xact that created
+	 * the sequence, which is why we track the XID in the hash table.
+	 */
+	if (transactional)
+	{
+		MemoryContext oldcontext;
+		ReorderBufferChange *change;
+
+		/* lookup sequence by relfilenode */
+		ReorderBufferSequenceEnt   *ent;
+		bool						found;
+
+		/* transactional changes require a transaction */
+		Assert(xid != InvalidTransactionId);
+
+		/* search the lookup table (adding an entry for the "create" increment) */
+		ent = hash_search(rb->sequences,
+						  (void *) &rnode,
+						  created ? HASH_ENTER : HASH_FIND,
+						  &found);
+
+		/*
+		 * If this is the "create" increment, we must not have found any
+		 * pre-existing entry in the hash table (i.e. there must not be
+		 * any conflicting sequence).
+		 */
+		Assert(!(created && found));
+
+		/* But we must have either created or found an existing entry. */
+		Assert(created || found);
+
+		/*
+		 * When creating the sequence, remember the XID of the transaction
+		 * that created it.
+		 */
+		if (created)
+			ent->xid = xid;
+
+		/* XXX Maybe check that we're still in the same top-level xact? */
+
+		/* OK, allocate and queue the change */
+		oldcontext = MemoryContextSwitchTo(rb->context);
+
+		change = ReorderBufferGetChange(rb);
+
+		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+		change->origin_id = origin_id;
+
+		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+		change->data.sequence.created = created;
+		change->data.sequence.tuple = tuplebuf;
+
+		/* add it to the same subxact that created the sequence */
+		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	else
+	{
+		/*
+		 * This increment is for a sequence that was not created in any
+		 * running transaction, so we treat it as non-transactional and
+		 * just send it to the output plugin directly.
+		 */
+		ReorderBufferTXN *txn = NULL;
+		volatile Snapshot snapshot_now = snapshot;
+		bool	using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+		/* All "creates" have to be handled as transactional. */
+		Assert(!created);
+
+		/* Make sure the sequence is not in the hash table. */
+		{
+			bool	found;
+			hash_search(rb->sequences,
+						(void *) &rnode,
+						HASH_FIND, &found);
+			Assert(!found);
+		}
+#endif
+
+		if (xid != InvalidTransactionId)
+			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		/* setup snapshot to allow catalog access */
+		SetupHistoricSnapshot(snapshot_now, NULL);
+
+		/*
+		 * Decoding needs access to syscaches et al., which in turn use
+		 * heavyweight locks and such. Thus we need to have enough state around to
+		 * keep track of those.  The easiest way is to simply use a transaction
+		 * internally.  That also allows us to easily enforce that nothing writes
+		 * to the database by checking for xid assignments.
+		 *
+		 * When we're called via the SQL SRF there's already a transaction
+		 * started, so start an explicit subtransaction there.
+		 */
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		PG_TRY();
+		{
+			Relation	relation;
+			HeapTuple	tuple;
+			Form_pg_sequence_data seq;
+			Oid			reloid;
+
+			if (using_subtxn)
+				BeginInternalSubTransaction("sequence");
+			else
+				StartTransactionCommand();
+
+			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+			if (reloid == InvalidOid)
+				elog(ERROR, "could not map filenode \"%s\" to relation OID",
+					 relpathperm(rnode,
+								 MAIN_FORKNUM));
+
+			relation = RelationIdGetRelation(reloid);
+			tuple = &tuplebuf->tuple;
+			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+			rb->sequence(rb, txn, lsn, relation, transactional, created,
+						 seq->last_value, seq->log_cnt, seq->is_called);
+
+			RelationClose(relation);
+
+			TeardownHistoricSnapshot(false);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+		}
+		PG_CATCH();
+		{
+			TeardownHistoricSnapshot(true);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1535,6 +1812,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				&found);
 	Assert(found);
 
+	/* Remove sequences created in this transaction (if any). */
+	ReorderBufferSequenceCleanup(rb, txn->xid);
+
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -1950,6 +2230,31 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   Relation relation, ReorderBufferChange *change,
+						   bool streaming)
+{
+	HeapTuple	tuple;
+	Form_pg_sequence_data seq;
+
+	tuple = &change->data.sequence.tuple->tuple;
+	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+	/* Only ever called from ReorderBufferProcessTXN, so transactional. */
+	if (streaming)
+		rb->stream_sequence(rb, txn, change->lsn, relation, true,
+							change->data.sequence.created,
+							seq->last_value, seq->log_cnt, seq->is_called);
+	else
+		rb->sequence(rb, txn, change->lsn, relation, true,
+					 change->data.sequence.created,
+					 seq->last_value, seq->log_cnt, seq->is_called);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2392,6 +2697,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_SEQUENCE:
+					Assert(snapshot_now);
+
+					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+												change->data.sequence.relnode.relNode);
+
+					if (reloid == InvalidOid)
+						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					relation = RelationIdGetRelation(reloid);
+
+					if (!RelationIsValid(relation))
+						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+							 reloid,
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					if (RelationIsLogicallyLogged(relation))
+						ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+					RelationClose(relation);
+					break;
 			}
 		}
 
@@ -3776,6 +4106,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				char	   *data;
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				if (len)
+				{
+					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+					data += sizeof(HeapTupleData);
+
+					memcpy(data, tup->tuple.t_data, len);
+					data += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4040,6 +4403,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4341,6 +4720,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				uint32		tuplelen = ((HeapTuple) data)->t_len;
+
+				change->data.sequence.tuple =
+					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+				/* restore ->tuple */
+				memcpy(&change->data.sequence.tuple->tuple, data,
+					   sizeof(HeapTupleData));
+				data += sizeof(HeapTupleData);
+
+				/* reset t_data pointer into the new tuplebuf */
+				change->data.sequence.tuple->tuple.t_data =
+					ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+				/* restore tuple data itself */
+				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+				data += tuplelen;
+			}
+			break;
+
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
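For reference, the serialize/restore pair for REORDER_BUFFER_CHANGE_SEQUENCE above writes a HeapTupleData header followed by t_len bytes of tuple data, and on restore re-points t_data into the newly allocated tuple buffer, because the pointer written to disk is meaningless once read back. A minimal standalone sketch of that round trip, assuming a hypothetical FakeTuple struct in place of HeapTupleData and plain malloc() instead of ReorderBufferGetTupleBuf() (illustrative only, not PostgreSQL code):

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for HeapTupleData: body length plus body pointer. */
typedef struct FakeTuple
{
	uint32_t	t_len;		/* length of the tuple body in bytes */
	char	   *t_data;		/* points to the body; stale once serialized */
} FakeTuple;

/* Write the header struct, then the body; returns the bytes written. */
static size_t
serialize_tuple(const FakeTuple *tup, char *out)
{
	char	   *p = out;

	assert(tup->t_data != NULL || tup->t_len == 0);

	memcpy(p, tup, sizeof(FakeTuple));
	p += sizeof(FakeTuple);
	memcpy(p, tup->t_data, tup->t_len);
	p += tup->t_len;

	return (size_t) (p - out);
}

/* Restore the header, allocate a fresh body, re-point t_data, copy the body. */
static FakeTuple *
restore_tuple(const char *in)
{
	FakeTuple  *tup = malloc(sizeof(FakeTuple));

	if (tup == NULL)
		return NULL;

	memcpy(tup, in, sizeof(FakeTuple));

	/* the serialized t_data is a stale pointer; replace it */
	tup->t_data = malloc(tup->t_len > 0 ? tup->t_len : 1);
	if (tup->t_data == NULL)
	{
		free(tup);
		return NULL;
	}
	memcpy(tup->t_data, in + sizeof(FakeTuple), tup->t_len);

	return tup;
}
```

Copying the header struct verbatim is only safe because the pointer field is overwritten right after it is read back, which is the same reason the patch resets t_data via ReorderBufferTupleBufData().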
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..5919fb90ee 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileNode node;
+	bool		created;	/* is this a CREATE SEQUENCE */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..57bc13f11c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for sequence changes decoded by logical decoding.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional,
+										 bool created,
+										 int64 last_value,
+										 int64 log_cnt,
+										 bool is_called);
+
 /*
  * Filter changes by origin.
  */
@@ -199,6 +212,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  Size message_size,
 											  const char *message);
 
+/*
+ * Called for streaming sequence changes from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr sequence_lsn,
+											   Relation rel,
+											   bool transactional,
+											   bool created,
+											   int64 last_value,
+											   int64 log_cnt,
+											   bool is_called);
+
 /*
  * Callback for streaming truncates from in-progress transactions.
  */
@@ -219,6 +246,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
@@ -237,6 +265,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamSequenceCB stream_sequence_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
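For a plugin author, the new callback receives the decoded sequence state as plain arguments (transactional, created, last_value, log_cnt, is_called). The test_decoding output in the second patch renders them as, e.g., "sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0". A standalone sketch of just the formatting step, assuming a hypothetical format_sequence_change() helper and a qualified name passed as a string instead of a Relation (this is not the test_decoding implementation):

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: render one sequence change in the same text form as
 * the test_decoding expected output. Returns snprintf()'s result.
 */
static int
format_sequence_change(char *buf, size_t buflen, const char *qualname,
					   bool transactional, bool created,
					   int64_t last_value, int64_t log_cnt, bool is_called)
{
	assert(qualname != NULL && strlen(qualname) > 0);

	/* bool arguments are promoted to int, so %d prints 0 or 1 */
	return snprintf(buf, buflen,
					"sequence %s: transactional:%d created:%d "
					"last_value:%" PRId64 " log_cnt:%" PRId64 " is_called:%d",
					qualname, transactional, created,
					last_value, log_cnt, is_called);
}
```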
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff75f7..63e6ed037b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,7 +64,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_SEQUENCE
 };
 
 /* forward declaration */
@@ -158,6 +159,14 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Context data for Sequence changes */
+		struct
+		{
+			RelFileNode relnode;
+			bool		created;
+			ReorderBufferTupleBuf *tuple;
+		}			sequence;
 	}			data;
 
 	/*
@@ -430,6 +439,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional, bool created,
+										 int64 last_value, int64 log_cnt,
+										 bool is_called);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -496,6 +514,15 @@ typedef void (*ReorderBufferStreamMessageCB) (
 											  const char *prefix, Size sz,
 											  const char *message);
 
+/* stream sequence callback signature */
+typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr sequence_lsn,
+											   Relation rel,
+											   bool transactional, bool created,
+											   int64 last_value, int64 log_cnt,
+											   bool is_called);
+
 /* stream truncate callback signature */
 typedef void (*ReorderBufferStreamTruncateCB) (
 											   ReorderBuffer *rb,
@@ -511,6 +538,12 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/*
+	 * relfilenode => XID lookup table for sequences created in a transaction
+	 * (also includes altered sequences, which get a new relfilenode)
+	 */
+	HTAB	   *sequences;
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -541,6 +574,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferSequenceCB sequence;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -560,6 +594,7 @@ struct ReorderBuffer
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamSequenceCB stream_sequence;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
 	/*
@@ -635,6 +670,10 @@ void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
 void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileNode rnode, bool transactional, bool created,
+									   ReorderBufferTupleBuf *tuplebuf);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -682,4 +721,7 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
+bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+												 RelFileNode rnode, bool created);
+
 #endif
-- 
2.31.1

From 936470a0cf0eaa56c90b32b23c25dc317a1d3a55 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 24 Sep 2021 00:42:04 +0200
Subject: [PATCH 2/2] Add support for decoding sequences to test_decoding

---
 contrib/test_decoding/Makefile              |   3 +-
 contrib/test_decoding/expected/sequence.out | 327 ++++++++++++++++++++
 contrib/test_decoding/sql/sequence.sql      | 119 +++++++
 contrib/test_decoding/test_decoding.c       |  69 +++++
 4 files changed, 517 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/sequence.out
 create mode 100644 contrib/test_decoding/sql/sequence.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream \
+	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..d94e185fb9
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+      14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                              data                                              
+------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4 log_cnt:0 is_called:1
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:334 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:32 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:0 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4000 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:4320 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:0 created:0 last_value:3820 log_cnt:0 is_called:1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3003 |      30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval 
+---------
+    3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                               data                                                
+---------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3000 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3033 log_cnt:0 is_called:1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                              data                                               
+-------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ sequence public.test_table_a_seq: transactional:1 created:0 last_value:33 log_cnt:0 is_called:1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval 
+--------
+   5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3001 |      32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                              data                                              
+------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:1 created:0 last_value:33 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3000 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3033 log_cnt:0 is_called:1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
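The last_value in the non-transactional lines above is the value written to WAL, not the value nextval() returned: nextval_internal() logs SEQ_LOG_VALS (32) increments ahead so that it does not need to WAL-log every call. Assuming the default CACHE 1 and no checkpoint in between, the value logged by a nextval() call that writes a WAL record is the value it returned plus 32 increments, which matches the numbers in the expected output (1 -> 33, 14 -> 334, 4000 -> 4320, 3510 -> 3830, 3500 -> 3820, 3001 -> 3033). A tiny sketch of that arithmetic (the helper name is made up):

```c
#include <assert.h>
#include <stdint.h>

/* Mirrors SEQ_LOG_VALS in src/backend/commands/sequence.c. */
#define SEQ_LOG_VALS 32

/*
 * Value WAL-logged by a nextval() call that has to write a new record,
 * assuming CACHE 1: the value returned plus SEQ_LOG_VALS further increments.
 */
static int64_t
logged_last_value(int64_t returned, int64_t increment_by)
{
	assert(increment_by != 0);
	return returned + SEQ_LOG_VALS * increment_by;
}
```

The setval() lines are the exception: setval() logs the requested value directly with log_cnt 0, which is why 3000 and 3500 appear as-is.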
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..21c4b79222
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on sequence
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e85e..45765d299e 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		skip_sequences;
 } TestDecodingData;
 
 /*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -116,6 +121,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 									 bool transactional, const char *prefix,
 									 Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+									  Relation rel, bool transactional, bool created,
+									  int64 last_value, int64 log_cnt, bool is_called);
 static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
@@ -141,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -153,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pg_decode_stream_commit;
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
+	cb->stream_sequence_cb = pg_decode_stream_sequence;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
 }
 
@@ -175,6 +186,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 
+	/* skip sequences by default for backwards compatibility */
+	data->skip_sequences = true;
+
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +279,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "skip-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				continue;	/* true by default */
+			else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -744,6 +769,28 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				   XLogRecPtr sequence_lsn, Relation rel,
+				   bool transactional, bool created,
+				   int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* skip sequence changes if requested */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, ": transactional:%d created:%d last_value:" INT64_FORMAT " log_cnt:" INT64_FORMAT " is_called:%d",
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
@@ -943,6 +990,28 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						  XLogRecPtr sequence_lsn, Relation rel,
+						  bool transactional, bool created,
+						  int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* skip sequence changes if requested */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "streaming sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, ": transactional:%d created:%d last_value:" INT64_FORMAT " log_cnt:" INT64_FORMAT " is_called:%d",
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * In streaming mode, we don't display the detailed information of Truncate.
  * See pg_decode_stream_change.
-- 
2.31.1
