From 3ffde62a5a4aa176026ef1df3fb95006750cebed Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 22 Feb 2022 10:21:53 +0900
Subject: [PATCH v1] Reconsider pg_stat_subscription_workers view.

As the result of the discussion, we've concluded that the stats
collector is not an appropriate place to store the error information of
subscription workers.

This commits changes the view so that it stores only statistics
counters: apply_error_count and sync_error_count.  The
removed error information such as error-XID and the error message
would be stored in another way in the future in which is more reliable
and persistent.

Removing these error details, since we don't need to record the
error information for apply workers and tablesync workers separately,
the view now has one entry per subscription.

Also, it changes the view name to pg_stat_subscription_activity
since the word "worker" is an implementation detail that we use one
worker for one tablesync.

Discussion: https://postgr.es/m/20220125063131.4cmvsxbz2tdg6g65@alap3.anarazel.de
---
 doc/src/sgml/logical-replication.sgml       |   2 -
 doc/src/sgml/monitoring.sgml                | 127 +++-----
 src/backend/catalog/system_functions.sql    |   6 +-
 src/backend/catalog/system_views.sql        |  26 +-
 src/backend/postmaster/pgstat.c             | 308 ++++++++------------
 src/backend/replication/logical/worker.c    |  44 +--
 src/backend/utils/adt/pgstatfuncs.c         | 120 ++------
 src/include/catalog/pg_proc.dat             |  27 +-
 src/include/pgstat.h                        |  99 ++-----
 src/test/regress/expected/rules.out         |  22 +-
 src/test/subscription/t/026_worker_stats.pl | 153 +++-------
 src/tools/pgindent/typedefs.list            |   5 +-
 12 files changed, 287 insertions(+), 652 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 96b4886e08..52b6dbbb37 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -346,8 +346,6 @@
   <para>
    A conflict will produce an error and will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
-   <link linkend="monitoring-pg-stat-subscription-workers">
-   <structname>pg_stat_subscription_workers</structname></link> and the
    subscriber's server log.
   </para>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bf7625d988..fe9500f3f6 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -628,11 +628,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
      </row>
 
      <row>
-      <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
-      See <link linkend="monitoring-pg-stat-subscription-workers">
-      <structname>pg_stat_subscription_workers</structname></link> for details.
+      <entry><structname>pg_stat_subscription_activity</structname><indexterm><primary>pg_stat_subscription_activity</primary></indexterm></entry>
+      <entry>One row per subscription, showing statistics about subscription
+      activity.
+      See <link linkend="monitoring-pg-stat-subscription-activity">
+      <structname>pg_stat_subscription_activity</structname></link> for details.
       </entry>
      </row>
 
@@ -3063,23 +3063,21 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
- <sect2 id="monitoring-pg-stat-subscription-workers">
-  <title><structname>pg_stat_subscription_workers</structname></title>
+ <sect2 id="monitoring-pg-stat-subscription-activity">
+  <title><structname>pg_stat_subscription_activity</structname></title>
 
   <indexterm>
-   <primary>pg_stat_subscription_workers</primary>
+   <primary>pg_stat_subscription_activity</primary>
   </indexterm>
 
   <para>
-   The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription worker on which errors have occurred, for workers
-   applying logical replication changes and workers handling the initial data
-   copy of the subscribed tables.  The statistics entry is removed when the
+   The <structname>pg_stat_subscription_activity</structname> view will contain
+   one row per subscription.  The statistics entry is removed when the
    corresponding subscription is dropped.
   </para>
 
-  <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
-   <title><structname>pg_stat_subscription_workers</structname> View</title>
+  <table id="pg-stat-subscription-activity" xreflabel="pg_stat_subscription_activity">
+   <title><structname>pg_stat_subscription_activity</structname> View</title>
    <tgroup cols="1">
     <thead>
      <row>
@@ -3113,69 +3111,19 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>subrelid</structfield> <type>oid</type>
+       <structfield>apply_error_count</structfield> <type>uint8</type>
       </para>
       <para>
-       OID of the relation that the worker is synchronizing; null for the
-       main apply worker
-      </para></entry>
-     </row>
-
-     <row>
-      <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_relid</structfield> <type>oid</type>
-      </para>
-      <para>
-       OID of the relation that the worker was processing when the
-       error occurred
-      </para></entry>
-     </row>
-
-     <row>
-      <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_command</structfield> <type>text</type>
-      </para>
-      <para>
-       Name of command being applied when the error occurred.  This field
-       is null if the error was reported during the initial data copy.
-      </para></entry>
-     </row>
-
-     <row>
-      <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_xid</structfield> <type>xid</type>
-      </para>
-      <para>
-       Transaction ID of the publisher node being applied when the error
-       occurred.  This field is null if the error was reported
-       during the initial data copy.
-      </para></entry>
-     </row>
-
-     <row>
-      <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_count</structfield> <type>uint8</type>
-      </para>
-      <para>
-       Number of consecutive times the error occurred
-      </para></entry>
-     </row>
-
-     <row>
-      <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_message</structfield> <type>text</type>
-      </para>
-      <para>
-       The error message
+       Number of times the error occurred during the application of changes
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_time</structfield> <type>timestamp with time zone</type>
+       <structfield>sync_error_count</structfield> <type>uint8</type>
       </para>
       <para>
-       Last time at which this error occurred
+       Number of times the error occurred during the initial data copy
       </para></entry>
      </row>
 
@@ -5267,6 +5215,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_single_subscription_counters</primary>
+        </indexterm>
+        <function>pg_stat_reset_single_subscription_counters</function> ( <type>oid</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Resets statistics for a single subscription shown in the
+        <structname>pg_stat_subscription_activity</structname> view to zero.
+       </para>
+       <para>
+        This function is restricted to superusers by default, but other users
+        can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
@@ -5317,31 +5283,6 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        </para></entry>
       </row>
 
-      <row>
-       <entry role="func_table_entry"><para role="func_signature">
-        <indexterm>
-          <primary>pg_stat_reset_subscription_worker</primary>
-        </indexterm>
-        <function>pg_stat_reset_subscription_worker</function> ( <parameter>subid</parameter> <type>oid</type> <optional>, <parameter>relid</parameter> <type>oid</type> </optional> )
-        <returnvalue>void</returnvalue>
-       </para>
-       <para>
-        Resets the statistics of subscription workers running on the
-        subscription with <parameter>subid</parameter> shown in the
-        <structname>pg_stat_subscription_workers</structname> view.  If the
-        argument <parameter>relid</parameter> is not <literal>NULL</literal>,
-        resets statistics of the subscription worker handling the initial data
-        copy of the relation with <parameter>relid</parameter>.  Otherwise,
-        resets the subscription worker statistics of the main apply worker.
-        If the argument <parameter>relid</parameter> is omitted, resets the
-        statistics of all subscription workers running on the subscription
-        with <parameter>subid</parameter>.
-       </para>
-       <para>
-        This function is restricted to superusers by default, but other users
-        can be granted EXECUTE to run the function.
-       </para></entry>
-      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fd1421788e..50d0661f2f 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -637,11 +637,9 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
-
-REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_subscription_counters(oid) FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87..ffbbe49976 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1264,25 +1264,11 @@ GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
-CREATE VIEW pg_stat_subscription_workers AS
+CREATE VIEW pg_stat_subscription_activity AS
     SELECT
-        w.subid,
+        a.subid,
         s.subname,
-        w.subrelid,
-        w.last_error_relid,
-        w.last_error_command,
-        w.last_error_xid,
-        w.last_error_count,
-        w.last_error_message,
-        w.last_error_time
-    FROM (SELECT
-              oid as subid,
-              NULL as relid
-          FROM pg_subscription
-          UNION ALL
-          SELECT
-              srsubid as subid,
-              srrelid as relid
-          FROM pg_subscription_rel) sr,
-          LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w
-          JOIN pg_subscription s ON (w.subid = s.oid);
+        a.apply_error_count,
+        a.sync_error_count
+    FROM pg_subscription as s,
+        LATERAL pg_stat_get_subscription_activity(oid) as a;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f53098..ec73f22ef3 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,7 +106,7 @@
 #define PGSTAT_DB_HASH_SIZE		16
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
-#define PGSTAT_SUBWORKER_HASH_SIZE	32
+#define PGSTAT_SUBSCRIPTION_HASH_SIZE	32
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
 
 
@@ -322,14 +322,13 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no
 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
 static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
 												 Oid tableoid, bool create);
-static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
-															 Oid subid, Oid subrelid,
-															 bool create);
+static PgStat_StatSubEntry *pgstat_get_sub_entry(PgStat_StatDBEntry *dbentry,
+												 Oid subid, bool create);
 static void pgstat_write_statsfiles(bool permanent, bool allDbs);
 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
 static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
 static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
-									 HTAB *subworkerhash, bool permanent);
+									 HTAB *subhash, bool permanent);
 static void backend_read_statsfile(void);
 
 static bool pgstat_write_statsfile_needed(void);
@@ -381,7 +380,7 @@ static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
-static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1313,13 +1312,13 @@ pgstat_vacuum_stat(void)
 	}
 
 	/*
-	 * Repeat for subscription workers.  Similarly, we needn't bother in the
-	 * common case where no subscription workers' stats are being collected.
+	 * Repeat for subscription.  Similarly, we needn't bother in the common
+	 * case where no subscription stats are being collected.
 	 */
-	if (dbentry->subworkers != NULL &&
-		hash_get_num_entries(dbentry->subworkers) > 0)
+	if (dbentry->subscriptions != NULL &&
+		hash_get_num_entries(dbentry->subscriptions) > 0)
 	{
-		PgStat_StatSubWorkerEntry *subwentry;
+		PgStat_StatSubEntry *subentry;
 		PgStat_MsgSubscriptionPurge spmsg;
 
 		/*
@@ -1331,35 +1330,16 @@ pgstat_vacuum_stat(void)
 		spmsg.m_databaseid = MyDatabaseId;
 		spmsg.m_nentries = 0;
 
-		hash_seq_init(&hstat, dbentry->subworkers);
-		while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+		hash_seq_init(&hstat, dbentry->subscriptions);
+		while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
 		{
-			bool		exists = false;
-			Oid			subid = subwentry->key.subid;
+			Oid			subid = subentry->subid;
 
 			CHECK_FOR_INTERRUPTS();
 
 			if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL)
 				continue;
 
-			/*
-			 * It is possible that we have multiple entries for the
-			 * subscription corresponding to apply worker and tablesync
-			 * workers. In such cases, we don't need to add the same subid
-			 * again.
-			 */
-			for (int i = 0; i < spmsg.m_nentries; i++)
-			{
-				if (spmsg.m_subids[i] == subid)
-				{
-					exists = true;
-					break;
-				}
-			}
-
-			if (exists)
-				continue;
-
 			/* This subscription is dead, add the subid to the message */
 			spmsg.m_subids[spmsg.m_nentries++] = subid;
 
@@ -1551,8 +1531,7 @@ pgstat_reset_shared_counters(const char *target)
  * ----------
  */
 void
-pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
-							PgStat_Single_Reset_Type type)
+pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
 {
 	PgStat_MsgResetsinglecounter msg;
 
@@ -1563,7 +1542,6 @@ pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
 	msg.m_databaseid = MyDatabaseId;
 	msg.m_resettype = type;
 	msg.m_objectid = objoid;
-	msg.m_subobjectid = subobjoid;
 
 	pgstat_send(&msg, sizeof(msg));
 }
@@ -1949,31 +1927,21 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
- * pgstat_report_subworker_error() -
+ * pgstat_report_subscription_error() -
  *
- *	Tell the collector about the subscription worker error.
+ *	Tell the collector about the subscription error.
  * ----------
  */
 void
-pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
-							  LogicalRepMsgType command, TransactionId xid,
-							  const char *errmsg)
+pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 {
-	PgStat_MsgSubWorkerError msg;
-	int			len;
+	PgStat_MsgSubscriptionError msg;
 
-	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR);
 	msg.m_databaseid = MyDatabaseId;
 	msg.m_subid = subid;
-	msg.m_subrelid = subrelid;
-	msg.m_relid = relid;
-	msg.m_command = command;
-	msg.m_xid = xid;
-	msg.m_timestamp = GetCurrentTimestamp();
-	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
-
-	len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1;
-	pgstat_send(&msg, len);
+	msg.m_is_apply_error = is_apply_error;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError));
 }
 
 /* ----------
@@ -3000,32 +2968,29 @@ pgstat_fetch_stat_funcentry(Oid func_id)
 
 /*
  * ---------
- * pgstat_fetch_stat_subworker_entry() -
+ * pgstat_fetch_stat_subentry() -
  *
  *	Support function for the SQL-callable pgstat* functions. Returns
- *	the collected statistics for subscription worker or NULL.
+ *	the collected statistics for subscription or NULL.
  * ---------
  */
-PgStat_StatSubWorkerEntry *
-pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid)
+PgStat_StatSubEntry *
+pgstat_fetch_stat_subentry(Oid subid)
 {
 	PgStat_StatDBEntry *dbentry;
-	PgStat_StatSubWorkerEntry *wentry = NULL;
+	PgStat_StatSubEntry *entry = NULL;
 
 	/* Load the stats file if needed */
 	backend_read_statsfile();
 
 	/*
-	 * Lookup our database, then find the requested subscription worker stats.
+	 * Lookup our database, then find the requested subscription stats.
 	 */
 	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
-	if (dbentry != NULL && dbentry->subworkers != NULL)
-	{
-		wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid,
-											false);
-	}
+	if (dbentry != NULL && dbentry->subscriptions != NULL)
+		entry = pgstat_get_sub_entry(dbentry, subid, false);
 
-	return wentry;
+	return entry;
 }
 
 /*
@@ -3738,12 +3703,12 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_disconnect(&msg.msg_disconnect, len);
 					break;
 
-				case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
-					pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+				case PGSTAT_MTYPE_SUBSCRIPTIONERROR:
+					pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len);
 					break;
 
-				case PGSTAT_MTYPE_SUBWORKERERROR:
-					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+				case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+					pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
 					break;
 
 				default:
@@ -3791,8 +3756,7 @@ PgstatCollectorMain(int argc, char *argv[])
 /*
  * Subroutine to clear stats in a database entry
  *
- * Tables, functions, and subscription workers hashes are initialized
- * to empty.
+ * Tables, functions, and subscriptions hashes are initialized to empty.
  */
 static void
 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
@@ -3846,12 +3810,12 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 									 &hash_ctl,
 									 HASH_ELEM | HASH_BLOBS);
 
-	hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
-	hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
-	dbentry->subworkers = hash_create("Per-database subscription worker",
-									  PGSTAT_SUBWORKER_HASH_SIZE,
-									  &hash_ctl,
-									  HASH_ELEM | HASH_BLOBS);
+	hash_ctl.keysize = sizeof(Oid);
+	hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+	dbentry->subscriptions = hash_create("Per-database subscription",
+										 PGSTAT_SUBSCRIPTION_HASH_SIZE,
+										 &hash_ctl,
+										 HASH_ELEM | HASH_BLOBS);
 }
 
 /*
@@ -3876,7 +3840,7 @@ pgstat_get_db_entry(Oid databaseid, bool create)
 
 	/*
 	 * If not found, initialize the new one.  This creates empty hash tables
-	 * for tables, functions, and subscription workers, too.
+	 * for tables, functions, and subscriptions, too.
 	 */
 	if (!found)
 		reset_dbentry_counters(result);
@@ -3935,29 +3899,23 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
 }
 
 /* ----------
- * pgstat_get_subworker_entry
+ * pgstat_get_sub_entry
  *
- * Return subscription worker entry with the given subscription OID and
- * relation OID.  If subrelid is InvalidOid, it returns an entry of the
- * apply worker otherwise returns an entry of the table sync worker
- * associated with subrelid.  If no subscription worker entry exists,
- * initialize it, if the create parameter is true.  Else, return NULL.
+ * Return subscription statistics entry with the given subscription OID.
+ * If no subscription entry exists, initialize it, if the create parameter is
+ * true.  Else, return NULL.
  * ----------
  */
-static PgStat_StatSubWorkerEntry *
-pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
-						   bool create)
+static PgStat_StatSubEntry *
+pgstat_get_sub_entry(PgStat_StatDBEntry *dbentry, Oid subid, bool create)
 {
-	PgStat_StatSubWorkerEntry *subwentry;
-	PgStat_StatSubWorkerKey key;
+	PgStat_StatSubEntry *subentry;
 	bool		found;
 	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
 
-	key.subid = subid;
-	key.subrelid = subrelid;
-	subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers,
-														  (void *) &key,
-														  action, &found);
+	subentry = (PgStat_StatSubEntry *) hash_search(dbentry->subscriptions,
+												   (void *) &subid,
+												   action, &found);
 
 	if (!create && !found)
 		return NULL;
@@ -3965,15 +3923,11 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (!found)
 	{
-		subwentry->last_error_relid = InvalidOid;
-		subwentry->last_error_command = 0;
-		subwentry->last_error_xid = InvalidTransactionId;
-		subwentry->last_error_count = 0;
-		subwentry->last_error_time = 0;
-		subwentry->last_error_message[0] = '\0';
+		subentry->apply_error_count = 0;
+		subentry->sync_error_count = 0;
 	}
 
-	return subwentry;
+	return subentry;
 }
 
 /* ----------
@@ -4059,8 +4013,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
 	{
 		/*
-		 * Write out the table, function, and subscription-worker stats for
-		 * this DB into the appropriate per-DB stat file, if required.
+		 * Write out the table, function, and subscription stats for this DB
+		 * into the appropriate per-DB stat file, if required.
 		 */
 		if (allDbs || pgstat_db_requested(dbentry->databaseid))
 		{
@@ -4177,7 +4131,7 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
 	HASH_SEQ_STATUS sstat;
 	PgStat_StatTabEntry *tabentry;
 	PgStat_StatFuncEntry *funcentry;
-	PgStat_StatSubWorkerEntry *subwentry;
+	PgStat_StatSubEntry *subentry;
 	FILE	   *fpout;
 	int32		format_id;
 	Oid			dbid = dbentry->databaseid;
@@ -4233,13 +4187,13 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
 	}
 
 	/*
-	 * Walk through the database's subscription worker stats table.
+	 * Walk through the database's subscription stats table.
 	 */
-	hash_seq_init(&sstat, dbentry->subworkers);
-	while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
+	hash_seq_init(&sstat, dbentry->subscriptions);
+	while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL)
 	{
 		fputc('S', fpout);
-		rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
+		rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout);
 		(void) rc;				/* we'll check for error with ferror */
 	}
 
@@ -4301,8 +4255,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
  *	files after reading; the in-memory status is now authoritative, and the
  *	files would be out of date in case somebody else reads them.
  *
- *	If a 'deep' read is requested, table/function/subscription-worker stats are
- *	read, otherwise the table/function/subscription-worker hash tables remain
+ *	If a 'deep' read is requested, table/function/subscription stats are
+ *	read, otherwise the table/function/subscription hash tables remain
  *	empty.
  * ----------
  */
@@ -4482,7 +4436,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
 				dbentry->tables = NULL;
 				dbentry->functions = NULL;
-				dbentry->subworkers = NULL;
+				dbentry->subscriptions = NULL;
 
 				/*
 				 * In the collector, disregard the timestamp we read from the
@@ -4494,7 +4448,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					dbentry->stats_timestamp = 0;
 
 				/*
-				 * Don't create tables/functions/subworkers hashtables for
+				 * Don't create tables/functions/subscriptions hashtables for
 				 * uninteresting databases.
 				 */
 				if (onlydb != InvalidOid)
@@ -4520,13 +4474,13 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 												 &hash_ctl,
 												 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-				hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
-				hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+				hash_ctl.keysize = sizeof(Oid);
+				hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
 				hash_ctl.hcxt = pgStatLocalContext;
-				dbentry->subworkers = hash_create("Per-database subscription worker",
-												  PGSTAT_SUBWORKER_HASH_SIZE,
-												  &hash_ctl,
-												  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+				dbentry->subscriptions = hash_create("Per-database subscription",
+													 PGSTAT_SUBSCRIPTION_HASH_SIZE,
+													 &hash_ctl,
+													 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
 				/*
 				 * If requested, read the data from the database-specific
@@ -4536,7 +4490,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					pgstat_read_db_statsfile(dbentry->databaseid,
 											 dbentry->tables,
 											 dbentry->functions,
-											 dbentry->subworkers,
+											 dbentry->subscriptions,
 											 permanent);
 
 				break;
@@ -4615,20 +4569,20 @@ done:
  *	removed after reading.
  *
  *	Note: this code has the ability to skip storing per-table, per-function, or
- *	per-subscription-worker data, if NULL is passed for the corresponding hashtable.
+ *	per-subscription data, if NULL is passed for the corresponding hashtable.
  *	That's not used at the moment though.
  * ----------
  */
 static void
 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
-						 HTAB *subworkerhash, bool permanent)
+						 HTAB *subhash, bool permanent)
 {
 	PgStat_StatTabEntry *tabentry;
 	PgStat_StatTabEntry tabbuf;
 	PgStat_StatFuncEntry funcbuf;
 	PgStat_StatFuncEntry *funcentry;
-	PgStat_StatSubWorkerEntry subwbuf;
-	PgStat_StatSubWorkerEntry *subwentry;
+	PgStat_StatSubEntry subbuf;
+	PgStat_StatSubEntry *subentry;
 	FILE	   *fpin;
 	int32		format_id;
 	bool		found;
@@ -4743,12 +4697,12 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
 				break;
 
 				/*
-				 * 'S'	A PgStat_StatSubWorkerEntry struct describing
-				 * subscription worker statistics.
+				 * 'S'	A PgStat_StatSubEntry struct describing subscription
+				 * statistics.
 				 */
 			case 'S':
-				if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry),
-						  fpin) != sizeof(PgStat_StatSubWorkerEntry))
+				if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry),
+						  fpin) != sizeof(PgStat_StatSubEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
@@ -4757,14 +4711,14 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
 				}
 
 				/*
-				 * Skip if subscription worker data not wanted.
+				 * Skip if subscription data not wanted.
 				 */
-				if (subworkerhash == NULL)
+				if (subhash == NULL)
 					break;
 
-				subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash,
-																	  (void *) &subwbuf.key,
-																	  HASH_ENTER, &found);
+				subentry = (PgStat_StatSubEntry *) hash_search(subhash,
+															   (void *) &subbuf.subid,
+															   HASH_ENTER, &found);
 
 				if (found)
 				{
@@ -4774,7 +4728,7 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
 					goto done;
 				}
 
-				memcpy(subwentry, &subwbuf, sizeof(subwbuf));
+				memcpy(subentry, &subbuf, sizeof(subbuf));
 				break;
 
 				/*
@@ -5450,8 +5404,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
 			hash_destroy(dbentry->tables);
 		if (dbentry->functions != NULL)
 			hash_destroy(dbentry->functions);
-		if (dbentry->subworkers != NULL)
-			hash_destroy(dbentry->subworkers);
+		if (dbentry->subscriptions != NULL)
+			hash_destroy(dbentry->subscriptions);
 
 		if (hash_search(pgStatDBHash,
 						(void *) &dbid,
@@ -5489,16 +5443,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
 		hash_destroy(dbentry->tables);
 	if (dbentry->functions != NULL)
 		hash_destroy(dbentry->functions);
-	if (dbentry->subworkers != NULL)
-		hash_destroy(dbentry->subworkers);
+	if (dbentry->subscriptions != NULL)
+		hash_destroy(dbentry->subscriptions);
 
 	dbentry->tables = NULL;
 	dbentry->functions = NULL;
-	dbentry->subworkers = NULL;
+	dbentry->subscriptions = NULL;
 
 	/*
 	 * Reset database-level stats, too.  This creates empty hash tables for
-	 * tables, functions, and subscription workers.
+	 * tables, functions, and subscriptions.
 	 */
 	reset_dbentry_counters(dbentry);
 }
@@ -5567,14 +5521,9 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
 	else if (msg->m_resettype == RESET_FUNCTION)
 		(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
 						   HASH_REMOVE, NULL);
-	else if (msg->m_resettype == RESET_SUBWORKER)
-	{
-		PgStat_StatSubWorkerKey key;
-
-		key.subid = msg->m_objectid;
-		key.subrelid = msg->m_subobjectid;
-		(void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL);
-	}
+	else if (msg->m_resettype == RESET_SUBSCRIPTION)
+		(void) hash_search(dbentry->subscriptions, (void *) &(msg->m_objectid),
+						   HASH_REMOVE, NULL);
 }
 
 /* ----------
@@ -6126,73 +6075,48 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 static void
 pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 {
-	HASH_SEQ_STATUS hstat;
 	PgStat_StatDBEntry *dbentry;
-	PgStat_StatSubWorkerEntry *subwentry;
 
 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
 
 	/* No need to purge if we don't even know the database */
-	if (!dbentry || !dbentry->subworkers)
+	if (!dbentry || !dbentry->subscriptions)
 		return;
 
-	/* Remove all subscription worker statistics for the given subscriptions */
-	hash_seq_init(&hstat, dbentry->subworkers);
-	while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+	/*
+	 * Process all subscription entries in the message.
+	 */
+	for (int i = 0; i < msg->m_nentries; i++)
 	{
-		for (int i = 0; i < msg->m_nentries; i++)
-		{
-			if (subwentry->key.subid == msg->m_subids[i])
-			{
-				(void) hash_search(dbentry->subworkers, (void *) &(subwentry->key),
-								   HASH_REMOVE, NULL);
-				break;
-			}
-		}
+		/* Remove from hashtable if present; we don't care if it's not */
+		(void) hash_search(dbentry->subscriptions,
+						   (void *) &(msg->m_subids[i]),
+						   HASH_REMOVE, NULL);
 	}
 }
 
 /* ----------
- * pgstat_recv_subworker_error() -
+ * pgstat_recv_subscription_error() -
  *
- *	Process a SUBWORKERERROR message.
+ *	Process a SUBSCRIPTIONERROR message.
  * ----------
  */
 static void
-pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len)
 {
 	PgStat_StatDBEntry *dbentry;
-	PgStat_StatSubWorkerEntry *subwentry;
+	PgStat_StatSubEntry *subentry;
 
 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
 
-	/* Get the subscription worker stats */
-	subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
-										   msg->m_subrelid, true);
-	Assert(subwentry);
+	/* Get the subscription stats */
+	subentry = pgstat_get_sub_entry(dbentry, msg->m_subid, true);
+	Assert(subentry);
 
-	if (subwentry->last_error_relid == msg->m_relid &&
-		subwentry->last_error_command == msg->m_command &&
-		subwentry->last_error_xid == msg->m_xid &&
-		strcmp(subwentry->last_error_message, msg->m_message) == 0)
-	{
-		/*
-		 * The same error occurred again in succession, just update its
-		 * timestamp and count.
-		 */
-		subwentry->last_error_count++;
-		subwentry->last_error_time = msg->m_timestamp;
-		return;
-	}
-
-	/* Otherwise, update the error information */
-	subwentry->last_error_relid = msg->m_relid;
-	subwentry->last_error_command = msg->m_command;
-	subwentry->last_error_xid = msg->m_xid;
-	subwentry->last_error_count = 1;
-	subwentry->last_error_time = msg->m_timestamp;
-	strlcpy(subwentry->last_error_message, msg->m_message,
-			PGSTAT_SUBWORKERERROR_MSGLEN);
+	if (msg->m_is_apply_error)
+		subentry->apply_error_count++;
+	else
+		subentry->sync_error_count++;
 }
 
 /* ----------
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..e63118abaa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3377,7 +3377,6 @@ void
 ApplyWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
-	MemoryContext cctx = CurrentMemoryContext;
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
 	XLogRecPtr	origin_startpos;
@@ -3485,20 +3484,15 @@ ApplyWorkerMain(Datum main_arg)
 		}
 		PG_CATCH();
 		{
-			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
-
 			/*
-			 * Report the table sync error. There is no corresponding message
-			 * type for table synchronization.
+			 * Abort the current transaction so that we send the stats message
+			 * in an idle state.
 			 */
-			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
-										  MyLogicalRepWorker->relid,
-										  MyLogicalRepWorker->relid,
-										  0,	/* message type */
-										  InvalidTransactionId,
-										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
+			AbortOutOfAnyTransaction();
+
+			/* Report the worker failed during table synchronization */
+			pgstat_report_subscription_error(MyLogicalRepWorker->subid, false);
+
 			PG_RE_THROW();
 		}
 		PG_END_TRY();
@@ -3625,22 +3619,14 @@ ApplyWorkerMain(Datum main_arg)
 	}
 	PG_CATCH();
 	{
-		/* report the apply error */
-		if (apply_error_callback_arg.command != 0)
-		{
-			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
-
-			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
-										  MyLogicalRepWorker->relid,
-										  apply_error_callback_arg.rel != NULL
-										  ? apply_error_callback_arg.rel->localreloid
-										  : InvalidOid,
-										  apply_error_callback_arg.command,
-										  apply_error_callback_arg.remote_xid,
-										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
-		}
+		/*
+		 * Abort the current transaction so that we send the stats message in
+		 * an idle state.
+		 */
+		AbortOutOfAnyTransaction();
+
+		/* Report the worker failed during the application of the change */
+		pgstat_report_subscription_error(MyLogicalRepWorker->subid, true);
 
 		PG_RE_THROW();
 	}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 30e8dfa7c1..a4e446dd6e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2163,7 +2163,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS)
 {
 	Oid			taboid = PG_GETARG_OID(0);
 
-	pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE);
+	pgstat_reset_single_counter(taboid, RESET_TABLE);
 
 	PG_RETURN_VOID();
 }
@@ -2173,38 +2173,21 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS)
 {
 	Oid			funcoid = PG_GETARG_OID(0);
 
-	pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION);
+	pgstat_reset_single_counter(funcoid, RESET_FUNCTION);
 
 	PG_RETURN_VOID();
 }
 
 Datum
-pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS)
+pg_stat_reset_single_subscription_counters(PG_FUNCTION_ARGS)
 {
 	Oid			subid = PG_GETARG_OID(0);
-	Oid			relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
 
-	pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER);
+	pgstat_reset_single_counter(subid, RESET_SUBSCRIPTION);
 
 	PG_RETURN_VOID();
 }
 
-/* Reset all subscription worker stats associated with the given subscription */
-Datum
-pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS)
-{
-	Oid			subid = PG_GETARG_OID(0);
-
-	/*
-	 * Use subscription drop message to remove statistics of all subscription
-	 * workers.
-	 */
-	pgstat_report_subscription_drop(subid);
-
-	PG_RETURN_VOID();
-}
-
-
 /* Reset SLRU counters (a specific one or all of them). */
 Datum
 pg_stat_reset_slru(PG_FUNCTION_ARGS)
@@ -2400,97 +2383,50 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 }
 
 /*
- * Get the subscription worker statistics for the given subscription
- * (and relation).
+ * Get the subscription activity statistics for the given subscription.
  */
 Datum
-pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
+pg_stat_get_subscription_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_ACTIVITY_COLS	3
 	Oid			subid = PG_GETARG_OID(0);
-	Oid			subrelid;
 	TupleDesc	tupdesc;
-	Datum		values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
-	bool		nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
-	PgStat_StatSubWorkerEntry *wentry;
-	int			i;
-
-	if (PG_ARGISNULL(1))
-		subrelid = InvalidOid;
-	else
-		subrelid = PG_GETARG_OID(1);
+	Datum		values[PG_STAT_GET_SUBSCRIPTION_ACTIVITY_COLS];
+	bool		nulls[PG_STAT_GET_SUBSCRIPTION_ACTIVITY_COLS];
+	PgStat_StatSubEntry *entry;
 
-	/* Get subscription worker stats */
-	wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid);
-
-	/* Return NULL if there is no worker statistics */
-	if (wentry == NULL)
-		PG_RETURN_NULL();
+	/* Get subscription stats */
+	entry = pgstat_fetch_stat_subentry(subid);
 
 	/* Initialise attributes information in the tuple descriptor */
-	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS);
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_ACTIVITY_COLS);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
-					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
-					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
-					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
-					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
-					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
-					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
 	/* Initialise values and NULL flags arrays */
 	MemSet(values, 0, sizeof(values));
 	MemSet(nulls, 0, sizeof(nulls));
 
-	i = 0;
 	/* subid */
-	values[i++] = ObjectIdGetDatum(subid);
-
-	/* subrelid */
-	if (OidIsValid(subrelid))
-		values[i++] = ObjectIdGetDatum(subrelid);
-	else
-		nulls[i++] = true;
-
-	/* last_error_relid */
-	if (OidIsValid(wentry->last_error_relid))
-		values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
-	else
-		nulls[i++] = true;
-
-	/* last_error_command */
-	if (wentry->last_error_command != 0)
-		values[i++] =
-			CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command));
-	else
-		nulls[i++] = true;
-
-	/* last_error_xid */
-	if (TransactionIdIsValid(wentry->last_error_xid))
-		values[i++] = TransactionIdGetDatum(wentry->last_error_xid);
-	else
-		nulls[i++] = true;
+	values[0] = ObjectIdGetDatum(subid);
 
-	/* last_error_count */
-	values[i++] = Int64GetDatum(wentry->last_error_count);
-
-	/* last_error_message */
-	values[i++] = CStringGetTextDatum(wentry->last_error_message);
+	/*
+	 * Fill the subscription statistics.  Return all-zero stats if there is no
+	 * subscription statistics entry.
+	 */
+	if (entry != NULL)
+	{
+		/* apply_error_count */
+		values[1] = Int64GetDatum(entry->apply_error_count);
 
-	/* last_error_time */
-	if (wentry->last_error_time != 0)
-		values[i++] = TimestampTzGetDatum(wentry->last_error_time);
-	else
-		nulls[i++] = true;
+		/* sync_error_count */
+		values[2] = Int64GetDatum(entry->sync_error_count);
+	}
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7f1ee97f55..6f8fa91b37 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5377,14 +5377,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
-{ oid => '8523', descr => 'statistics: information about subscription worker',
-  proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
-  proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
-  prosrc => 'pg_stat_get_subscription_worker' },
+{ oid => '8523', descr => 'statistics: information about subscription activity',
+  proname => 'pg_stat_get_subscription_activity', proisstrict => 'f',
+  provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid',
+  proallargtypes => '{oid,oid,int8,int8}',
+  proargmodes => '{i,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count}',
+  prosrc => 'pg_stat_get_subscription_activity' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5773,15 +5773,10 @@
   provolatile => 'v', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_stat_reset_replication_slot' },
 { oid => '8524',
-  descr => 'statistics: reset collected statistics for a single subscription worker',
-  proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f',
-  provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
-  prosrc => 'pg_stat_reset_subscription_worker_subrel' },
-{ oid => '8525',
-  descr => 'statistics: reset all collected statistics for a single subscription',
-  proname => 'pg_stat_reset_subscription_worker',
+  descr => 'statistics: reset collected statistics for a single subscription',
+  proname => 'pg_stat_reset_single_subscription_counters',
   provolatile => 'v', prorettype => 'void', proargtypes => 'oid',
-  prosrc => 'pg_stat_reset_subscription_worker_sub' },
+  prosrc => 'pg_stat_reset_single_subscription_counters' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e10d20222a..9fae64e15e 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -84,8 +84,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_REPLSLOT,
 	PGSTAT_MTYPE_CONNECT,
 	PGSTAT_MTYPE_DISCONNECT,
+	PGSTAT_MTYPE_SUBSCRIPTIONERROR,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
-	PGSTAT_MTYPE_SUBWORKERERROR,
 } StatMsgType;
 
 /* ----------
@@ -149,7 +149,7 @@ typedef enum PgStat_Single_Reset_Type
 {
 	RESET_TABLE,
 	RESET_FUNCTION,
-	RESET_SUBWORKER
+	RESET_SUBSCRIPTION
 } PgStat_Single_Reset_Type;
 
 /* ------------------------------------------------------------
@@ -368,7 +368,6 @@ typedef struct PgStat_MsgResetsinglecounter
 	Oid			m_databaseid;
 	PgStat_Single_Reset_Type m_resettype;
 	Oid			m_objectid;
-	Oid			m_subobjectid;
 } PgStat_MsgResetsinglecounter;
 
 /* ----------
@@ -558,37 +557,18 @@ typedef struct PgStat_MsgSubscriptionPurge
 } PgStat_MsgSubscriptionPurge;
 
 /* ----------
- * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync
- *								worker to report the error occurred while
- *								processing changes.
+ * PgStat_MsgSubscriptionError	Sent by the apply worker or the table sync
+ *								worker to report an error on the subscription.
  * ----------
  */
-#define PGSTAT_SUBWORKERERROR_MSGLEN 256
-typedef struct PgStat_MsgSubWorkerError
+typedef struct PgStat_MsgSubscriptionError
 {
 	PgStat_MsgHdr m_hdr;
 
-	/*
-	 * m_subid and m_subrelid are used to determine the subscription and the
-	 * reporter of the error. m_subrelid is InvalidOid if reported by an apply
-	 * worker otherwise reported by a table sync worker.
-	 */
 	Oid			m_databaseid;
 	Oid			m_subid;
-	Oid			m_subrelid;
-
-	/*
-	 * Oid of the table that the reporter was actually processing. m_relid can
-	 * be InvalidOid if an error occurred during worker applying a
-	 * non-data-modification message such as RELATION.
-	 */
-	Oid			m_relid;
-
-	LogicalRepMsgType m_command;
-	TransactionId m_xid;
-	TimestampTz m_timestamp;
-	char		m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
-} PgStat_MsgSubWorkerError;
+	bool		m_is_apply_error;
+} PgStat_MsgSubscriptionError;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -767,8 +747,8 @@ typedef union PgStat_Msg
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConnect msg_connect;
 	PgStat_MsgDisconnect msg_disconnect;
+	PgStat_MsgSubscriptionError msg_subscriptionerror;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
-	PgStat_MsgSubWorkerError msg_subworkererror;
 } PgStat_Msg;
 
 
@@ -823,16 +803,12 @@ typedef struct PgStat_StatDBEntry
 	TimestampTz stats_timestamp;	/* time of db stats file update */
 
 	/*
-	 * tables, functions, and subscription workers must be last in the struct,
-	 * because we don't write the pointers out to the stats file.
-	 *
-	 * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores
-	 * statistics of logical replication workers: apply worker and table sync
-	 * worker.
+	 * tables, functions, and subscription must be last in the struct, because
+	 * we don't write the pointers out to the stats file.
 	 */
 	HTAB	   *tables;
 	HTAB	   *functions;
-	HTAB	   *subworkers;
+	HTAB	   *subscriptions;
 } PgStat_StatDBEntry;
 
 
@@ -887,6 +863,16 @@ typedef struct PgStat_StatFuncEntry
 	PgStat_Counter f_self_time;
 } PgStat_StatFuncEntry;
 
+/*
+ * Subscription statistics kept in the stats collector.
+ */
+typedef struct PgStat_StatSubEntry
+{
+	Oid			subid;			/* hash key (must be first) */
+
+	PgStat_Counter apply_error_count;
+	PgStat_Counter sync_error_count;
+} PgStat_StatSubEntry;
 
 /*
  * Archiver statistics kept in the stats collector
@@ -989,39 +975,6 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
-/* The lookup key for subscription worker hash table */
-typedef struct PgStat_StatSubWorkerKey
-{
-	Oid			subid;
-
-	/*
-	 * Oid of the table for which tablesync worker will copy the initial data.
-	 * An InvalidOid will be assigned for apply workers.
-	 */
-	Oid			subrelid;
-} PgStat_StatSubWorkerKey;
-
-/*
- * Logical replication apply worker and table sync worker statistics kept in the
- * stats collector.
- */
-typedef struct PgStat_StatSubWorkerEntry
-{
-	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
-
-	/*
-	 * Subscription worker error statistics representing an error that
-	 * occurred during application of changes or the initial table
-	 * synchronization.
-	 */
-	Oid			last_error_relid;
-	LogicalRepMsgType last_error_command;
-	TransactionId last_error_xid;
-	PgStat_Counter last_error_count;
-	TimestampTz last_error_time;
-	char		last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
-} PgStat_StatSubWorkerEntry;
-
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -1111,8 +1064,7 @@ extern void pgstat_drop_database(Oid databaseid);
 extern void pgstat_clear_snapshot(void);
 extern void pgstat_reset_counters(void);
 extern void pgstat_reset_shared_counters(const char *);
-extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid,
-										PgStat_Single_Reset_Type type);
+extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
 
@@ -1131,9 +1083,7 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
-extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
-										  LogicalRepMsgType command,
-										  TransactionId xid, const char *errmsg);
+extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
 extern void pgstat_report_subscription_drop(Oid subid);
 
 extern void pgstat_initialize(void);
@@ -1226,8 +1176,7 @@ extern void pgstat_send_wal(bool force);
 extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
 extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
 extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid);
-extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid,
-																	Oid subrelid);
+extern PgStat_StatSubEntry *pgstat_fetch_stat_subentry(Oid subid);
 extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void);
 extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67..d24180f9b7 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2072,24 +2072,12 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_time
    FROM (pg_subscription su
      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
-pg_stat_subscription_workers| SELECT w.subid,
+pg_stat_subscription_activity| SELECT a.subid,
     s.subname,
-    w.subrelid,
-    w.last_error_relid,
-    w.last_error_command,
-    w.last_error_xid,
-    w.last_error_count,
-    w.last_error_message,
-    w.last_error_time
-   FROM ( SELECT pg_subscription.oid AS subid,
-            NULL::oid AS relid
-           FROM pg_subscription
-        UNION ALL
-         SELECT pg_subscription_rel.srsubid AS subid,
-            pg_subscription_rel.srrelid AS relid
-           FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
-     JOIN pg_subscription s ON ((w.subid = s.oid)));
+    a.apply_error_count,
+    a.sync_error_count
+   FROM pg_subscription s,
+    LATERAL pg_stat_get_subscription_activity(s.oid) a(subid, apply_error_count, sync_error_count);
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
index f72e4766e8..de706ca818 100644
--- a/src/test/subscription/t/026_worker_stats.pl
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -1,45 +1,13 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Tests for subscription error stats.
+# Tests for subscription stats.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
-# Test if the error reported on pg_stat_subscription_workers view is expected.
-sub test_subscription_error
-{
-    my ($node, $relname, $command, $xid, $by_apply_worker, $errmsg_prefix, $msg)
-	= @_;
-
-    my $check_sql = qq[
-SELECT count(1) > 0
-FROM pg_stat_subscription_workers
-WHERE last_error_relid = '$relname'::regclass
-    AND starts_with(last_error_message, '$errmsg_prefix')];
-
-    # subrelid
-    $check_sql .= $by_apply_worker
-	? qq[ AND subrelid IS NULL]
-	: qq[ AND subrelid = '$relname'::regclass];
-
-    # last_error_command
-    $check_sql .= $command eq ''
-	? qq[ AND last_error_command IS NULL]
-	: qq[ AND last_error_command = '$command'];
-
-    # last_error_xid
-    $check_sql .= $xid eq ''
-	? qq[ AND last_error_xid IS NULL]
-	: qq[ AND last_error_xid = '$xid'::xid];
-
-    # Wait for the particular error statistics to be reported.
-    $node->poll_query_until('postgres', $check_sql,
-) or die "Timed out while waiting for " . $msg;
-}
-
 # Create publisher node.
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
@@ -48,116 +16,83 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
-
-# The subscriber will enter an infinite error loop, so we don't want
-# to overflow the server log with error messages.
-$node_subscriber->append_conf('postgresql.conf',
-			      qq[
-wal_retrieve_retry_interval = 2s
-]);
 $node_subscriber->start;
 
 # Initial table setup on both publisher and subscriber. On subscriber we
 # create the same tables but with primary keys. Also, insert some data that
 # will conflict with the data replicated from publisher later.
 $node_publisher->safe_psql(
-    'postgres',
-    qq[
+	'postgres',
+	qq[
 BEGIN;
 CREATE TABLE test_tab1 (a int);
-CREATE TABLE test_tab2 (a int);
 INSERT INTO test_tab1 VALUES (1);
-INSERT INTO test_tab2 VALUES (1);
 COMMIT;
 ]);
 $node_subscriber->safe_psql(
-    'postgres',
-    qq[
+	'postgres',
+	qq[
 BEGIN;
 CREATE TABLE test_tab1 (a int primary key);
-CREATE TABLE test_tab2 (a int primary key);
-INSERT INTO test_tab2 VALUES (1);
+INSERT INTO test_tab1 VALUES (1);
 COMMIT;
 ]);
 
-# Setup publications.
+# Setup publication.
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-$node_publisher->safe_psql(
-    'postgres',
-    "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab1;");
 
 # There shouldn't be any subscription errors before starting logical replication.
-my $result = $node_subscriber->safe_psql(
-    'postgres',
-    "SELECT count(1) FROM pg_stat_subscription_workers");
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(1) FROM pg_stat_subscription_activity");
 is($result, qq(0), 'check no subscription error');
 
-# Create subscription. The table sync for test_tab2 on tap_sub will enter into
+# Create subscription. The table sync for test_tab1 on tap_sub will enter into
 # infinite error loop due to violating the unique constraint.
-$node_subscriber->safe_psql(
-    'postgres',
-    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"
+);
 
 $node_publisher->wait_for_catchup('tap_sub');
 
+# Wait for the table sync error to be reported.
+$node_subscriber->poll_query_until(
+	'postgres',
+	qq[
+SELECT apply_error_count = 0 AND sync_error_count > 0
+FROM pg_stat_subscription_activity
+WHERE subname = 'tap_sub'
+]) or die "Timed out while waiting for table sync error";
+
+# Truncate test_tab1 so that table sync can continue.
+$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;");
+
 # Wait for initial table sync for test_tab1 to finish.
 $node_subscriber->poll_query_until(
-    'postgres',
-    qq[
+	'postgres',
+	qq[
 SELECT count(1) = 1 FROM pg_subscription_rel
 WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
 ]) or die "Timed out while waiting for subscriber to synchronize data";
 
-# Check the initial data.
-$result = $node_subscriber->safe_psql(
-    'postgres',
-    "SELECT count(a) FROM test_tab1");
-is($result, q(1), 'check initial data are copied to subscriber');
-
-# Insert more data to test_tab1, raising an error on the subscriber due to
-# violation of the unique constraint on test_tab1.
-my $xid = $node_publisher->safe_psql(
-    'postgres',
-    qq[
-BEGIN;
-INSERT INTO test_tab1 VALUES (1);
-SELECT pg_current_xact_id()::xid;
-COMMIT;
-]);
-test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', $xid,
-			1,	# check apply worker error
-			qq(duplicate key value violates unique constraint),
-			'error reported by the apply worker');
-
-# Check the table sync worker's error in the view.
-test_subscription_error($node_subscriber, 'test_tab2', '', '',
-			0,	# check tablesync worker error
-			qq(duplicate key value violates unique constraint),
-			'the error reported by the table sync worker');
-
-# Test for resetting subscription worker statistics.
-# Truncate test_tab1 and test_tab2 so that applying changes and table sync can
-# continue, respectively.
-$node_subscriber->safe_psql(
-    'postgres',
-    "TRUNCATE test_tab1, test_tab2;");
+# Insert more data to test_tab1 on the subscriber and then on the publisher, raising an
+# error on the subscriber due to violation of the unique constraint on test_tab1.
+$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab1 VALUES (2)");
 
-# Wait for the data to be replicated.
-$node_subscriber->poll_query_until(
-    'postgres',
-    "SELECT count(1) > 0 FROM test_tab1");
-$node_subscriber->poll_query_until(
-    'postgres',
-    "SELECT count(1) > 0 FROM test_tab2");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1 VALUES (1)");
 
-# There shouldn't be any errors in the view after dropping the subscription.
-$node_subscriber->safe_psql(
-    'postgres',
-    "DROP SUBSCRIPTION tap_sub;");
-$result = $node_subscriber->safe_psql(
-    'postgres',
-    "SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, q(0), 'no error after dropping subscription');
+# Wait for the apply error to be reported.
+$node_subscriber->poll_query_until(
+	'postgres',
+	qq[
+SELECT apply_error_count > 0 AND sync_error_count > 0
+FROM pg_stat_subscription_activity
+WHERE subname = 'tap_sub'
+]) or die "Timed out while waiting for apply error";
+
+# Truncate test_tab1 so that table sync can continue.
+$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 15684f53ba..94b099df5e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1944,8 +1944,8 @@ PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
+PgStat_MsgSubscriptionError
 PgStat_MsgSubscriptionPurge
-PgStat_MsgSubWorkerError
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1957,8 +1957,7 @@ PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
-PgStat_StatSubWorkerEntry
-PgStat_StatSubWorkerKey
+PgStat_StatSubEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)

