From d53b8db12749d020a19ea67884c925d5e5510e6f Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 10 Dec 2021 14:41:30 +0900
Subject: [PATCH] Add ALTER SUBSCRIPTION ... SKIP to skip the transaction on
 subscriber nodes

If incoming change violates any constraint, logical replication stops
until it's resolved. This commit introduces another way to skip the
transaction in question, other than manually updating the subscriber's
database or using pg_replication_origin_advance().

The user can specify XID by ALTER SUBSCRIPTION ... SKIP (xid = XXX),
updating pg_subscription.subskipxid field, telling the apply worker to
skip the transaction. The apply worker skips all data modification
changes within the specified transaction.

After skipping the transaction the apply worker clears
pg_subscription.subskipxid.
---
 doc/src/sgml/logical-replication.sgml      |  54 +++++-
 doc/src/sgml/ref/alter_subscription.sgml   |  41 ++++
 src/backend/catalog/pg_subscription.c      |  10 +
 src/backend/commands/subscriptioncmds.c    |  54 ++++++
 src/backend/parser/gram.y                  |   9 +
 src/backend/replication/logical/worker.c   | 213 ++++++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                  |   4 +
 src/bin/psql/tab-complete.c                |   8 +-
 src/include/catalog/pg_subscription.h      |   4 +
 src/include/nodes/parsenodes.h             |   3 +-
 src/test/regress/expected/subscription.out |  26 +++
 src/test/regress/sql/subscription.sql      |  14 ++
 src/test/subscription/t/027_skip_xact.pl   | 204 ++++++++++++++++++++
 13 files changed, 634 insertions(+), 10 deletions(-)
 create mode 100644 src/test/subscription/t/027_skip_xact.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 45b2e1e28f..3237f68b04 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -333,20 +333,66 @@
   <para>
    A conflict will produce an error and will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
-   the subscriber's server log.
+   <xref linkend="monitoring-pg-stat-subscription-workers"/> as well as the
+   subscriber's server log.
   </para>
 
   <para>
    The resolution can be done either by changing data on the subscriber so
-   that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   that it does not conflict with the incoming changes or by skipping the whole
+   transaction.  To skip a transaction, the user specifies the ID of the
+   transaction whose application is to be skipped; the logical replication
+   worker then skips all data modification changes within the transaction that
+   conflicts with the existing data.  When a conflict produces an error, it is
+   shown in the <structname>pg_stat_subscription_workers</structname> view as follows:
+  </para>
+
+  <programlisting>
+postgres=# SELECT * FROM pg_stat_subscription_workers;
+-[ RECORD 1 ]------+-----------------------------------------------------------
+subid              | 16391
+subname            | test_sub
+subrelid           |
+last_error_relid   | 16385
+last_error_command | INSERT
+last_error_xid     | 716
+last_error_count   | 50
+last_error_message | duplicate key value violates unique constraint "test_pkey"
+last_error_time    | 2021-09-29 15:52:45.165754+00
+</programlisting>
+
+  <para>
+   and it is also shown in subscriber's server log as follows:
+  </para>
+
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (id)=(1) already exists.
+CONTEXT:  processing remote data during "INSERT" for replication target relation "public.test" in transaction 716 with commit timestamp 2021-09-29 15:52:45.165754+00
+</screen>
+
+  <para>
+   The transaction ID that contains the change violating the constraint can be
+   found from those outputs (transaction ID 716 in the above case).  The transaction
+   can be skipped by using <command>ALTER SUBSCRIPTION ... SKIP</command> on the
+   subscription.  Alternatively, the transaction can also be skipped by calling the
+   <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
    a <parameter>node_name</parameter> corresponding to the subscription name,
    and a position.  The current position of origins can be seen in the
    <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
+
+  <para>
+   When a conflict occurs, consider changing the data on the subscriber so that it
+   doesn't conflict with incoming changes, dropping the conflicting constraint or
+   unique index, or writing a trigger on the subscriber to suppress or redirect
+   conflicting incoming changes; skip the whole transaction only as a last resort.
+   Both skipping methods skip the whole transaction, including changes that may not
+   violate any constraint.  They may easily make the subscriber inconsistent,
+   especially if a user specifies the wrong transaction ID or origin position.
+  </para>
  </sect1>
 
  <sect1 id="logical-replication-restrictions">
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0b027cc346..79a05e08ab 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 </synopsis>
@@ -207,6 +208,46 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> [, ... ] )</literal></term>
+    <listitem>
+     <para>
+      Skip applying changes of the particular transaction.  If incoming data
+      violates any constraints the logical replication will stop until it is
+      resolved.  The resolution can be done either by changing data on the
+      subscriber so that it doesn't conflict with incoming changes or by skipping
+      the whole transaction.  The logical replication worker skips all data
+      modification changes within the specified transaction.  Since this skips
+      the whole transaction, including changes that may not violate the
+      constraint, it should only be used as a last resort.  This option has no
+      effect on a transaction that has already been prepared with
+      <literal>two_phase</literal> enabled on the subscriber.  After the logical replication
+      successfully skips the transaction, the transaction ID (stored in
+      <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
+      is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+      the details of logical replication conflicts.
+     </para>
+
+     <para>
+      <replaceable>skip_option</replaceable> specifies options for this operation.
+      The supported option is:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>xid</literal> (<type>xid</type>)</term>
+        <listitem>
+         <para>
+          Specifies the ID of the transaction whose application is to be skipped
+          by the logical replication worker. Setting -1 means to reset the
+          transaction ID.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">new_owner</replaceable></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 25021e25a4..cb22cd7463 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
 
+	/* Get skip XID */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subskipxid,
+							&isnull);
+	if (!isnull)
+		sub->skipxid = DatumGetTransactionId(datum);
+	else
+		sub->skipxid = InvalidTransactionId;
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2b658080fe..6e64d8ccac 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_XID					0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,8 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	TransactionId xid; /* InvalidTransactionId for resetting purpose, otherwise
+						* normal transaction id */
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -249,6 +252,31 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "xid") == 0)
+		{
+			char *xid_str = defGetString(defel);
+			TransactionId xid;
+
+			if (strcmp(xid_str, "-1") == 0)
+			{
+				/* Setting -1 to xid means to reset it */
+				xid = InvalidTransactionId;
+			}
+			else
+			{
+				/* Parse the argument as TransactionId */
+				xid = DatumGetTransactionId(DirectFunctionCall1(xidin,
+																CStringGetDatum(xid_str)));
+
+				if (!TransactionIdIsNormal(xid))
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid transaction id: %s", xid_str)));
+			}
+
+			opts->specified_opts |= SUBOPT_XID;
+			opts->xid = xid;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -464,6 +492,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1083,6 +1112,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SKIP:
+			{
+				parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
+
+				if (IsSet(opts.specified_opts, SUBOPT_XID))
+				{
+					if (!superuser())
+						ereport(ERROR,
+								(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+								 errmsg("must be superuser to set %s", "skip_xid")));
+
+					if (TransactionIdIsValid(opts.xid))
+						values[Anum_pg_subscription_subskipxid - 1] =
+							TransactionIdGetDatum(opts.xid);
+					else
+						nulls[Anum_pg_subscription_subskipxid - 1] = true;
+
+					replaces[Anum_pg_subscription_subskipxid - 1] = true;
+
+					update_tuple = true;
+				}
+
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 				 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..ba039ff9a6 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9956,6 +9956,15 @@ AlterSubscriptionStmt:
 											(Node *)makeInteger(false), @1));
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name SKIP definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_SKIP;
+					n->subname = $3;
+					n->options = $5;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302a48..6c2ff569b9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -255,6 +256,19 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * True if we are skipping all data modification changes (INSERT, UPDATE, etc.) of
+ * the transaction specified by MySubscription->skipxid.  Once we start skipping
+ * changes, we don't stop until we have skipped all changes of the transaction,
+ * even if the subscription is invalidated and MySubscription->skipxid gets changed
+ * or reset.  Also, we don't skip receiving the changes in streaming cases, since we
+ * decide whether or not to skip applying the changes when starting to apply them.
+ * At the end of the transaction, we disable it and reset the skip XID.  The timing
+ * of resetting the skip XID varies depending on commit or commit/rollback prepared
+ * cases.  Please refer to the comments in these functions for details.
+ */
+static bool skipping_changes = false;
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -330,6 +344,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(TransactionId xid);
+static void stop_skipping_changes(bool clear_subskipxid);
+static void clear_subscription_skip_xid(void);
+
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
@@ -789,6 +808,11 @@ apply_handle_begin(StringInfo s)
 
 	remote_final_lsn = begin_data.final_lsn;
 
+	/*
+	 * Enable skipping all changes of this transaction if specified.
+	 */
+	maybe_start_skipping_changes(begin_data.xid);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -841,6 +865,11 @@ apply_handle_begin_prepare(StringInfo s)
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
+	/*
+	 * Enable skipping all changes of this transaction if specified
+	 */
+	maybe_start_skipping_changes(begin_data.xid);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -897,11 +926,32 @@ apply_handle_prepare(StringInfo s)
 								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
+	/*
+	 * If we are skipping all changes of this transaction, we stop it but
+	 * unlike commit, we do not clear subskipxid of pg_subscription catalog
+	 * here and will do that at commit prepared or rollback prepared time.
+	 * If we update the catalog and prepare the transaction, in a case where
+	 * the server crashes between them, subskipxid is cleared but this
+	 * transaction will be resent.  Even if we do that in reverse order,
+	 * subskipxid will not be cleared but this transaction won't be resent.
+	 *
+	 * Also, one might think that we can skip preparing the skipped transaction.
+	 * But if we do that, the PREPARE WAL record won't be sent to its physical
+	 * standbys, with the result that users won't be able to find the prepared
+	 * transaction entry after a fail-over.
+	 *
+	 * subskipxid might be changed or cleared by the user before we receive
+	 * COMMIT PREPARED or ROLLBACK PREPARED. But that's okay because this
+	 * prepared transaction is empty.
+	 */
+	if (skipping_changes)
+		stop_skipping_changes(false);
+
 	/*
 	 * Unlike commit, here, we always prepare the transaction even though no
-	 * change has happened in this transaction. It is done this way because at
-	 * commit prepared time, we won't know whether we have skipped preparing a
-	 * transaction because of no change.
+	 * change has happened in this transaction or all changes are skipped. It is
+	 * done this way because at commit prepared time, we won't know whether we
+	 * have skipped preparing a transaction because of no change.
 	 *
 	 * XXX, We can optimize such that at commit prepared time, we first check
 	 * whether we have prepared the transaction or not but that doesn't seem
@@ -938,6 +988,24 @@ apply_handle_commit_prepared(StringInfo s)
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
 
+	if (MySubscription->skipxid == prepare_data.xid)
+	{
+		/*
+		 * Clear the subskipxid of pg_subscription catalog.  This catalog
+		 * update must be committed before finishing the prepared transaction.
+		 * Otherwise, in a case where the server crashes between finishing
+		 * the prepared transaction and the catalog update, COMMIT PREPARED
+		 * won't be resent but subskipxid is left set.
+		 *
+		 * Also, we must not update the replication origin LSN and timestamp
+		 * while committing the catalog update so that COMMIT PREPARED will
+		 * be resent in case of a crash immediately after the catalog update
+		 * commit.
+		 */
+		clear_subscription_skip_xid();
+		CommitTransactionCommand();
+	}
+
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
 						   gid, sizeof(gid));
@@ -979,6 +1047,17 @@ apply_handle_rollback_prepared(StringInfo s)
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
 
+	if (MySubscription->skipxid == rollback_data.xid)
+	{
+		/*
+		 * Same as COMMIT PREPARED, we must clear subskipxid of pg_subscription
+		 * before rolling back the prepared transaction. Please see the comments
+		 * in apply_handle_commit_prepared() for details.
+		 */
+		clear_subscription_skip_xid();
+		CommitTransactionCommand();
+	}
+
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
 						   gid, sizeof(gid));
@@ -1046,9 +1125,19 @@ apply_handle_stream_prepare(StringInfo s)
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
+	/* Enable skipping all changes of this transaction if specified. */
+	maybe_start_skipping_changes(prepare_data.xid);
+
 	/* Replay all the spooled operations. */
 	apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
 
+	/*
+	 * Same as PREPARE, we stop skipping changes but don't clear subskipxid
+	 * here.  See the comments in apply_handle_prepare() for details.
+	 */
+	if (skipping_changes)
+		stop_skipping_changes(false);
+
 	/* Mark the transaction as prepared. */
 	apply_handle_prepare_internal(&prepare_data);
 
@@ -1207,6 +1296,16 @@ apply_handle_stream_abort(StringInfo s)
 
 	logicalrep_read_stream_abort(s, &xid, &subxid);
 
+	/*
+	 * We don't expect that the user set the XID of the transaction that is
+	 * rolled back but if the skip XID is set, clear it.
+	 */
+	if (MySubscription->skipxid == xid || MySubscription->skipxid == subxid)
+	{
+		clear_subscription_skip_xid();
+		CommitTransactionCommand();
+	}
+
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
 	 * just delete the files with serialized info.
@@ -1428,8 +1527,15 @@ apply_handle_stream_commit(StringInfo s)
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
+	/* Enable skipping all changes of this transaction if specified */
+	maybe_start_skipping_changes(xid);
+
 	apply_spooled_messages(xid, commit_data.commit_lsn);
 
+	/*
+	 * Commit streamed transaction. If we're skipping this transaction,
+	 * we stop it in apply_handle_commit_internal().
+	 */
 	apply_handle_commit_internal(&commit_data);
 
 	/* unlink the files with serialized changes and subxact info */
@@ -1449,8 +1555,17 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 {
-	if (IsTransactionState())
+	if (IsTransactionState() || skipping_changes)
 	{
+		/*
+		 * If we are skipping all changes of this transaction, we stop it
+		 * and clear subskipxid of pg_subscription.  The catalog update is
+		 * committed at CommitTransactionCommand() below while updating
+		 * the replication origin LSN and timestamp.
+		 */
+		if (skipping_changes)
+			stop_skipping_changes(true);
+
 		/*
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
@@ -2319,6 +2434,17 @@ apply_dispatch(StringInfo s)
 	LogicalRepMsgType action = pq_getmsgbyte(s);
 	LogicalRepMsgType saved_command;
 
+	/*
+	 * Skip all data-modification changes if we're skipping changes of this
+	 * transaction.
+	 */
+	if (skipping_changes &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE))
+		return;
+
 	/*
 	 * Set the current command being applied. Since this function can be
 	 * called recusively when applying spooled changes, save the current
@@ -3613,6 +3739,85 @@ IsLogicalWorker(void)
 	return MyLogicalRepWorker != NULL;
 }
 
+/*
+ * Start skipping changes of the transaction if the given XID matches
+ * MySubscription->skipxid, the XID set by ALTER SUBSCRIPTION ... SKIP.
+ */
+static void
+maybe_start_skipping_changes(TransactionId xid)
+{
+	Assert(!skipping_changes);
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	if (MySubscription->skipxid != xid)
+		return;
+
+	/* Match: data modification changes are now dropped in apply_dispatch() */
+	skipping_changes = true;
+
+	ereport(LOG,
+			errmsg("start skipping logical replication transaction %u",
+				   xid));
+}
+
+/*
+ * Stop skipping changes by resetting skipping_changes.  If clear_subskipxid is
+ * true, we also set subskipxid of the pg_subscription catalog to NULL.
+ */
+static void
+stop_skipping_changes(bool clear_subskipxid)
+{
+	Assert(skipping_changes);
+
+	/* Clear the flag first so later changes are applied normally again */
+	skipping_changes = false;
+
+	if (clear_subskipxid)
+		clear_subscription_skip_xid();
+
+	ereport(LOG, (errmsg("done skipping logical replication transaction")));
+}
+
+/* Set subskipxid of MySubscription to NULL; committing is up to the caller */
+static void
+clear_subscription_skip_xid(void)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	if (!IsTransactionState())
+		StartTransactionCommand();
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch a modifiable copy of this subscription's catalog tuple. */
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	/* Replace only subskipxid, and replace it with NULL */
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
+	replaces[Anum_pg_subscription_subskipxid - 1] = true;
+
+	/* Update the system catalog to reset the skip XID */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+}
+
 /* Error callback to give more context info about the change being applied */
 static void
 apply_error_callback(void *arg)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 10a86f9810..c025d64b83 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4513,6 +4513,10 @@ getSubscriptions(Archive *fout)
 
 	ntups = PQntuples(res);
 
+	/*
+	 * Get subscription fields. We don't fetch subskipxid as we don't
+	 * include it in the dump.
+	 */
 	i_tableoid = PQfnumber(res, "tableoid");
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..ed14d8e6c5 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1666,7 +1666,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
 		COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
-					  "RENAME TO", "REFRESH PUBLICATION", "SET",
+					  "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP",
 					  "ADD PUBLICATION", "DROP PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
@@ -1682,6 +1682,12 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+	/* ALTER SUBSCRIPTION <name> SKIP */
+	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SKIP"))
+		COMPLETE_WITH("(");
+	/* ALTER SUBSCRIPTION <name> SKIP ( */
+	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
+		COMPLETE_WITH("xid");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 21061493ea..beaa6e646d 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	TransactionId subskipxid BKI_FORCE_NULL;	/* All changes associated with
+												 * this XID are skipped */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,7 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	TransactionId skipxid;		/* All changes of the XID are skipped */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..ab778865d8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3709,7 +3709,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
-	ALTER_SUBSCRIPTION_ENABLED
+	ALTER_SUBSCRIPTION_ENABLED,
+	ALTER_SUBSCRIPTION_SKIP
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..f5c757f4fd 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -93,6 +93,32 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
 ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
+-- ok - valid xid
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 3);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 4294967295);
+SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub';
+     subname     | subskipxid 
+-----------------+------------
+ regress_testsub | 4294967295
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = -1);
+SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub';
+     subname     | subskipxid 
+-----------------+------------
+ regress_testsub |           
+(1 row)
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1.1);
+ERROR:  invalid transaction id: 1.1
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = -2);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 0);
+ERROR:  invalid transaction id: 0
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1);
+ERROR:  invalid transaction id: 1
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 2);
+ERROR:  invalid transaction id: 2
 \dRs+
                                                                           List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e4..00df54e84a 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -72,6 +72,20 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 
+-- ok - valid xid
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 3);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 4294967295);
+SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub';
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = -1);
+SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub';
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1.1);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = -2);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 0);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1);
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 2);
+
 \dRs+
 
 BEGIN;
diff --git a/src/test/subscription/t/027_skip_xact.pl b/src/test/subscription/t/027_skip_xact.pl
new file mode 100644
index 0000000000..37c2fb3dba
--- /dev/null
+++ b/src/test/subscription/t/027_skip_xact.pl
@@ -0,0 +1,204 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for skipping logical replication transactions.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 6;
+
+# Skip the transaction $xid on subscription $subname and verify that logical
+# replication resumes.  The caller must already have committed (or prepared)
+# a transaction on the publisher whose INSERT into $relname conflicts on the
+# subscriber.  We wait until pg_stat_subscription_workers reports the
+# failure for $xid, skip it with ALTER SUBSCRIPTION ... SKIP, and then insert
+# $nonconflict_data on the publisher.  The test named $msg passes if
+# $relname then holds $expected rows on the subscriber.
+sub test_skip_xact
+{
+    my ($node_publisher, $node_subscriber, $subname, $relname, $nonconflict_data,
+	$expected, $xid, $msg) = @_;
+
+    # Wait for the apply worker to report the unique-key violation for $xid.
+    $node_subscriber->poll_query_until('postgres',
+	qq[
+SELECT count(1) > 0
+FROM pg_stat_subscription_workers
+WHERE last_error_relid = '$relname'::regclass
+    AND subrelid IS NULL
+    AND last_error_command = 'INSERT'
+    AND last_error_xid = '$xid'
+    AND starts_with(last_error_message, 'duplicate key value violates unique constraint');
+]) or die "Timed out while waiting for worker error";
+
+    # Set skip xid
+    $node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname SKIP (xid = '$xid')");
+
+    # Restart the subscriber so that logical replication restarts right away
+    # instead of waiting for the retry interval.
+    $node_subscriber->restart;
+
+    # Wait for the failed transaction to be skipped; subskipxid is cleared
+    # once the skip is done.  Fail here rather than at a later check.
+    $node_subscriber->poll_query_until('postgres',
+	"SELECT subskipxid IS NULL FROM pg_subscription WHERE subname = '$subname'")
+	or die "Timed out while waiting for the transaction to be skipped";
+
+    # Insert non-conflict data
+    $node_publisher->safe_psql('postgres',
+	"INSERT INTO $relname VALUES $nonconflict_data");
+
+    $node_publisher->wait_for_catchup($subname);
+
+    # Check replicated data
+    my $res = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM $relname");
+    is($res, $expected, $msg);
+}
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq[
+max_prepared_transactions = 10
+]);
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+			      qq[
+wal_retrieve_retry_interval = 2s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On subscriber we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+CREATE TABLE test_tab (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+CREATE TABLE test_tab (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab VALUES (1);
+INSERT INTO test_tab_streaming VALUES (1, md5(1::text));
+COMMIT;
+]);
+
+# Setup publications.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+    'postgres',
+    qq[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Create subscriptions.
+$node_subscriber->safe_psql(
+    'postgres',
+    qq[
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (two_phase = on);
+CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr' PUBLICATION tap_pub_streaming WITH (two_phase = on, streaming = on);
+]);
+
+$node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_streaming');
+
+# Insert data to test_tab, raising an error on the subscriber due to violation
+# of the unique constraint on test_tab. Then skip the transaction.
+my $xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	       "(2)", "2", $xid,
+	       "test skipping transaction");
+
+# Test for PREPARE. Insert the same data to test_tab and PREPARE the transaction,
+# raising an error. Then skip the transaction.
+$xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	       "(3)", "3", $xid,
+	       "test skipping prepare and commit prepared ");
+
+# Test for PREPARE and ROLLBACK PREPARED.
+$xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+ROLLBACK PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	       "(4)", "4", $xid,
+	       "test skipping prepare and rollback prepared");
+
+# Test for STREAM COMMIT. Insert enough rows to test_tab_streaming to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled changes for the
+# same reason. Then skip the transaction.
+$xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming", "test_tab_streaming",
+	       "(2, md5(2::text))", "2", $xid,
+	       "test skipping stream-commit");
+
+# Test for STREAM PREPARE and COMMIT PREPARED.
+$xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming", "test_tab_streaming",
+	       "(3, md5(3::text))", "3", $xid,
+	       "test skipping stream-prepare and commit prepared");
+
+my $res = $node_subscriber->safe_psql(
+    'postgres',
+    "SELECT count(*) FROM pg_prepared_xacts");
+is($res, "0", "check all prepared transactions are resolved on the subscriber");
-- 
2.24.3 (Apple Git-128)

