From 7a410e4dd45afaafda7bbaf8153dc965bfd4518a Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 2 Aug 2021 14:27:40 +0900
Subject: [PATCH v13 3/3] Add skip_xid option to ALTER SUBSCRIPTION.

If an incoming change violates any constraint, logical replication stops
until it's resolved.  This commit introduces another way to skip the
transaction in question, other than manually updating the subscriber's
database or using pg_replication_origin_advance().

The user can specify XID by ALTER SUBSCRIPTION ... SET (skip_xid =
XXX), updating pg_subscription.subskipxid field, telling the apply
worker to skip the transaction. The apply worker skips all data
modification changes within the specified transaction.

After skipping the transaction the apply worker clears
pg_subscription.subskipxid.  It also clears the error statistics of
the subscription in the pg_stat_subscription_errors system view so
that the user does not get confused.  This is done by sending a message
for clearing a subscription error to the stats collector.
---
 doc/src/sgml/logical-replication.sgml      |  56 ++++-
 doc/src/sgml/ref/alter_subscription.sgml   |  37 +++-
 src/backend/catalog/pg_subscription.c      |  10 +
 src/backend/commands/subscriptioncmds.c    |  42 +++-
 src/backend/postmaster/pgstat.c            |  47 +++-
 src/backend/replication/logical/worker.c   | 195 +++++++++++++++-
 src/include/catalog/pg_subscription.h      |   4 +
 src/include/nodes/parsenodes.h             |   1 +
 src/include/pgstat.h                       |   7 +-
 src/test/regress/expected/subscription.out |  13 ++
 src/test/regress/sql/subscription.sql      |  11 +
 src/test/subscription/t/024_skip_xact.pl   | 244 +++++++++++++++++++++
 12 files changed, 646 insertions(+), 21 deletions(-)
 create mode 100644 src/test/subscription/t/024_skip_xact.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 88646bc859..f7da60290a 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -333,20 +333,68 @@
   <para>
    A conflict will produce an error and will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
-   the subscriber's server log.
+   <xref linkend="monitoring-pg-stat-subscription-errors"/> as well as the
+   subscriber's server log.
   </para>
 
   <para>
    The resolution can be done either by changing data on the subscriber so
-   that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   that it does not conflict with the incoming changes or by skipping the whole
+   transaction.  The transaction to be skipped is identified by its
+   transaction ID; the logical replication worker then skips all data
+   modification changes within that transaction, not only the ones that
+   conflict with the existing data.  When a conflict produces an error, it is
+   shown in the <structname>pg_stat_subscription_errors</structname> view as follows:
+  </para>
+
+  <programlisting>
+postgres=# SELECT * FROM pg_stat_subscription_errors;
+-[ RECORD 1 ]--------+-----------------------------------------------------------
+datname              | postgres
+subid                | 16395
+subname              | test_sub
+relid                | 16385
+command              | INSERT
+xid                  | 716
+failure_source       | apply
+failure_count        | 50
+last_failure         | 2021-07-21 21:16:02.781779+00
+last_failure_message | duplicate key value violates unique constraint "test_pkey"
+stats_reset          |
+</programlisting>
+
+  <para>
+   and it is also shown in subscriber's server log as follows:
+  </para>
+
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (id)=(1) already exists.
+CONTEXT:  processing remote data during "INSERT" for replication target relation "public.test" in transaction 716 with commit timestamp 2021-07-15 21:54:58.802874+00
+</screen>
+
+  <para>
+   The transaction ID that contains the change violating the constraint can be
+   found from those outputs (transaction ID 716 in the above case).  The transaction
+   can be skipped by setting <literal>skip_xid</literal> on the subscription
+   by <command>ALTER SUBSCRIPTION ... SET</command>.  Alternatively, the transaction
+   can also be skipped by calling the <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
    a <parameter>node_name</parameter> corresponding to the subscription name,
    and a position.  The current position of origins can be seen in the
    <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
+
+  <para>
+   When a conflict occurs, consider changing the data on the subscriber so that it
+   doesn't conflict with incoming changes, dropping the conflicting constraint or
+   unique index, or writing a trigger on the subscriber to suppress or redirect
+   conflicting incoming changes; skip the whole transaction only as a last resort.
+   Both skipping methods skip the whole transaction, including changes that may
+   not violate any constraint, and may easily make the subscriber inconsistent,
+   especially if a user specifies the wrong transaction ID or the position of origin.
+  </para>
  </sect1>
 
  <sect1 id="logical-replication-restrictions">
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 376fc154b1..1f6c05c5d5 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -202,8 +202,41 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <literal>streaming</literal>.
      </para>
      <para>
-       The parameters that can be reset are: <literal>streaming</literal>,
-       <literal>binary</literal>, <literal>synchronous_commit</literal>.
+      The parameters that can be reset are:
+      <literal>synchronous_commit</literal>, <literal>binary</literal>,
+      <literal>streaming</literal>, and the following parameter:
+     </para>
+     <para>
+      <variablelist>
+       <varlistentry>
+        <term><literal>skip_xid</literal> (<type>xid</type>)</term>
+        <listitem>
+         <para>
+          If incoming data violates any constraints the logical replication
+          will stop until it is resolved.  The resolution can be done either
+          by changing data on the subscriber so that it doesn't conflict with
+          incoming change or by skipping the whole transaction.  This option
+          specifies the ID of the transaction that the logical replication
+          worker should skip.  The logical replication worker skips all data
+          modification changes within the specified transaction.  Since this
+          skips the whole transaction, including changes that may not violate
+          the constraint, it should only be used as a last resort.  This option
+          has no effect on a transaction that is already prepared with
+          <literal>two_phase</literal> enabled on the subscriber.  After the logical
+          replication successfully skips the transaction, the transaction ID
+          (stored in
+          <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
+          is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+          the details of logical replication conflicts.
+         </para>
+
+         <para>
+          Setting and resetting of the <literal>skip_xid</literal> option is
+          restricted to superusers.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 25021e25a4..cb22cd7463 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
 
+	/* Get skip XID */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subskipxid,
+							&isnull);
+	if (!isnull)
+		sub->skipxid = DatumGetTransactionId(datum);
+	else
+		sub->skipxid = InvalidTransactionId;
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 896ec8b836..fd74037fb8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -60,6 +60,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_SKIP_XID				0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -81,6 +82,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	TransactionId skip_xid;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -129,6 +131,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_XID))
+		opts->skip_xid = InvalidTransactionId;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -261,6 +265,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "skip_xid") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_XID))
+				errorConflictingDefElem(defel, pstate);
+
+			if (!is_reset)
+			{
+				char	   *xid_str = defGetString(defel);
+				TransactionId xid;
+
+				/* Parse the argument as TransactionId */
+				xid = DatumGetTransactionId(DirectFunctionCall1(xidin,
+																CStringGetDatum(xid_str)));
+
+				if (!TransactionIdIsNormal(xid))
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid transaction id")));
+				opts->skip_xid = xid;
+			}
+
+			opts->specified_opts |= SUBOPT_SKIP_XID;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -485,6 +512,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -888,7 +916,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (is_reset)
 					supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-									  SUBOPT_STREAMING);
+									  SUBOPT_STREAMING | SUBOPT_SKIP_XID);
 				else
 					supported_opts = (SUBOPT_SLOT_NAME |
 									  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
@@ -941,6 +969,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID))
+				{
+					if (!superuser())
+						ereport(ERROR,
+								(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+								 errmsg("must be superuser to set %s", "skip_xid")));
+
+					values[Anum_pg_subscription_subskipxid - 1] =
+						TransactionIdGetDatum(opts.skip_xid);
+					replaces[Anum_pg_subscription_subskipxid - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0178186838..da4c493131 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1744,11 +1744,32 @@ pgstat_reset_subscription_error(Oid subid, Oid subrelid)
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR);
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
+	msg.m_clear = false;
 	msg.m_reset = true;
 
 	pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool));
 }
 
+/* ----------
+ * pgstat_clear_subscription_error() -
+ *
+ *	Tell the collector to clear (m_clear, not m_reset) the subscription's error.
+ * ----------
+ */
+void
+pgstat_clear_subscription_error(Oid subid, Oid subrelid)
+{
+	PgStat_MsgSubscriptionErr msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_clear = true;
+	msg.m_reset = false;
+
+	pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -2038,6 +2059,7 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
 	msg.m_reset = false;
+	msg.m_clear = false;
 	msg.m_relid = relid;
 	msg.m_command = command;
 	msg.m_xid = xid;
@@ -6158,24 +6180,39 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 static void
 pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
 {
+
 	PgStat_StatSubErrEntry *errent;
-	bool		create = !msg->m_reset;
+	bool		create = !(msg->m_reset || msg->m_clear);
 
 	/* Get subscription error */
 	errent = pgstat_get_subscription_error_entry(msg->m_subid,
 												 msg->m_subrelid,
 												 create);
 
-	if (msg->m_reset)
+	if (msg->m_reset || msg->m_clear)
 	{
 		Assert(!create);
+		Assert(!(msg->m_reset && msg->m_clear));
 
 		if (errent == NULL)
 			return;
 
-		/* reset fields and set reset timestamp */
-		pgstat_reset_subscription_error_entry(errent,
-											  GetCurrentTimestamp());
+		/* Both clear and reset initialize these fields */
+		errent->relid = InvalidOid;
+		errent->command = 0;
+		errent->xid = InvalidTransactionId;
+		errent->failure_count = 0;
+
+		/*
+		 * If the reset is requested, reset more fields and set the reset
+		 * timestamp.
+		 */
+		if (msg->m_reset)
+		{
+			errent->last_failure = 0;
+			errent->last_errmsg[0] = '\0';
+			errent->stat_reset_timestamp = GetCurrentTimestamp();
+		}
 	}
 	else
 	{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e91fa86b1a..b20de5909b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -255,6 +256,21 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * skipping_xid is a valid XID if we're skipping all data modification changes
+ * (INSERT/DELETE/UPDATE/TRUNCATE) of the transaction specified in MySubscription->skipxid.
+ * Please note that we don't skip receiving the changes, particularly in streaming
+ * cases, since we decide whether or not to skip applying the changes when starting
+ * to apply.  Once we start skipping changes, we copy the XID to skipping_xid and
+ * then don't stop skipping until we have skipped the whole transaction, even if the
+ * subscription is invalidated and MySubscription->skipxid gets changed or reset.
+ * When we stop skipping, we reset the skip XID (subskipxid) in the
+ * pg_subscription catalog and associate the origin status with the transaction that
+ * resets the skip XID so that we can start streaming from the next transaction.
+ */
+static TransactionId skipping_xid = InvalidTransactionId;
+#define is_skipping_changes() (TransactionIdIsValid(skipping_xid))
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -335,6 +351,9 @@ static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
 
+static void maybe_start_skipping_changes(TransactionId xid);
+static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs);
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -789,6 +808,11 @@ apply_handle_begin(StringInfo s)
 
 	remote_final_lsn = begin_data.final_lsn;
 
+	/*
+	 * Enable skipping all changes of this transaction if specified.
+	 */
+	maybe_start_skipping_changes(begin_data.xid);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -813,7 +837,18 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(&commit_data);
+	/*
+	 * Stop the skipping transaction if enabled. Otherwise, commit the changes
+	 * that are just applied.
+	 */
+	if (is_skipping_changes())
+	{
+		stop_skipping_changes(commit_data.end_lsn, commit_data.committime);
+		store_flush_position(commit_data.end_lsn);
+		in_remote_transaction = false;
+	}
+	else
+		apply_handle_commit_internal(&commit_data);
 
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
@@ -841,6 +876,9 @@ apply_handle_begin_prepare(StringInfo s)
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
+	/* Enable skipping all changes of this transaction if specified */
+	maybe_start_skipping_changes(begin_data.xid);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -899,9 +937,10 @@ apply_handle_prepare(StringInfo s)
 
 	/*
 	 * Unlike commit, here, we always prepare the transaction even though no
-	 * change has happened in this transaction. It is done this way because at
-	 * commit prepared time, we won't know whether we have skipped preparing a
-	 * transaction because of no change.
+	 * change has happened in this transaction, possibly because we're
+	 * skipping data-modification changes of this transaction. It is done this
+	 * way because at commit prepared time, we won't know whether we have
+	 * skipped preparing a transaction because of no change.
 	 *
 	 * XXX, We can optimize such that at commit prepared time, we first check
 	 * whether we have prepared the transaction or not but that doesn't seem
@@ -915,6 +954,10 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
+	/* Stop the skipping changes if enabled */
+	if (is_skipping_changes())
+		stop_skipping_changes(InvalidXLogRecPtr, 0);
+
 	store_flush_position(prepare_data.end_lsn);
 
 	in_remote_transaction = false;
@@ -1046,6 +1089,9 @@ apply_handle_stream_prepare(StringInfo s)
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
+	/* Enable skipping all changes of this transaction if specified. */
+	maybe_start_skipping_changes(prepare_data.xid);
+
 	/* Replay all the spooled operations. */
 	apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
 
@@ -1056,6 +1102,10 @@ apply_handle_stream_prepare(StringInfo s)
 
 	pgstat_report_stat(false);
 
+	/* Stop the skipping changes if enabled */
+	if (is_skipping_changes())
+		stop_skipping_changes(InvalidXLogRecPtr, 0);
+
 	store_flush_position(prepare_data.end_lsn);
 
 	in_remote_transaction = false;
@@ -1081,9 +1131,10 @@ apply_handle_origin(StringInfo s)
 {
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
-	 * remote transaction and before any actual writes.
+	 * remote transaction and before any actual writes unless we're skipping
+	 * changes of the transaction.
 	 */
-	if (!in_streamed_transaction &&
+	if (!in_streamed_transaction && !is_skipping_changes() &&
 		(!in_remote_transaction ||
 		 (IsTransactionState() && !am_tablesync_worker())))
 		ereport(ERROR,
@@ -1206,6 +1257,7 @@ apply_handle_stream_abort(StringInfo s)
 				 errmsg_internal("STREAM ABORT message without STREAM STOP")));
 
 	logicalrep_read_stream_abort(s, &xid, &subxid);
+	maybe_start_skipping_changes(xid);
 
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
@@ -1289,6 +1341,10 @@ apply_handle_stream_abort(StringInfo s)
 		CommitTransactionCommand();
 	}
 
+	/* Stop the skipping transaction if enabled */
+	if (is_skipping_changes())
+		stop_skipping_changes(InvalidXLogRecPtr, 0);
+
 	reset_apply_error_context_info();
 }
 
@@ -1428,9 +1484,23 @@ apply_handle_stream_commit(StringInfo s)
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
+	/* Enable skipping all changes of this transaction if specified */
+	maybe_start_skipping_changes(xid);
+
 	apply_spooled_messages(xid, commit_data.commit_lsn);
 
-	apply_handle_commit_internal(&commit_data);
+	if (is_skipping_changes())
+	{
+		stop_skipping_changes(commit_data.end_lsn, commit_data.committime);
+
+		store_flush_position(commit_data.end_lsn);
+		in_remote_transaction = false;
+	}
+	else
+	{
+		/* commit the streamed transaction */
+		apply_handle_commit_internal(&commit_data);
+	}
 
 	/* unlink the files with serialized changes and subxact info */
 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
@@ -2316,6 +2386,17 @@ apply_dispatch(StringInfo s)
 	LogicalRepMsgType action = pq_getmsgbyte(s);
 	LogicalRepMsgType saved_command;
 
+	/*
+	 * Skip all data-modification changes if we're skipping changes of this
+	 * transaction.
+	 */
+	if (is_skipping_changes() &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE))
+		return;
+
 	/*
 	 * Set the current command being applied. Since this function can be
 	 * called recusively when applying spooled changes, save the current
@@ -3662,3 +3743,103 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, 0);
 }
+
+/*
+ * Start skipping changes of the transaction if the given XID matches the
+ * transaction ID specified by the subscription's skip_xid option.
+ */
+static void
+maybe_start_skipping_changes(TransactionId xid)
+{
+	/* Must not already be skipping, nor be inside a remote/streamed xact */
+	Assert(!is_skipping_changes());
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	if (!TransactionIdIsValid(MySubscription->skipxid) ||
+		MySubscription->skipxid != xid)
+		return;
+
+	skipping_xid = xid;
+	ereport(LOG,
+			(errmsg("start skipping logical replication transaction with xid %u",
+					skipping_xid)));
+}
+
+/*
+ * Stop skipping changes and clear the skip XID (pg_subscription.subskipxid),
+ * committing the catalog update.  If origin_lsn is valid, origin_lsn and
+ * origin_committs become the origin state of that commit so that we can start
+ * streaming from the transaction next to the one that we just skipped.
+ */
+static void
+stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	Assert(is_skipping_changes());
+
+	ereport(LOG,
+			(errmsg("done skipping logical replication transaction with xid %u",
+					skipping_xid)));
+
+	/* Stop skipping changes */
+	skipping_xid = InvalidTransactionId;
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	if (!IsTransactionState())
+		StartTransactionCommand();
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple; the syscache key is the subscription's OID. */
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	/* Set subskipxid to null */
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
+	replaces[Anum_pg_subscription_subskipxid - 1] = true;
+
+	/* Update the system catalog to reset the skip XID */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	if (!XLogRecPtrIsInvalid(origin_lsn))
+	{
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = origin_lsn;
+		replorigin_session_origin_timestamp = origin_committs;
+	}
+
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	/*
+	 * Clear the error statistics of this subscription to let users know the
+	 * subscription is no longer getting stuck by the conflict.
+	 *
+	 * The message for clearing the error statistics can be lost but that's
+	 * okay. The user can know the logical replication is working fine in
+	 * other ways, for example, checking pg_stat_subscription view. And the
+	 * user is able to reset the single subscription error statistics by
+	 * pg_reset_subscription_error SQL function.
+	 */
+	pgstat_clear_subscription_error(MySubscription->oid, InvalidOid);
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 21061493ea..beaa6e646d 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	TransactionId subskipxid BKI_FORCE_NULL;	/* All changes associated with
+												 * this XID are skipped */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,7 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	TransactionId skipxid;		/* All changes of the XID are skipped */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 5424380bb7..c5afcad231 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3694,6 +3694,7 @@ typedef struct AlterSubscriptionStmt
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
+	TransactionId skip_xid;		/* XID to skip */
 } AlterSubscriptionStmt;
 
 typedef struct DropSubscriptionStmt
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6ff8720631..5ed1319743 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -536,7 +536,7 @@ typedef struct PgStat_MsgReplSlot
 
 /* ----------
  * PgStat_MsgSubscriptionErr	Sent by the apply worker or the table sync worker to
- *								update/reset the error happening during logical
+ *								update/reset/clear the error happening during logical
  *								replication.
  * ----------
  */
@@ -554,7 +554,9 @@ typedef struct PgStat_MsgSubscriptionErr
 	Oid			m_subid;
 	Oid			m_subrelid;
 
-	/* The reset message uses below field */
+	/* The clear and reset messages use below fields */
+	bool		m_clear;		/* clear all fields except for last_failure,
+								   last_errmsg and stat_reset_timestamp. */
 	bool		m_reset;		/* Reset all fields and set reset_stats
 								 * timestamp */
 
@@ -1111,6 +1113,7 @@ extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type t
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
 extern void pgstat_reset_subscription_error(Oid subid, Oid subrelid);
+extern void pgstat_clear_subscription_error(Oid subid, Oid subrelid);
 
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index e4c16cab66..e4dc4fb946 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -293,6 +293,19 @@ ERROR:  unrecognized subscription parameter: "enabled"
 -- fail - RESET must not include values
 ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off);
 ERROR:  RESET must not include values for parameters
+-- it works
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3);
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295');
+ALTER SUBSCRIPTION regress_testsub RESET (skip_xid);
+-- fail - invalid XID
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1);
+ERROR:  invalid transaction id
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0);
+ERROR:  invalid transaction id
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1);
+ERROR:  invalid transaction id
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2);
+ERROR:  invalid transaction id
 \dRs+
                                                                      List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3b0fbea897..c458b38985 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -228,6 +228,17 @@ ALTER SUBSCRIPTION regress_testsub RESET (enabled);
 -- fail - RESET must not include values
 ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off);
 
+-- it works
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3);
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295');
+ALTER SUBSCRIPTION regress_testsub RESET (skip_xid);
+
+-- fail - invalid XID
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1);
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0);
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1);
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2);
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/024_skip_xact.pl b/src/test/subscription/t/024_skip_xact.pl
new file mode 100644
index 0000000000..affb663803
--- /dev/null
+++ b/src/test/subscription/t/024_skip_xact.pl
@@ -0,0 +1,244 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for skipping logical replication transactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+
+# Verify that pg_stat_subscription_errors reports the expected error entry.
+sub test_subscription_error
+{
+    my ($node, $source, $relname, $expected_error, $msg) = @_;
+
+    # Block until an entry for this relation and failure source shows up.
+    my $wait_query = qq[
+SELECT count(1) > 0 FROM pg_stat_subscription_errors
+WHERE relid = '$relname'::regclass AND failure_source = '$source';
+];
+    $node->poll_query_until('postgres', $wait_query)
+      or die "Timed out while waiting for statistics to be updated";
+
+    my $check_query = qq[
+SELECT datname, subname, command, relid::regclass, failure_source, failure_count > 0
+FROM pg_stat_subscription_errors
+WHERE relid = '$relname'::regclass AND failure_source = '$source';
+];
+    my $actual = $node->safe_psql('postgres', $check_query);
+    is($actual, $expected_error, $msg);
+}
+
+# Check the error reported on pg_stat_subscription_errors for $relname, then
+# skip the failed transaction on subscription $subname by setting skip_xid and
+# wait until both subskipxid and the reported error details are cleared.
+# $source is the failure_source of the error to look up (e.g. 'apply').
+sub test_skip_subscription_error
+{
+    my ($node, $source, $subname, $relname, $expected_error, $msg) = @_;
+
+    # Check the reported error.
+    test_subscription_error($node, $source, $relname, $expected_error, $msg);
+
+    # Get XID of the failed transaction.  Match on the subscription and the
+    # failure source too, so that an entry of another worker is never picked.
+    my $skipxid = $node->safe_psql(
+	'postgres',
+	"SELECT xid FROM pg_stat_subscription_errors WHERE subname = '$subname' AND relid = '$relname'::regclass AND failure_source = '$source'");
+    $node->safe_psql('postgres',
+		     "ALTER SUBSCRIPTION $subname SET (skip_xid = '$skipxid')");
+
+    # Restart the subscriber so that logical replication restarts right away
+    # instead of waiting for the retry interval.
+    $node->restart;
+
+    # Wait for the failed transaction to be skipped.
+    $node->poll_query_until('postgres', qq[
+SELECT subskipxid IS NULL FROM pg_subscription
+WHERE subname = '$subname'
+]) or die "Timed out while waiting for the transaction to be skipped";
+
+    # Also wait for the error details to be cleared.
+    $node->poll_query_until('postgres', qq[
+SELECT command IS NULL FROM pg_stat_subscription_errors
+WHERE subname = '$subname' AND failure_source = '$source';
+]) or die "Timed out while waiting for the error details to be cleared";
+}
+
+# Create publisher node with prepared transactions enabled and a 64kB decoding limit.
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+			     qq[
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+]);
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+
+# Use a 5s retry interval so the server log is not flooded with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+			      qq[
+max_prepared_transactions = 10
+wal_retrieve_retry_interval = 5s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber we
+# create the same tables but with primary keys, and insert rows that will
+# conflict with the data replicated from the publisher later.
+$node_publisher->safe_psql('postgres',
+			   q[
+BEGIN;
+CREATE TABLE test_tab1 (a int);
+CREATE TABLE test_tab2 (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+INSERT INTO test_tab1 VALUES (1);
+INSERT INTO test_tab2 VALUES (1);
+COMMIT;
+]);
+$node_subscriber->safe_psql('postgres',
+			    q[
+BEGIN;
+CREATE TABLE test_tab1 (a int primary key);
+CREATE TABLE test_tab2 (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab2 VALUES (1);
+INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text);
+COMMIT;
+]);
+
+# Check that there are no subscription errors before starting logical replication.
+my $result =
+    $node_subscriber->safe_psql('postgres',
+				"SELECT count(1) FROM pg_stat_subscription_errors");
+is($result, qq(0), 'check no subscription error');
+
+# Setup publications.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+			   q[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Create subscriptions. The table sync for test_tab2 on tap_sub will keep
+# failing because the copied row violates the unique constraint.
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (two_phase = on);");
+my $appname_streaming = 'tap_sub_streaming';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);");
+
+$node_publisher->wait_for_catchup($appname);
+$node_publisher->wait_for_catchup($appname_streaming);
+
+# Wait for initial table sync for test_tab1 and test_tab_streaming to finish.
+$node_subscriber->poll_query_until('postgres',
+				   q[
+SELECT count(1) = 2 FROM pg_subscription_rel
+WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate = 'r'
+]) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data.
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(a) FROM test_tab1");
+is($result, q(1), 'check initial data was copied to subscriber');
+
+# Insert more data to test_tab1, raising an error on the subscriber due to violating
+# the unique constraint on test_tab1. Then skip the transaction in question.
+$node_publisher->safe_psql('postgres',
+			   qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (1);
+COMMIT;
+]);
+test_skip_subscription_error($node_subscriber,
+			     'apply', 'tap_sub', 'test_tab1',
+			     qq(postgres|tap_sub|INSERT|test_tab1|apply|t),
+			     'skip the error reported by the apply worker');
+
+# Check the table sync worker's error in the view (it is only checked, not skipped).
+test_subscription_error($node_subscriber,
+			'tablesync', 'test_tab2',
+			qq(postgres|tap_sub||test_tab2|tablesync|t),
+			'check the error reported by the table sync worker');
+
+# Insert enough rows to test_tab_streaming to exceed the 64kB limit, also raising an
+# error on the subscriber during applying spooled changes for the same reason. Then
+# skip the transaction in question.
+$node_publisher->safe_psql('postgres',
+			   qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_subscription_error($node_subscriber,
+			     'apply', 'tap_sub_streaming', 'test_tab_streaming',
+			     qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t),
+			     'skip the error reported by the apply worker during applying streaming changes');
+
+# Insert data to test_tab1 and test_tab_streaming that don't conflict.
+$node_publisher->safe_psql(
+    'postgres',
+    "INSERT INTO test_tab1 VALUES (2)");
+$node_publisher->safe_psql(
+    'postgres',
+    "INSERT INTO test_tab_streaming VALUES (10001, md5(10001::text))");
+
+$node_publisher->wait_for_catchup($appname);
+$node_publisher->wait_for_catchup($appname_streaming);
+
+# Check the data is successfully replicated after skipping the transactions.
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT * FROM test_tab1 ORDER BY a");
+is($result, q(1
+2), "subscription gets changes after skipped transaction");
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(1) FROM test_tab_streaming");
+is($result, q(2), "subscription gets changes after skipped streamed transaction");
+
+# Tests for skipping the transactions that are prepared and stream_prepared. We insert
+# the same data as the previous tests but prepare the transactions.  Those insertions
+# raise an error on the subscriptions.  Then we skip the transactions in question.
+$node_publisher->safe_psql('postgres',
+			   qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (1);
+PREPARE TRANSACTION 'skip_sub1';
+COMMIT PREPARED 'skip_sub1';
+]);
+test_skip_subscription_error($node_subscriber,
+			     'apply', 'tap_sub', 'test_tab1',
+			     qq(postgres|tap_sub|INSERT|test_tab1|apply|t),
+			     'skip the error on changes of the prepared transaction');
+
+$node_publisher->safe_psql('postgres',
+			   qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+PREPARE TRANSACTION 'skip_sub2';
+COMMIT PREPARED 'skip_sub2';
+]);
+test_skip_subscription_error($node_subscriber,
+			     'apply', 'tap_sub_streaming', 'test_tab_streaming',
+			     qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t),
+			     'skip the error on changes of the prepared-streamed transaction');
+
+# Check that the view shows no entries after dropping the subscriptions.
+# The statistics are updated asynchronously, so poll the view rather than
+# reading it just once right after the drop.
+$node_subscriber->safe_psql('postgres', q[
+DROP SUBSCRIPTION tap_sub;
+DROP SUBSCRIPTION tap_sub_streaming;
+]);
+ok( $node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 0 FROM pg_stat_subscription_errors"),
+    'no error after dropping subscription');
-- 
2.24.3 (Apple Git-128)

