From 21accecb2fd1a7b2636e4f5dc68b6a618550b207 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Wed, 15 Feb 2023 04:08:12 +0000
Subject: [PATCH] Time-delayed logical replication on publisher side

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (particularly to fix
errors that might cause data loss).

This patch implements a new subscription parameter called 'min_apply_delay'.

If the subscription sets min_apply_delay parameter, an apply worker passes the
value to the publisher as an output plugin option. And then, the walsender will
delay the transaction sending for given milliseconds.

The delay does not take into account the overhead of time spent in transferring
the transaction, which means that the arrival time at the subscriber may be
delayed more than the given time.

The delay occurs before we start to send the transaction on the publisher.
Regular and prepared transactions are covered. Streamed transactions are also
covered.

Author: Euler Taveira, Takamichi Osumi, Kuroda Hayato
Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Shveta Malik,
             Kyotaro Horiguchi, Shi Yu, Wang Wei, Dilip Kumar, Melih Mutlu
---
 doc/src/sgml/catalogs.sgml                    |   9 +
 doc/src/sgml/glossary.sgml                    |  15 ++
 doc/src/sgml/logical-replication.sgml         |   6 +
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  34 +++-
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   7 +-
 src/backend/commands/subscriptioncmds.c       |  70 ++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   5 +
 src/backend/replication/logical/logical.c     |  27 ++-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/worker.c      |  12 +-
 src/backend/replication/pgoutput/pgoutput.c   |  41 ++++-
 src/backend/replication/slotfuncs.c           |   4 +-
 src/backend/replication/walsender.c           |  79 +++++++-
 src/bin/pg_dump/pg_dump.c                     |  15 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   9 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logical.h             |  11 +-
 src/include/replication/output_plugin.h       |   1 +
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 171 ++++++++++--------
 src/test/regress/sql/subscription.sql         |  15 ++
 src/test/subscription/t/001_rep_changes.pl    |  28 +++
 27 files changed, 473 insertions(+), 104 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..5dc5ca1133 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7873,6 +7873,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subminapplydelay</structfield> <type>int4</type>
+      </para>
+      <para>
+       The minimum delay, in milliseconds, for applying changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subname</structfield> <type>name</type>
diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml
index 7c01a541fe..9ede9d05f6 100644
--- a/doc/src/sgml/glossary.sgml
+++ b/doc/src/sgml/glossary.sgml
@@ -1729,6 +1729,21 @@
    </glossdef>
   </glossentry>
 
+  <glossentry id="glossary-time-delayed-replication">
+   <glossterm>Time-delayed replication</glossterm>
+   <glossdef>
+    <para>
+     Replication setup that delays the application of changes by a specified
+     minimum time-delay period.
+    </para>
+    <para>
+     For more information, see
+     <xref linkend="guc-recovery-min-apply-delay"/> for physical replication
+     and <xref linkend="sql-createsubscription"/> for logical replication.
+    </para>
+   </glossdef>
+  </glossentry>
+
   <glossentry id="glossary-toast">
    <glossterm>TOAST</glossterm>
    <glossdef>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..6bd5f61e2b 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -247,6 +247,12 @@
    target table.
   </para>
 
+  <para>
+   A subscription can delay the application of changes by specifying the
+   <literal>min_apply_delay</literal> subscription parameter. See
+   <xref linkend="sql-createsubscription"/> for details.
+  </para>
+
   <sect2 id="logical-replication-subscription-slot">
    <title>Replication Slot Management</title>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 964fcbb8ff..8b7eb28e54 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_apply_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f17c7..cda8a91aba 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,7 +349,39 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_apply_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the publisher sends changes as soon as possible. This
+          parameter allows the user to delay the publisher to send changes by
+          given time period. If the value is specified without units, it is
+          taken as milliseconds. The default is zero (no delay). See
+          <xref linkend="config-setting-names-values"/> for details on the
+          available valid time units.
+         </para>
+         <para>
+          Any delay becomes effective only after all initial table
+          synchronization has finished and occurs before each transaction
+          starts to get applied on the subscriber. The delay does not take into
+          account the overhead of time spent in transferring the transaction,
+          which means that the arrival time at the subscriber may be delayed
+          more than the given time.
+         </para>
+         <warning>
+           <para>
+            Delaying the replication means there is a much longer time between
+            making a change on the publisher, and that change being committed
+            on the subscriber. This can impact the performance of synchronous
+            replication. See <xref linkend="guc-synchronous-commit"/>
+            parameter.
+           </para>
+         </warning>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..e19e5cbca2 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->skiplsn = subform->subskiplsn;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
+	sub->minapplydelay = subform->subminapplydelay;
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 34ca0e739f..7578b80c07 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1314,9 +1314,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subminapplydelay, subname, subowner,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..eef595afcf 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_APPLY_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int32		min_apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -100,7 +102,7 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
-
+static int32 defGetMinApplyDelay(DefElem *def);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY))
+		opts->min_apply_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+				 strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
+			opts->min_apply_delay = defGetMinApplyDelay(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -560,7 +573,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -625,6 +639,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
 	values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
 	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+	values[Anum_pg_subscription_subminapplydelay - 1] = Int32GetDatum(opts.min_apply_delay);
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
@@ -1054,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_APPLY_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1111,6 +1126,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				{
+					values[Anum_pg_subscription_subminapplydelay - 1] =
+						Int32GetDatum(opts.min_apply_delay);
+					replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
@@ -2195,3 +2217,45 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the min_apply_delay value from a DefElem. This is very similar to
+ * parse_and_validate_value() for integer values, because min_apply_delay
+ * accepts the same parameter format as recovery_min_apply_delay.
+ */
+static int32
+defGetMinApplyDelay(DefElem *def)
+{
+	char	   *input_string;
+	int			result;
+	const char *hintmsg;
+
+	input_string = defGetString(def);
+
+	/*
+	 * Parse given string as parameter which has millisecond unit
+	 */
+	if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid value for parameter \"%s\": \"%s\"",
+						"min_apply_delay", input_string),
+				 hintmsg ? errhint("%s", _(hintmsg)) : 0));
+
+	/*
+	 * Check both the lower boundary for the valid min_apply_delay range and
+	 * the upper boundary as the safeguard for some platforms where INT_MAX is
+	 * wider than int32 respectively. Although parse_int() has confirmed that
+	 * the result is less than or equal to INT_MAX, the value will be stored
+	 * in a catalog column of int32.
+	 */
+	if (result < 0 || result > PG_INT32_MAX)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)",
+						result,
+						"min_apply_delay",
+						0, PG_INT32_MAX)));
+
+	return result;
+}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..ec0885ba73 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -443,6 +443,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 140000)
 			appendStringInfoString(&cmd, ", binary 'true'");
 
+		if (options->proto.logical.min_apply_delay > 0 &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfo(&cmd, ", min_apply_delay '%d'",
+							 options->proto.logical.min_apply_delay);
+
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..b347940eda 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -156,7 +156,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   LogicalOutputPluginDelay delay)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -293,6 +294,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
 	ctx->update_progress = update_progress;
+	ctx->delay = delay;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -316,7 +318,7 @@ StartupDecodingContext(List *output_plugin_options,
  *		marking WAL reserved beforehand.  In that scenario, it's up to the
  *		caller to guarantee that WAL remains available.
  * xl_routine -- XLogReaderRoutine for underlying XLogReader
- * prepare_write, do_write, update_progress --
+ * prepare_write, do_write, update_progress, delay --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -334,7 +336,8 @@ CreateInitDecodingContext(const char *plugin,
 						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  LogicalOutputPluginDelay delay)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -435,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 xl_routine, prepare_write, do_write,
-								 update_progress);
+								 update_progress, delay);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -475,7 +478,7 @@ CreateInitDecodingContext(const char *plugin,
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
  *
- * prepare_write, do_write, update_progress
+ * prepare_write, do_write, update_progress, delay
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -493,7 +496,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  LogicalOutputPluginDelay delay)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -547,7 +551,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, xl_routine, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, delay);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -1922,3 +1926,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+void
+OutputPluginDelay(struct LogicalDecodingContext *ctx, int32 min_apply_delay)
+{
+	if (!ctx->delay)
+		return;
+
+	ctx->delay(ctx, min_apply_delay);
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..960025197f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -212,7 +212,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		/*
 		 * After the sanity checks in CreateDecodingContext, make sure the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..749536dd5d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3898,7 +3898,8 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		newsub->minapplydelay != MySubscription->minapplydelay)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4617,9 +4618,18 @@ ApplyWorkerMain(Datum main_arg)
 
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
+	options.proto.logical.min_apply_delay = 0;
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * Time-delayed logical replication does not support tablesync
+		 * workers, so only the leader apply worker can request walsenders to
+		 * apply delay on the publisher side.
+		 */
+		if (server_version >= 160000 && MySubscription->minapplydelay > 0)
+			options.proto.logical.min_apply_delay = MySubscription->minapplydelay;
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 98377c094b..300500abcd 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -285,6 +285,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		min_apply_delay_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
@@ -396,6 +397,32 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", data->origin));
 		}
+		else if (strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			unsigned long parsed;
+			char	   *endptr;
+
+			if (min_apply_delay_option_given)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("conflicting or redundant options"));
+			min_apply_delay_option_given = true;
+
+			errno = 0;
+			parsed = strtoul(strVal(defel->arg), &endptr, 10);
+			if (errno != 0 || *endptr != '\0')
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid min_apply_delay")));
+
+			if (parsed > PG_INT32_MAX)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("min_apply_delay \"%s\" out of range",
+								strVal(defel->arg))));
+
+			data->min_apply_delay = (int32) parsed;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -537,7 +564,6 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
 													  sizeof(PGOutputTxnData));
-
 	txn->output_plugin_private = txndata;
 }
 
@@ -551,10 +577,13 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
 	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 
 	Assert(txndata);
 	Assert(!txndata->sent_begin_txn);
 
+	OutputPluginDelay(ctx, data->min_apply_delay);
+
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
 	txndata->sent_begin_txn = true;
@@ -604,7 +633,9 @@ static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 
+	OutputPluginDelay(ctx, data->min_apply_delay);
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin_prepare(ctx->out, txn);
 
@@ -1810,9 +1841,17 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 	/*
 	 * If we already sent the first stream for this transaction then don't
 	 * send the origin id in the subsequent streams.
+	 *
+	 * Otherwise, try to delay sending streams
 	 */
 	if (rbtxn_is_streamed(txn))
 		send_replication_origin = false;
+	else
+	{
+		PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+		OutputPluginDelay(ctx, data->min_apply_delay);
+	}
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c964824..522f7600a1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -148,7 +148,7 @@ create_logical_replication_slot(char *name, char *plugin,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 	/*
 	 * If caller needs us to determine the decoding start point, do so now.
@@ -481,7 +481,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..1bad03d91a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,6 +252,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 								 bool skipped_xact);
+static void WalSndDelay(LogicalDecodingContext *ctx, int32 min_apply_delay);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1126,7 +1127,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 												   .segment_open = WalSndSegmentOpen,
 												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, WalSndDelay);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1285,7 +1286,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+							  WalSndUpdateProgress, WalSndDelay);
 	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -3849,3 +3850,77 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * LogicalDecodingContext 'delay' callback.
+ *
+ * Wait long enough to make sure a transaction is applied at least that
+ * period behind the publisher.
+ */
+static void
+WalSndDelay(LogicalDecodingContext *ctx, int32 min_apply_delay)
+{
+	TimestampTz delay_start = GetCurrentTimestamp();
+
+	/* Wait till delayUntil by the latch mechanism */
+	while (true)
+	{
+		TimestampTz delayUntil;
+		long		diffms;
+		long		timeout_interval_ms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* This might change wal_sender_timeout */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/*
+		 * If we've requested to shut down, exit the process.
+		 *
+		 * Note that WalSndDone() cannot be used here because the delaying
+		 * changes will be sent in the function.
+		 */
+		if (got_STOPPING)
+			WalSndShutdown();
+
+		delayUntil = TimestampTzPlusMilliseconds(delay_start, min_apply_delay);
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delayUntil);
+
+		/*
+		 * Exit without arming the latch if it's already past time to send
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		/* Sleep until appropriate time. */
+		timeout_interval_ms = WalSndComputeSleeptime(GetCurrentTimestamp());
+
+		elog(DEBUG2, "time-delayed replication for txid %u, delay_time = %d ms, remaining wait time: %ld ms",
+			 ctx->write_xid, (int) min_apply_delay, diffms);
+
+		/* Sleep until we get reply from worker or we time out */
+		WalSndWait(WL_SOCKET_READABLE,
+				   Min(timeout_interval_ms, diffms),
+				   WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..1e87f0124e 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subminapplydelay;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,13 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.subminapplydelay\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n"
+						  " 0 AS subminapplydelay\n",
+						  LOGICALREP_ORIGIN_ANY);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4581,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subminapplydelay = PQfnumber(res, "subminapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4612,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subminapplydelay =
+			atoi(PQgetvalue(res, i, i_subminapplydelay));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4695,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subminapplydelay > 0)
+		appendPQExpBuffer(query, ", min_apply_delay = '%d ms'", subinfo->subminapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..b8831c3ed3 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int			subminapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..81d4607a1c 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Origin and min_apply_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subminapplydelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Min apply delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..e8b9a43a47 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..d1cfefc6d6 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
 
+	int32		subminapplydelay;	/* Replication apply delay (ms) */
+
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
@@ -122,6 +124,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	int32		minapplydelay;	/* Replication apply delay (ms) */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..9470af1fda 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,6 +30,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
 														 bool skipped_xact
 );
 
+typedef void (*LogicalOutputPluginDelay) (struct LogicalDecodingContext *lr,
+										  int32 min_apply_delay
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -64,6 +68,7 @@ typedef struct LogicalDecodingContext
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
 	LogicalOutputPluginWriterUpdateProgress update_progress;
+	LogicalOutputPluginDelay delay;
 
 	/*
 	 * Output buffer.
@@ -121,14 +126,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
-														 LogicalOutputPluginWriterUpdateProgress update_progress);
+														 LogicalOutputPluginWriterUpdateProgress update_progress,
+														 LogicalOutputPluginDelay delay);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
-													 LogicalOutputPluginWriterUpdateProgress update_progress);
+													 LogicalOutputPluginWriterUpdateProgress update_progress,
+													 LogicalOutputPluginDelay delay);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2d89d26586..a65ff2d241 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -246,5 +246,6 @@ typedef struct OutputPluginCallbacks
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
+extern void OutputPluginDelay(struct LogicalDecodingContext *ctx, int32 min_apply_delay);
 
 #endif							/* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..dc9a70f95e 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,6 +30,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	char	   *origin;
+	int32		min_apply_delay;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..464cad0e3b 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -187,6 +187,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			int32		min_apply_delay;	/* The minimum apply delay */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..2594395c73 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                        List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,18 +404,45 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+ERROR:  invalid value for parameter "min_apply_delay": "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+ERROR:  -1 ms is outside the valid range for parameter "min_apply_delay" (0 .. 2147483647)
+-- success -- min_apply_delay value without unit is take as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |             123 | off                | dbname=regress_doesnotexit | 0/0
+(1 row)
+
+-- success -- min_apply_delay value with unit is converted into ms and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |        86400000 | off                | dbname=regress_doesnotexit | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..2396fbeedb 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,21 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+
+-- success -- min_apply_delay value without unit is take as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+\dRs+
+
+-- success -- min_apply_delay value with unit is converted into ms and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 91aa068c95..75fd77b891 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -515,6 +515,34 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after renaming SUBSCRIPTION";
 
+# Test time-delayed logical replication
+#
+# If the subscription sets min_apply_delay parameter, the logical replication
+# worker will delay the transaction apply for min_apply_delay milliseconds. We
+# verify this by looking at the time difference between a) when tuples are
+# inserted on the publisher, and b) when those changes are replicated on the
+# subscriber. Even on slow machines, this strategy will give predictable behavior.
+
+# Set min_apply_delay parameter to 3 seconds
+my $delay = 3;
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
+
+# Before doing the insertion, get the current timestamp that will be
+# used as a comparison base.
+my $publisher_insert_time = time();
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins VALUES (generate_series(1101, 1120))");
+
+# The publisher waits for the replication to complete
+$node_publisher->wait_for_catchup('tap_sub_renamed');
+
+# This test is successful if and only if the LSN has been applied with at least
+# the configured apply delay.
+ok( time() - $publisher_insert_time >= $delay,
+	"subscriber applies WAL only after replication delay for non-streaming transaction"
+);
+
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
 
-- 
2.27.0

