From e046be4b851ccebddabae47f57cbc0d8f49fadc8 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Thu, 17 Jun 2021 12:39:19 -0700
Subject: [PATCH v1] Optionally disabling subscriptions on error

Logical replication apply workers for a subscription can easily get
stuck in an infinite loop of attempting to apply a change,
triggering an error (such as a constraint violation), exiting with
an error written to the subscription worker log, and restarting.

To partially remedy the situation, add a new
subscription_parameter named 'disable_on_error'.  To be consistent
with old behavior, the parameter defaults to false.  When true,
the apply worker catches errors thrown, and for errors that are
deemed not to be transient, disables the subscription in order to
break the loop.  New columns in the pg_subscription table help
diagnose the situation:  'disabled_by_error' shows whether this has
occurred and 'suberrmsg' contains the message field of the error.
The error is still also written to the logs.

In addition to helping on production systems, this makes writing TAP
tests involving error conditions simpler.  Rather than having to
open and parse the apply worker's log file, the test can query the
pg_subscription table.  It also helps that the workers don't go into
an infinite loop during the test.
---
 doc/src/sgml/catalogs.sgml                    |  10 +
 src/backend/catalog/pg_subscription.c         |  10 +
 src/backend/catalog/system_views.sql          |   8 +-
 src/backend/commands/subscriptioncmds.c       |  62 ++++++
 src/backend/replication/logical/launcher.c    |   2 +
 src/backend/replication/logical/worker.c      | 176 +++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                     |   6 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/include/catalog/pg_subscription.h         |  13 ++
 .../subscription/t/022_disable_on_error.pl    | 125 +++++++++++++
 10 files changed, 408 insertions(+), 5 deletions(-)
 create mode 100644 src/test/subscription/t/022_disable_on_error.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f517a7d4af..6fd95cb7ca 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7671,6 +7671,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>suberrmsg</structfield> <type>text</type>
+      </para>
+      <para>
+       The message from the error which disabled the subscription, if it has
+       been automatically disabled; otherwise an empty string.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subpublications</structfield> <type>text[]</type>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 29fc4218cd..20b51a5f1f 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,8 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->disable_on_error = subform->disable_on_error;
+	sub->disabled_by_error = subform->disabled_by_error;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 
@@ -95,6 +97,14 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->synccommit = TextDatumGetCString(datum);
 
+	/* Get errmsg */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_suberrmsg,
+							&isnull);
+	Assert(!isnull);
+	sub->errmsg = TextDatumGetCString(datum);
+
 	/* Get publications */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
 							tup,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 999d984068..49a02f9353 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1252,8 +1252,10 @@ CREATE VIEW pg_replication_origin_status AS
 
 REVOKE ALL ON pg_replication_origin_status FROM public;
 
--- All columns of pg_subscription except subconninfo are publicly readable.
+-- All columns of pg_subscription except subconninfo and suberrmsg are publicly
+-- readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subslotname, subsynccommit, subpublications)
+GRANT SELECT (oid, subdbid, subname, subowner, subenabled, disable_on_error,
+			  disabled_by_error, subbinary, substream, subslotname,
+			  subsynccommit, subpublications)
     ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 75e195f286..098036f77e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -63,6 +63,8 @@ static void
 parse_subscription_options(List *options,
 						   bool *connect,
 						   bool *enabled_given, bool *enabled,
+						   bool *disable_on_error_given,
+						   bool *disable_on_error,
 						   bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data,
@@ -84,13 +86,24 @@ parse_subscription_options(List *options,
 		*connect = true;
 	if (enabled)
 	{
+		Assert(enabled_given);
+
 		*enabled_given = false;
 		*enabled = true;
 	}
+	if (disable_on_error)
+	{
+		Assert(disable_on_error_given);
+
+		*disable_on_error_given = false;
+		*disable_on_error = false;
+	}
 	if (create_slot)
 		*create_slot = true;
 	if (slot_name)
 	{
+		Assert(slot_name_given);
+
 		*slot_name_given = false;
 		*slot_name = NULL;
 	}
@@ -102,11 +115,15 @@ parse_subscription_options(List *options,
 		*refresh = true;
 	if (binary)
 	{
+		Assert(binary_given);
+
 		*binary_given = false;
 		*binary = false;
 	}
 	if (streaming)
 	{
+		Assert(streaming_given);
+
 		*streaming_given = false;
 		*streaming = false;
 	}
@@ -136,6 +153,16 @@ parse_subscription_options(List *options,
 			*enabled_given = true;
 			*enabled = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "disable_on_error") == 0 &&
+				 disable_on_error)
+		{
+			if (*disable_on_error_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			*disable_on_error_given = true;
+			*disable_on_error = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
 		{
 			if (create_slot_given)
@@ -334,6 +361,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		connect;
 	bool		enabled_given;
 	bool		enabled;
+	bool		disable_on_error_given;
+	bool		disable_on_error;
 	bool		copy_data;
 	bool		streaming;
 	bool		streaming_given;
@@ -355,6 +384,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options,
 							   &connect,
 							   &enabled_given, &enabled,
+							   &disable_on_error_given,
+							   &disable_on_error,
 							   &create_slot,
 							   &slotname_given, &slotname,
 							   &copy_data,
@@ -427,6 +458,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_disable_on_error - 1] =
+		BoolGetDatum(disable_on_error);
+	values[Anum_pg_subscription_disabled_by_error - 1] =
+		BoolGetDatum(false);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
 	values[Anum_pg_subscription_subconninfo - 1] =
@@ -438,6 +473,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
 		CStringGetTextDatum(synchronous_commit);
+	values[Anum_pg_subscription_suberrmsg - 1] =
+		CStringGetTextDatum("");
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
@@ -799,6 +836,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
+				bool		disable_on_error;
+				bool		disable_on_error_given;
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
@@ -810,6 +849,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   &disable_on_error_given,
+										   &disable_on_error,
 										   NULL,	/* no "create_slot" */
 										   &slotname_given, &slotname,
 										   NULL,	/* no "copy_data" */
@@ -818,6 +859,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   &binary_given, &binary,
 										   &streaming_given, &streaming);
 
+				if (disable_on_error_given)
+				{
+					values[Anum_pg_subscription_disable_on_error - 1] =
+						BoolGetDatum(disable_on_error);
+					replaces[Anum_pg_subscription_disable_on_error - 1 ] =
+						true;
+				}
+
 				if (slotname_given)
 				{
 					if (sub->enabled && !slotname)
@@ -867,6 +916,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   &enabled_given, &enabled,
+										   NULL, NULL,	/* no "disable_on_error" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   NULL,	/* no "copy_data" */
@@ -885,6 +935,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					BoolGetDatum(enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
+				/*
+				 * Manually enabling or disabling a subscription clears any
+				 * error which automatically disabled it.
+				 */
+				values[Anum_pg_subscription_disabled_by_error - 1] = false;
+				replaces[Anum_pg_subscription_disabled_by_error - 1] = true;
+				values[Anum_pg_subscription_suberrmsg - 1] = CStringGetTextDatum("");
+				replaces[Anum_pg_subscription_suberrmsg - 1] = true;
+
 				if (enabled)
 					ApplyLauncherWakeupAtCommit();
 
@@ -912,6 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disable_on_error" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   &copy_data,
@@ -958,6 +1018,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disable_on_error" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   isadd ? &copy_data : NULL,	/* for drop, no
@@ -1005,6 +1066,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disable_on_error" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   &copy_data,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e3b11daa89..e8a32b13b6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -132,6 +132,8 @@ get_subscription_list(void)
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
 		sub->enabled = subform->subenabled;
+		sub->disable_on_error = subform->disable_on_error;
+		sub->disabled_by_error = subform->disabled_by_error;
 		sub->name = pstrdup(NameStr(subform->subname));
 		/* We don't fill fields we are not interested in. */
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bbb659dad0..ffd57b7efc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -62,6 +62,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -2412,6 +2413,162 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
+/*
+ * Apply main loop with logic to disable a stuck subscription
+ *
+ * Runs LogicalRepApplyLoop(last_received).  If that throws an error which
+ * is not in the list of transient, network protocol related, or resource
+ * exhaustion related error codes below, the subscription is marked disabled
+ * in pg_subscription, with disabled_by_error set and the error message
+ * saved in suberrmsg, and the error is then rethrown.  Errors in the list
+ * are rethrown immediately, leaving the subscription untouched.
+ */
+static void
+LogicalRepApplyLoopTry(XLogRecPtr last_received)
+{
+	MemoryContext mcxt = CurrentMemoryContext;
+	bool		did_error = false;
+
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(last_received);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If the error is transient, network protocol related, or resource
+		 * exhaustion related, just rethrow it.  We only want to disable the
+		 * subscription for permanent errors.
+		 */
+		switch (geterrcode())
+		{
+			case ERRCODE_CONNECTION_EXCEPTION:
+			case ERRCODE_CONNECTION_DOES_NOT_EXIST:
+			case ERRCODE_CONNECTION_FAILURE:
+			case ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION:
+			case ERRCODE_SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION:
+			case ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN:
+			case ERRCODE_PROTOCOL_VIOLATION:
+			case ERRCODE_INSUFFICIENT_RESOURCES:
+			case ERRCODE_DISK_FULL:
+			case ERRCODE_OUT_OF_MEMORY:
+			case ERRCODE_TOO_MANY_CONNECTIONS:
+			case ERRCODE_CONFIGURATION_LIMIT_EXCEEDED:
+			case ERRCODE_PROGRAM_LIMIT_EXCEEDED:
+			case ERRCODE_STATEMENT_TOO_COMPLEX:
+			case ERRCODE_TOO_MANY_COLUMNS:
+			case ERRCODE_TOO_MANY_ARGUMENTS:
+			case ERRCODE_OPERATOR_INTERVENTION:
+			case ERRCODE_QUERY_CANCELED:
+			case ERRCODE_ADMIN_SHUTDOWN:
+			case ERRCODE_CRASH_SHUTDOWN:
+			case ERRCODE_CANNOT_CONNECT_NOW:
+			case ERRCODE_DATABASE_DROPPED:
+			case ERRCODE_IDLE_SESSION_TIMEOUT:
+				PG_RE_THROW();
+			default:
+				break;
+		}
+
+		/*
+		 * Record that we had an error, but otherwise don't do anything here
+		 * in the catch block, as anything complicated we do might itself
+		 * throw.
+		 */
+		did_error = true;
+	}
+	PG_END_TRY();
+
+	/*
+	 * If we caught an error above, disable the subscription and record copies
+	 * of the relevant error text.
+	 */
+	if (did_error)
+	{
+		Relation	rel;
+		bool		nulls[Natts_pg_subscription];
+		bool		replaces[Natts_pg_subscription];
+		Datum		values[Natts_pg_subscription];
+		HeapTuple	tup;
+		Oid			subid;
+		Form_pg_subscription form;
+		ErrorData  *edata;
+
+		/*
+		 * Clean up from the error and get a fresh transaction in which to
+		 * disable the subscription.
+		 */
+		MemoryContextSwitchTo(mcxt);
+		edata = CopyErrorData();
+
+		AbortOutOfAnyTransaction();
+		FlushErrorState();
+
+		StartTransactionCommand();
+
+		/* Look up our subscription in the catalogs */
+		rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+		tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+								  CStringGetDatum(MySubscription->name));
+		if (!HeapTupleIsValid(tup))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("subscription \"%s\" does not exist",
+							MySubscription->name)));
+
+		form = (Form_pg_subscription) GETSTRUCT(tup);
+		subid = form->oid;
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+		/*
+		 * We would not be here unless this subscription's disable_on_error
+		 * field was true when our worker began applying changes, but check
+		 * whether that field has changed in the interim.
+		 */
+		if (!form->disable_on_error)
+			ReThrowError(edata);
+
+		/* Form a new tuple. */
+		memset(values, 0, sizeof(values));
+		memset(nulls, false, sizeof(nulls));
+		memset(replaces, false, sizeof(replaces));
+
+		/* Set the subscription to disabled, and note the reason. */
+		values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+		replaces[Anum_pg_subscription_subenabled - 1] = true;
+		values[Anum_pg_subscription_disabled_by_error - 1] =
+			BoolGetDatum(true);
+		replaces[Anum_pg_subscription_disabled_by_error - 1] = true;
+		values[Anum_pg_subscription_suberrmsg - 1] =
+			CStringGetTextDatum(edata->message ? edata->message : "");
+		replaces[Anum_pg_subscription_suberrmsg - 1] = true;
+
+		/* Update the catalog */
+		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+								replaces);
+		CatalogTupleUpdate(rel, &tup->t_self, tup);
+		heap_freetuple(tup);
+
+		table_close(rel, RowExclusiveLock);
+
+		/* Commit first, so the message below is only logged on success. */
+		CommitTransactionCommand();
+
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was disabled due to error",
+						MySubscription->name)));
+
+		/*
+		 * Now that we've successfully disabled the subscription, rethrow the
+		 * error for any other consumers who might want to see it.  In
+		 * particular, we expect that the worker will write the error to the
+		 * server log and then exit.
+		 */
+		ReThrowError(edata);
+	}
+}
+
 /*
  * Send a Standby Status Update message to server.
  *
@@ -3138,6 +3295,20 @@ ApplyWorkerMain(Datum main_arg)
 		proc_exit(0);
 	}
 
+	/*
+	 * This is probably unnecessary, as an error which sets "disabled_by_error" will also
+	 * have cleared "enabled", but for now, see if this ever resolves to true.
+	 */
+	if (MySubscription->disabled_by_error)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will not "
+						"start because the subscription was disabled by an error",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
 	/* Setup synchronous commit according to the user's wishes */
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
@@ -3241,7 +3412,10 @@ ApplyWorkerMain(Datum main_arg)
 	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	if (MySubscription->disable_on_error)
+		LogicalRepApplyLoopTry(origin_startpos);
+	else
+		LogicalRepApplyLoop(origin_startpos);
 
 	proc_exit(0);
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8f53cc7c3b..16d9d006d1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4307,6 +4307,7 @@ getSubscriptions(Archive *fout)
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
+	int			i_suberrmsg;
 	int			i_subpublications;
 	int			i_subbinary;
 	int			i,
@@ -4338,7 +4339,7 @@ getSubscriptions(Archive *fout)
 					  "SELECT s.tableoid, s.oid, s.subname,\n"
 					  " (%s s.subowner) AS rolname,\n"
 					  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
-					  " s.subpublications,\n",
+					  " s.suberrmsg, s.subpublications,\n",
 					  username_subquery);
 
 	if (fout->remoteVersion >= 140000)
@@ -4367,6 +4368,7 @@ getSubscriptions(Archive *fout)
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
+	i_suberrmsg = PQfnumber(res, "suberrmsg");
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
@@ -4389,6 +4391,8 @@ getSubscriptions(Archive *fout)
 			subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
 		subinfo[i].subsynccommit =
 			pg_strdup(PQgetvalue(res, i, i_subsynccommit));
+		subinfo[i].suberrmsg =
+			pg_strdup(PQgetvalue(res, i, i_suberrmsg));
 		subinfo[i].subpublications =
 			pg_strdup(PQgetvalue(res, i, i_subpublications));
 		subinfo[i].subbinary =
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 49e1b0a09c..3c2a8dd38c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -638,6 +638,7 @@ typedef struct _SubscriptionInfo
 	char	   *subbinary;
 	char	   *substream;
 	char	   *subsynccommit;
+	char	   *suberrmsg;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0060ebfb40..7ffa41c26b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -52,6 +52,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		disable_on_error;	/* True if apply errors should automatically
+									   disable the subscription */
+
+	bool		disabled_by_error;	/* True if an error caused the subscription
+									   to be disabled */
+
 	bool		subbinary;		/* True if the subscription wants the
 								 * publisher to send data in binary */
 
@@ -67,6 +73,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	/* Synchronous commit setting for worker */
 	text		subsynccommit BKI_FORCE_NOT_NULL;
 
+	/* Message from error which disabled this subscription */
+	text		suberrmsg BKI_FORCE_NOT_NULL;
+
 	/* List of publications subscribed to */
 	text		subpublications[1] BKI_FORCE_NOT_NULL;
 #endif
@@ -91,12 +100,16 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		disable_on_error;	/* Whether errors automatically disable */
+	bool		disabled_by_error;	/* Whether an error has disabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
+	char	   *errmsg;			/* Message from error which disabled */
+	char	   *errhint;		/* Hint from error which disabled */
 	List	   *publications;	/* List of publication names to subscribe to */
 } Subscription;
 
diff --git a/src/test/subscription/t/022_disable_on_error.pl b/src/test/subscription/t/022_disable_on_error.pl
new file mode 100644
index 0000000000..b084bbab5a
--- /dev/null
+++ b/src/test/subscription/t/022_disable_on_error.pl
@@ -0,0 +1,125 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+my @schemas = qw(s1 s2);
+my ($schema, $cmd);
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create identical schema, table and index on both the publisher and
+# subscriber, with publications and subscriptions linking the two.
+#
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+for $schema (@schemas)
+{
+	$cmd = qq(
+CREATE SCHEMA $schema;
+CREATE TABLE $schema.tbl (i INT);
+ALTER TABLE $schema.tbl REPLICA IDENTITY FULL;
+CREATE INDEX ${schema}_tbl_idx ON $schema.tbl(i));
+	$node_publisher->safe_psql('postgres', $cmd);
+	$node_subscriber->safe_psql('postgres', $cmd);
+
+	# Create the publication for this table
+	$cmd = qq(
+CREATE PUBLICATION $schema FOR TABLE $schema.tbl);
+	$node_publisher->safe_psql('postgres', $cmd);
+
+	# Create the subscription for this table
+	$cmd = qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$publisher_connstr'
+	PUBLICATION $schema
+	WITH (disable_on_error = true));
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Create an additional unique index in schema s1 on the subscriber only,
+# creating the circumstances for the replication to fail while applying changes
+# to subscription "s1" while succeeding for subscription "s2".
+#
+$cmd = qq(CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+$node_publisher->wait_for_catchup($_) for (@schemas);
+
+# Wait for initial sync to finish as well
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Enter non-unique data for schema "s1" on the publisher.  This should cause
+# subscription "s1" on the subscriber to fail and get automatically disabled.
+#
+$cmd = qq(INSERT INTO s1.tbl (i) VALUES (1), (1), (1));
+$node_publisher->safe_psql('postgres', $cmd);
+
+# Wait for subscription "s1" to be automatically disabled.
+my $s1_disabled_query = qq(
+SELECT count(1) = 1
+	FROM pg_catalog.pg_subscription
+	WHERE subname = 's1'
+	  AND disabled_by_error IS TRUE);
+$node_subscriber->poll_query_until('postgres', $s1_disabled_query)
+  or die "Timed out while waiting for subscription s1 to be disabled";
+
+# Enter unique data for both schemas on the publisher.  This should succeed on
+# the publisher node, and not cause any additional problems on the subscriber
+# side either, though disabled subscription "s1" should not replicate anything.
+#
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (2));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for subscription "s2".  This tests that the
+# problems encountered by subscription "s1" do not cause subscription "s2" to
+# get stuck.
+$node_publisher->wait_for_catchup("s2");
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error
+#
+$cmd = qq(
+SELECT disabled_by_error FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s1 automatically disabled");
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+$cmd = qq(
+SELECT suberrmsg FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	qq(duplicate key value violates unique constraint "s1_tbl_unique"),
+	"subscription s1 disabled by unique key violation");
+
+# Subscription "s2" should still be enabled and have replicated all changes
+#
+$cmd = qq(
+SELECT disabled_by_error FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s2 not automatically disabled");
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = q(SELECT i FROM s2.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"2", "subscription s2 replicated data");
-- 
2.21.1 (Apple Git-122.3)

