On Tue, Jul 5, 2022, at 5:41 AM, Peter Smith wrote:
> Here are some review comments for your v4-0001 patch. I hope they are
> useful for you.
Thanks for your review.

> This thread name "logical replication restrictions" seems quite
> unrelated to the patch here. Maybe it's better to start a new thread
> otherwise nobody is going to recognise what this thread is really
> about.
I agree that the $SUBJECT does not describe the proposal. I decided that it is
not worth creating a thread because (i) there are some interaction and they
could be monitoring this thread and (ii) the CF entry has the correct
description.

> Similar to physical replication, a time-delayed copy of the data for
> logical replication is useful for some scenarios (specially to fix
> errors that might cause data loss).
I changed the commit message a bit. 

> Maybe take some examples from the regression tests to show usage of
> the new parameter
I don't think an example is really useful in a commit message. If you are
checking this commit, it is a matter of reading the regression tests or
documentation to obtain an example of how to use it.

> I think this should say that the units are ms.
Unit included.

> Is the "integer" type here correct? It might eventually be stored as
> an integer, but IIUC (going by the tests) from the user point-of-view
> this parameter is really "text" type for representing ms or interval,
> right?
The internal representation is integer. The unit is correct. If you use units,
the format is text that what the section [1] calls "Numeric with Unit".  Even
if the user is unsure about its usage, an example might help here.

> SUGGESTION
> As with the physical replication feature (recovery_min_apply_delay),
> it can be useful for logical replication to delay the data
> replication.
It is not "data replication", it is applying changes. I reworded that sentence.

> SUGGESTION
> Time spent in logical decoding and in transferring the transaction may
> reduce the actual wait time.
Changed.

> If the system clocks on publisher and subscriber are not
> +          synchronized, this may lead to apply changes earlier than expected.
> 
> Why just say "earlier than expected"? If the publisher's time is ahead
> of the subscriber then the changes might also be *later* than
> expected, right? So, perhaps it is better to just say "other than
> expected".
This sentence is similar to another one in the recovery_min_apply_delay. I want
to emphasize the fact that even if you use a 30-minute delay, it might apply a
change that happened 29 minutes 55 seconds ago. The main reason for this
feature is to avoid modifying changes *earlier*. If it applies the change 30
minutes 5 seconds, it is fine.

> Should there also be a big warning box about the impact if using
> synchronous_commit (like the other streaming replication page has this
> warning)?
Impact? Could you elaborate?

> I think there should be some examples somewhere showing how to specify
> this parameter. Maybe they are better added somewhere in "31.2
> Subscription" and xrefed from here.
I added one example in the CREATE SUBSCRIPTION. We can add an example in the
section 31.2, however, since it is a new chapter I think it lacks examples for
the other options too (streaming, two_phase, copy_data, ...). It could be
submitted as a separate patch IMO.

> I think there should be a default assignment to 0 (done where all the
> other supported option defaults are set)
It could for completeness. the memset() takes care of it. Anyway, I added it to
the beginning of the parse_subscription_options().

> + if (opts->min_apply_delay < 0)
> + ereport(ERROR,
> + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
> + errmsg("option \"%s\" must not be negative", "min_apply_delay"));
> +
> 
> I thought this check only needs to be do within scope of the preceding
> if - (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
> strcmp(defel->defname, "min_apply_delay") == 0)
Fixed.

> + /*
> + * If this subscription has been disabled and it has an apply
> + * delay set, wake up the logical replication worker to finish
> + * it as soon as possible.
> + */
> + if (!opts.enabled && sub->applydelay > 0)
> 
> I did not really understand the logic why should the min_apply_delay
> override the enabled=false? It is a called *minimum* delay so if it
> ends up being way over the parameter value (because the subscription
> is disabled) then why does that matter?
It doesn't. The main point of this code (as I tried to explain in the comment)
is to kill the worker as soon as possible if you disable the subscription.
Isn't the comment clear?

> Subscription *MySubscription = NULL;
> static bool MySubscriptionValid = false;
> +TimestampTz MySubscriptionMinApplyDelayUntil = 0;
> 
> Looking at the only usage of this variable (in apply_delay) and how it
> is used I did see why this cannot just be a local member of the
> apply_delay function?
Good catch. A previous patch used that variable outside that function scope.

> +/*
> + * Apply the informed delay for the transaction.
> + *
> + * A regular transaction uses the commit time to calculate the delay. A
> + * prepared transaction uses the prepare time to calculate the delay.
> + */
> +static void
> +apply_delay(TimestampTz ts)
> 
> I didn't think it needs to mention here about the different kinds of
> transactions because where it comes from has nothing really to do with
> this function's logic.
Fixed.

> Refer to comment #14 about MySubscriptionMinApplyDelayUntil.
Fixed.

> It seems to rely on the spooling happening at the end. But won't this
> cause a problem later when/if the "parallel apply" patch [1] is pushed
> and the stream bgworkers are doing stuff on the fly instead of
> spooling at the end?
> 
> Or are you expecting that the "parallel apply" feature should be
> disabled if there is any min_apply_delay parameter specified?
I didn't read the "parallel apply" patch yet.

> Let's keep the alphabetical order of the parameters in COMPLETE_WITH, as per 
> [2]
Fixed.

> + int64 subapplydelay; /* Replication apply delay */
> +
> 
> IMO the comment should mention the units "(ms)"
I'm not sure. It should be documented in the catalogs. It is an important
information for user-visible interface. There are a few places in the
documentation that the unit is mentioned.

> There are some test cases for CREATE SUBSCRIPTION but there are no
> test cases for ALTER SUBSCRIPTION changing this new parameter.
I added a test to cover ALTER SUBSCRIPTION and also for the disabling a
subscription that contains a delay set.

> I received the following error when trying to run these 'subscription' tests:
Fixed.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 7a79e6b640826c5602805d2ff27ed226bdb1c940 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sat, 6 Nov 2021 11:31:10 -0300
Subject: [PATCH v5] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (particularly to fix
errors that might cause data loss).

If the subscriber sets min_apply_delay parameter, the logical
replication worker will delay the transaction commit for min_apply_delay
milliseconds.

The delay is calculated between the WAL time stamp and the current time
on the subscriber.

The delay occurs only on WAL records for transaction begins. The main
reason is to avoid keeping a transaction open for a long time. Regular
and prepared transactions are covered. Streamed transactions are also
covered.

Author: Euler Taveira
Discussion: https://postgr.es/m/CAB-JLwYOYwL=xtyaxkih5ctm_vm8kjkh7aaitckvmch4rzr...@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                 |  10 ++
 doc/src/sgml/ref/alter_subscription.sgml   |   5 +-
 doc/src/sgml/ref/create_subscription.sgml  |  43 +++++-
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   7 +-
 src/backend/commands/subscriptioncmds.c    |  48 +++++-
 src/backend/replication/logical/worker.c   |  85 +++++++++++
 src/backend/utils/adt/timestamp.c          |  32 ++++
 src/bin/pg_dump/pg_dump.c                  |  17 ++-
 src/bin/pg_dump/pg_dump.h                  |   1 +
 src/bin/psql/describe.c                    |   9 +-
 src/bin/psql/tab-complete.c                |   4 +-
 src/include/catalog/pg_subscription.h      |   3 +
 src/include/datatype/timestamp.h           |   2 +
 src/include/utils/timestamp.h              |   2 +
 src/test/regress/expected/subscription.out | 165 ++++++++++++---------
 src/test/regress/sql/subscription.sql      |  20 +++
 src/test/subscription/t/032_apply_delay.pl | 130 ++++++++++++++++
 18 files changed, 501 insertions(+), 83 deletions(-)
 create mode 100644 src/test/subscription/t/032_apply_delay.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index cd2cc37aeb..efc745a9ea 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7833,6 +7833,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subapplydelay</structfield> <type>int8</type>
+      </para>
+      <para>
+       Delay the application of changes by a specified amount of time. The
+       unit is in milliseconds.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subname</structfield> <type>name</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 64efc21f53..8901e1361c 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -208,8 +208,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_apply_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 7390c715bc..1fc8e7474a 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -317,7 +317,36 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_apply_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, subscriber applies changes as soon as possible. As with
+          the physical replication feature
+          (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
+          have a time-delayed logical replica. This parameter allows you to
+          delay the application of changes by a specified amount of time. If
+          this value is specified without units, it is taken as seconds. The
+          default is zero, adding no delay.
+         </para>
+         <para>
+          The delay occurs only on WAL records for transaction begins and after
+          the initial table synchronization. It is possible that the
+          replication delay between publisher and subscriber exceeds the value
+          of this parameter, in which case no delay is added. Note that the
+          delay is calculated between the WAL time stamp as written on
+          publisher and the current time on the subscriber. Time spent in logical
+          decoding and in transfering the transaction may reduce the actual wait
+          time. If the system clocks on publisher and subscriber are not
+          synchronized, this may lead to apply changes earlier than expected.
+          This is not a major issue because a typical setting of this parameter
+          are much larger than typical time deviations between servers.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
@@ -413,6 +442,18 @@ CREATE SUBSCRIPTION mysub
         PUBLICATION insert_only
                WITH (enabled = false);
 </programlisting></para>
+
+  <para>
+   Create a subscription to a remote server that replicates tables in
+   the <literal>baz</literal> publication and starts replicating immediately on
+   commit. Pre-existing data is not copied. The application of changes is
+   delayed by 4 hours.
+<programlisting>
+CREATE SUBSCRIPTION foo
+         CONNECTION 'host=192.168.1.50 port=5432 user=foo dbname=foodb'
+        PUBLICATION baz
+               WITH (copy_data = false, min_apply_delay = '4h');
+</programlisting></para>
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c7d2537fb5..e1ea3ccece 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -64,6 +64,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->oid = subid;
 	sub->dbid = subform->subdbid;
 	sub->skiplsn = subform->subskiplsn;
+	sub->applydelay = subform->subapplydelay;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f369b1fc14..50175323b9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1297,9 +1297,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subapplydelay, subname, subowner,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f73dfb6067..aaeda7e8a9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/syscache.h"
+#include "utils/timestamp.h"
 
 /*
  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
@@ -65,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_APPLY_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -89,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int64		min_apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -141,6 +144,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY))
+		opts->min_apply_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -319,12 +324,35 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+				 strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			char	   *val;
+			Interval   *interval;
+
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
+			val = defGetString(defel);
+
+			interval = DatumGetIntervalP(DirectFunctionCall3(interval_in,
+																	CStringGetDatum(val),
+																	ObjectIdGetDatum(InvalidOid),
+																	Int32GetDatum(-1)));
+			opts->min_apply_delay = interval_to_ms(interval);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
 	}
 
+	if (opts->min_apply_delay < 0 && IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY))
+		ereport(ERROR,
+				errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				errmsg("option \"%s\" must not be negative", "min_apply_delay"));
+
 	/*
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
@@ -557,7 +585,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -622,6 +651,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
 	values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
 	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+	values[Anum_pg_subscription_subapplydelay - 1] = Int64GetDatum(opts.min_apply_delay);
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
@@ -1044,7 +1074,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_APPLY_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1100,6 +1130,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subdisableonerr - 1]
 						= true;
 				}
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				{
+					values[Anum_pg_subscription_subapplydelay - 1] =
+						Int64GetDatum(opts.min_apply_delay);
+					replaces[Anum_pg_subscription_subapplydelay - 1] = true;
+				}
 
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
@@ -1130,6 +1166,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
 
+				/*
+				 * If this subscription has been disabled and it has an apply
+				 * delay set, wake up the logical replication worker to finish
+				 * it as soon as possible.
+				 */
+				if (!opts.enabled && sub->applydelay > 0)
+					logicalrep_worker_wakeup(sub->oid, InvalidOid);
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5f8c541763..5fa09a867b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -324,6 +324,8 @@ static void maybe_reread_subscription(void);
 
 static void DisableSubscriptionAndExit(void);
 
+static void apply_delay(TimestampTz ts);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -803,6 +805,57 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * When min_apply_delay parameter is set on subscriber, we wait long enough to
+ * make sure a transaction is applied at least that interval behind the
+ * publisher.
+ *
+ * It applies the delay for the next transaction but before starting the
+ * transaction. The main reason for this design is to avoid a long-running
+ * transaction (which can cause some operational challenges) if the user sets a
+ * high value for the delay. This design is different from the physical
+ * replication (that applies the delay at commit time) mainly because write
+ * operations may allow some issues (such as bloat and locks) that can be
+ * minimized if it does not keep the transaction open for such a long time.
+ */
+static void
+apply_delay(TimestampTz ts)
+{
+	TimestampTz	delay_until = 0;
+
+	/* nothing to do if no delay set */
+	if (MySubscription->applydelay <= 0)
+		return;
+
+	/* set apply delay */
+	delay_until = TimestampTzPlusMilliseconds(TimestampTzGetDatum(ts),
+												MySubscription->applydelay);
+
+	while (true)
+	{
+		long		diffms;
+
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delay_until);
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		elog(DEBUG2, "logical replication apply delay: %ld ms", diffms);
+
+		WaitLatch(MyLatch,
+				  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+				  diffms,
+				  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -814,6 +867,9 @@ apply_handle_begin(StringInfo s)
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
+	/* Should we delay the current transaction? */
+	apply_delay(begin_data.committime);
+
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
@@ -868,6 +924,9 @@ apply_handle_begin_prepare(StringInfo s)
 	logicalrep_read_begin_prepare(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
+	/* Should we delay the current prepared transaction? */
+	apply_delay(begin_data.prepare_time);
+
 	remote_final_lsn = begin_data.prepare_lsn;
 
 	maybe_start_skipping_changes(begin_data.prepare_lsn);
@@ -1090,6 +1149,19 @@ apply_handle_stream_prepare(StringInfo s)
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
+	/*
+	 * Should we delay the current prepared transaction?
+	 *
+	 * Although the delay is applied in BEGIN PREPARE messages, streamed
+	 * prepared transactions apply the delay in a STREAM PREPARE message.
+	 * That's ok because no changes have been applied yet
+	 * (apply_spooled_messages() will do it).
+	 * The STREAM START message does not contain a prepare time (it will be
+	 * available when the in-progress prepared transaction finishes), hence, it
+	 * was not possible to apply a delay at that time.
+	 */
+	apply_delay(prepare_data.prepare_time);
+
 	/* Replay all the spooled operations. */
 	apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
 
@@ -1481,6 +1553,19 @@ apply_handle_stream_commit(StringInfo s)
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
+	/*
+	 * Should we delay the current transaction?
+	 *
+	 * Although the delay is applied in BEGIN messages, streamed transactions
+	 * apply the delay in a STREAM COMMIT message. That's ok because no changes
+	 * have been applied yet (apply_spooled_messages() will do it).
+	 * The STREAM START message would be a natural choice for this delay but
+	 * there is no commit time yet (it will be available when the in-progress
+	 * transaction finishes), hence, it was not possible to apply a delay at
+	 * that time.
+	 */
+	apply_delay(commit_data.committime);
+
 	apply_spooled_messages(xid, commit_data.commit_lsn);
 
 	apply_handle_commit_internal(&commit_data);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 49cdb290ac..89f57f7c33 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -2411,6 +2411,38 @@ interval_cmp_internal(const Interval *interval1, const Interval *interval2)
 	return int128_compare(span1, span2);
 }
 
+/*
+ * Given an Interval returns the number of milliseconds.
+ */
+int64
+interval_to_ms(const Interval *interval)
+{
+	int64			days;
+	int64			ms;
+	int64			result;
+
+	days = interval->month * INT64CONST(30);
+	days += interval->day;
+
+	/*
+	 * The following operations use these special functions to detect overflow.
+	 * Number of ms per informed days.
+	 */
+	if (pg_mul_s64_overflow(days, MSECS_PER_DAY, &result))
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("bigint out of range")));
+
+	/* adds portion time (in ms) to the previous result. */
+	ms = interval->time / INT64CONST(1000);
+	if (pg_add_s64_overflow(result, ms, &result))
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("bigint out of range")));
+
+	return result;
+}
+
 Datum
 interval_eq(PG_FUNCTION_ARGS)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index da6605175a..5ef61f3de1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4441,6 +4441,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subapplydelay;
 	int			i,
 				ntups;
 
@@ -4493,9 +4494,15 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+	{
+		appendPQExpBufferStr(query, " s.suborigin,\n");
+		appendPQExpBufferStr(query, " s.subapplydelay\n");
+	}
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+	{
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBufferStr(query, " 0 AS subapplydelay\n");
+	}
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4523,6 +4530,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subapplydelay = PQfnumber(res, "subapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4553,6 +4561,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subapplydelay =
+			strtoi64(PQgetvalue(res, i, i_subapplydelay), NULL, 10);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4632,6 +4642,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subapplydelay > 0)
+		appendPQExpBuffer(query, ", min_apply_delay = '" INT64_FORMAT " ms'", subinfo->subapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 69ee939d44..91b73e10d2 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int64		subapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 327a69487b..0be1d44e81 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6469,7 +6469,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6511,10 +6511,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* origin and min_apply_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subapplydelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Apply delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index f265e043e9..42070f7240 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1873,7 +1873,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "origin", "min_apply_delay", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3153,7 +3153,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "origin", "min_apply_delay", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index c9a3026b28..3221072fe8 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	XLogRecPtr	subskiplsn;		/* All changes finished at this LSN are
 								 * skipped */
 
+	int64		subapplydelay;	/* Replication apply delay */
+
 	NameData	subname;		/* Name of the subscription */
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
@@ -119,6 +121,7 @@ typedef struct Subscription
 								 * in */
 	XLogRecPtr	skiplsn;		/* All changes finished at this LSN are
 								 * skipped */
+	int64		applydelay;		/* Replication apply delay */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index d155f1b03b..d5bbfad1c4 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -127,6 +127,8 @@ struct pg_itm_in
 #define SECS_PER_MINUTE 60
 #define MINS_PER_HOUR	60
 
+#define MSECS_PER_DAY	INT64CONST(86400000)
+
 #define USECS_PER_DAY	INT64CONST(86400000000)
 #define USECS_PER_HOUR	INT64CONST(3600000000)
 #define USECS_PER_MINUTE INT64CONST(60000000)
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index edf3a97318..91709035da 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -78,6 +78,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 									   TimestampTz stop_time,
 									   int msec);
 
+extern int64 interval_to_ms(const Interval *interval);
+
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ef0ebf96b9..135d555707 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -77,18 +77,18 @@ ERROR:  unrecognized origin value: "foo"
 CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | none   |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -98,10 +98,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -118,10 +118,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -130,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -165,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    |           0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -201,19 +201,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -224,19 +224,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -251,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                        List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -269,10 +269,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -306,10 +306,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -318,10 +318,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -330,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -345,18 +345,47 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | any    |           0 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+ERROR:  invalid input syntax for type interval: "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+ERROR:  option "min_apply_delay" must not be negative
+-- success -- 123 s
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |      123000 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- success -- interval is converted into ms and stored as integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s');
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    |    16055000 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 4425fafc46..fba4223aa8 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -264,6 +264,26 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+
+-- success -- 123 s
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
+-- success -- interval is converted into ms and stored as integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s');
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl
new file mode 100644
index 0000000000..fcdab4a8b0
--- /dev/null
+++ b/src/test/subscription/t/032_apply_delay.pl
@@ -0,0 +1,130 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test replication apply delay
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"log_min_messages = debug2");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher.
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber.
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, min_apply_delay = '2s')"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check log starting now for logical replication apply delay.
+my $log_location = -s $node_subscriber->logfile;
+
+# Wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(2|1|2), 'check initial data was copied to subscriber');
+
+# New row to trigger apply delay.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (3, 'baz')");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(3|1|3), 'check if the new row was applied to subscriber');
+
+check_apply_delay_log("logical replication apply delay");
+
+# Test streamed transaction.
+# Insert, update and delete enough rows to exceed 64kB limit.
+$node_publisher->safe_psql(
+	'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(3334|1|5000), 'check if the new rows were applied to subscriber');
+
+check_apply_delay_log("logical replication apply delay");
+
+# Test ALTER SUBSCRIPTION. Delay 86460 seconds (1 day 1 minute).
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET (min_apply_delay = 86460)"
+);
+
+# New row to trigger apply delay.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (0, 'foobar')");
+
+# Disable subscription. worker should die immediately.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub DISABLE"
+);
+
+# Wait until worker dies.
+my $sub_query =
+  "SELECT count(1) = 0 FROM pg_stat_subscription WHERE subname = 'tap_sub' AND pid IS NOT NULL;";
+$node_subscriber->poll_query_until('postgres', $sub_query)
+  or die "Timed out while waiting for subscriber to die";
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
+
+sub check_apply_delay_log
+{
+	my $message          = shift;
+	my $old_log_location = $log_location;
+
+	$log_location = $node_subscriber->wait_for_log(qr/$message/, $log_location);
+
+	cmp_ok($log_location, '>', $old_log_location,
+		"logfile contains triggered logical replication apply delay"
+	);
+}
-- 
2.30.2

Reply via email to