From a1f3df6acf4306ee1f05296596ddcfc057dc8201 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 2 Feb 2023 16:29:28 +0800
Subject: [PATCH v92 2/2] Retry to apply streaming xact only in apply worker

When the subscription parameter is set streaming=parallel, the logic tries to
apply the streaming transaction using a parallel apply worker. If this
fails the parallel worker exits with an error.

In this case, retry applying the streaming transaction using the normal
streaming=on mode. This is done to avoid getting caught in a loop of the same
retry errors.

A new flag field "subretry" has been introduced to catalog "pg_subscription".
If there are any active parallel apply workers and the subscriber exits with an
error, this flag will be set true, and whenever the transaction is applied
successfully, this flag is reset false.
Now, when deciding how to apply a streaming transaction, the logic can know if
this transaction has previously failed or not (by checking the "subretry"
field).

Note: Since we add a new field 'subretry' to catalog 'pg_subscription' has been
expanded, we need bump catalog version.
---
 doc/src/sgml/catalogs.sgml                         |  10 ++
 doc/src/sgml/logical-replication.sgml              |  11 +-
 doc/src/sgml/ref/create_subscription.sgml          |   5 +
 src/backend/catalog/pg_subscription.c              |   1 +
 src/backend/catalog/system_views.sql               |   2 +-
 src/backend/commands/subscriptioncmds.c            |   1 +
 .../replication/logical/applyparallelworker.c      |  30 ++++
 src/backend/replication/logical/worker.c           | 182 +++++++++++++++------
 src/bin/pg_dump/pg_dump.c                          |   5 +-
 src/include/catalog/pg_subscription.h              |   7 +
 src/include/replication/worker_internal.h          |   2 +
 11 files changed, 200 insertions(+), 56 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048..d918327 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7950,6 +7950,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subretry</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if previous change failed to be applied while there were any
+       active parallel apply workers, necessitating a retry.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
       </para>
       <para>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660..78515e9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1504,12 +1504,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
 
   <para>
    When the streaming mode is <literal>parallel</literal>, the finish LSN of
-   failed transactions may not be logged. In that case, it may be necessary to
-   change the streaming mode to <literal>on</literal> or <literal>off</literal> and
-   cause the same conflicts again so the finish LSN of the failed transaction will
-   be written to the server log. For the usage of finish LSN, please refer to <link
-   linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ...
-   SKIP</command></link>.
+   failed transactions may not be logged. In that case, the failed transaction
+   will be retried in <literal>streaming = on</literal> mode. If it fails
+   again, the finish LSN of the failed transaction will be written to the
+   server log. For the usage of finish LSN, please refer to
+   <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... SKIP</command></link>.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f1..0f45970 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -251,6 +251,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           transaction is committed. Note that if an error happens in a
           parallel apply worker, the finish LSN of the remote transaction
           might not be reported in the server log.
+          When applying streaming transactions, if a deadlock is detected, the
+          parallel apply worker will exit with an error. The
+          <literal>parallel</literal> mode is disregarded when retrying;
+          instead the transaction will be applied using <literal>on</literal>
+          mode.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae31..0eb5ffb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->retry = subform->subretry;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8608e3f..5171fea 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1301,7 +1301,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subretry, subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d..365e9cb 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -636,6 +636,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3e0b994..0fefc73 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -312,6 +312,17 @@ pa_can_start(void)
 	if (!AllTablesyncsReady())
 		return false;
 
+	/*
+	 * Don't use parallel apply workers for retries, because it is possible
+	 * that a deadlock was detected the last time we tried to apply a
+	 * transaction using a parallel apply worker.
+	 */
+	if (MySubscription->retry)
+	{
+		elog(DEBUG1, "parallel apply workers are not used for retries");
+		return false;
+	}
+
 	return true;
 }
 
@@ -1675,3 +1686,22 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
 	pa_free_worker(winfo);
 }
+
+/* Check if any active parallel apply workers. */
+bool
+pa_have_active_worker(void)
+{
+	ListCell   *lc;
+
+	foreach(lc, ParallelApplyWorkerPool)
+	{
+		ParallelApplyWorkerInfo *tmp_winfo;
+
+		tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+		if (tmp_winfo->in_use)
+			return true;
+	}
+
+	return false;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 01572a0..6f53306 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,7 +391,7 @@ static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
-static void DisableSubscriptionAndExit(void);
+static void DisableSubscriptionOnError(void);
 
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -428,6 +428,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+static void set_subscription_retry(bool retry);
+
 /*
  * Return the name of the logical replication worker.
  */
@@ -1046,6 +1048,9 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1155,6 +1160,9 @@ apply_handle_prepare(StringInfo s)
 
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1211,6 +1219,9 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1272,6 +1283,9 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
@@ -1397,6 +1411,9 @@ apply_handle_stream_prepare(StringInfo s)
 			break;
 	}
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_stat(false);
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1974,6 +1991,10 @@ apply_handle_stream_abort(StringInfo s)
 			break;
 	}
 
+	/* Reset the retry flag. */
+	if (toplevel_xact)
+		set_subscription_retry(false);
+
 	reset_apply_error_context_info();
 }
 
@@ -2246,6 +2267,9 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -4344,20 +4368,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Emit the error message, and recover from the error state to an idle
+		 * state
+		 */
+		HOLD_INTERRUPTS();
+
+		EmitErrorReport();
+		AbortOutOfAnyTransaction();
+		FlushErrorState();
+
+		RESUME_INTERRUPTS();
+
+		/* Report the worker failed during table synchronization */
+		pgstat_report_subscription_error(MySubscription->oid, false);
+
 		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed during table synchronization. Abort
-			 * the current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, false);
+			DisableSubscriptionOnError();
 
-			PG_RE_THROW();
-		}
+		/* Set the retry flag. */
+		set_subscription_retry(true);
+
+		proc_exit(0);
 	}
 	PG_END_TRY();
 
@@ -4382,20 +4414,27 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Emit the error message, and recover from the error state to an idle
+		 * state
+		 */
+		HOLD_INTERRUPTS();
+
+		EmitErrorReport();
+		AbortOutOfAnyTransaction();
+		FlushErrorState();
+
+		RESUME_INTERRUPTS();
+
+		/* Report the worker failed while applying changes */
+		pgstat_report_subscription_error(MySubscription->oid,
+										 !am_tablesync_worker());
+
 		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed while applying changes. Abort the
-			 * current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			DisableSubscriptionOnError();
 
-			PG_RE_THROW();
-		}
+		/* Set the retry flag. */
+		set_subscription_retry(true);
 	}
 	PG_END_TRY();
 }
@@ -4674,28 +4713,11 @@ ApplyWorkerMain(Datum main_arg)
 }
 
 /*
- * After error recovery, disable the subscription in a new transaction
- * and exit cleanly.
+ * Disable the subscription in a new transaction.
  */
 static void
-DisableSubscriptionAndExit(void)
+DisableSubscriptionOnError(void)
 {
-	/*
-	 * Emit the error message, and recover from the error state to an idle
-	 * state
-	 */
-	HOLD_INTERRUPTS();
-
-	EmitErrorReport();
-	AbortOutOfAnyTransaction();
-	FlushErrorState();
-
-	RESUME_INTERRUPTS();
-
-	/* Report the worker failed during either table synchronization or apply */
-	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
-
 	/* Disable the subscription */
 	StartTransactionCommand();
 	DisableSubscription(MySubscription->oid);
@@ -4705,12 +4727,10 @@ DisableSubscriptionAndExit(void)
 	if (!am_tablesync_worker() && !am_parallel_apply_worker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
-	/* Notify the subscription has been disabled and exit */
+	/* Notify the subscription has been disabled */
 	ereport(LOG,
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
-
-	proc_exit(0);
 }
 
 /*
@@ -5063,3 +5083,71 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Set subretry of pg_subscription catalog.
+ *
+ * If retry is true, subscriber is about to exit with an error. Otherwise, it
+ * means that the transaction was applied successfully.
+ */
+static void
+set_subscription_retry(bool retry)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		started_tx = false;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	/* Fast path - if no state change then nothing to do */
+	if (MySubscription->retry == retry)
+		return;
+
+	/* Fast path - skip for parallel apply workers */
+	if (am_parallel_apply_worker())
+		return;
+
+	/* Fast path - skip set retry if no active parallel apply workers */
+	if (retry && !pa_have_active_worker())
+		return;
+
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/* Look up the subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+					 AccessShareLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set subretry */
+	values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+	replaces[Anum_pg_subscription_subretry - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	heap_freetuple(tup);
+	table_close(rel, NoLock);
+
+	if (started_tx)
+		CommitTransactionCommand();
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c765..2f6ec5d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4560,8 +4560,9 @@ getSubscriptions(Archive *fout)
 	ntups = PQntuples(res);
 
 	/*
-	 * Get subscription fields. We don't include subskiplsn in the dump as
-	 * after restoring the dump this value may no longer be relevant.
+	 * Get subscription fields. We don't include subskiplsn and subretry in
+	 * the dump as after restoring the dump this value may no longer be
+	 * relevant.
 	 */
 	i_tableoid = PQfnumber(res, "tableoid");
 	i_oid = PQfnumber(res, "oid");
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a17..8edbbca 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	bool		subretry BKI_DEFAULT(f);	/* True if previous change failed
+											 * to be applied while there were
+											 * any active parallel apply
+											 * workers */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +136,8 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	bool		retry;			/* Indicates if previous change failed to be
+								 * applied using a parallel apply worker */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 34e5006..9a60578 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -304,6 +304,8 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
+extern bool pa_have_active_worker(void);
+
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
 static inline bool
-- 
2.7.2.windows.1

