From 2e21bc932b683e44f2a914fa8691a7b17f2fbf31 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 16 Feb 2023 07:52:23 +0000
Subject: [PATCH v10] Time-delayed logical replication on publisher side

Similar to physical replication, a time-delayed copy of the data for logical
replication can be useful in certain scenarios, particularly to fix errors that
may cause data loss.

This patch introduces a new subscription parameter called 'min_send_delay'. If
this parameter is set, the apply worker (via walrcv_startstreaming) passes the
value to the publisher as an output plugin option. The walsender will then delay
sending the transaction for the given number of milliseconds. It is important to
note that this delay does not take into account the time spent transferring the
transaction, which means that the arrival time at the subscriber may be delayed
further.

The delay occurs before we start sending the transaction on the publisher, and
both regular and prepared transactions are covered. Streamed transactions are
also covered.

The combination of parallel streaming mode and min_send_delay is not allowed.
There are two reasons for this:

1. In parallel streaming mode, we start applying the transaction stream as soon
as the first change arrives, without knowing the transaction's prepare/commit
time. Always waiting for the full 'min_send_delay' period may result in
unnecessary delay.

2. For parallel streaming, the transaction will be opened immediately by the
parallel apply worker. Therefore, if the walsender is delayed in sending the
final record of the transaction, the parallel apply worker must wait to receive
it with an open transaction. This would result in the locks acquired during the
transaction not being released until the min_send_delay has elapsed.

Earlier versions were written by Euler Taveira, Takamichi Osumi, and Kuroda Hayato

Author: Kuroda Hayato, Takamichi Osumi
Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Shveta Malik,
             Kyotaro Horiguchi, Shi Yu, Wang Wei, Dilip Kumar, Melih Mutlu,
             Andres Freund

\# Please enter the commit message for your changes. Lines starting
---
 doc/src/sgml/catalogs.sgml                    |  10 +
 doc/src/sgml/glossary.sgml                    |  15 ++
 doc/src/sgml/logical-replication.sgml         |   6 +
 doc/src/sgml/monitoring.sgml                  |   5 +
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  68 ++++++-
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   7 +-
 src/backend/commands/subscriptioncmds.c       | 116 ++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   5 +
 src/backend/replication/logical/decode.c      |   9 +
 src/backend/replication/logical/logical.c     |  18 +-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/worker.c      |  14 +-
 src/backend/replication/pgoutput/pgoutput.c   |  33 ++++
 src/backend/replication/slotfuncs.c           |   4 +-
 src/backend/replication/walsender.c           |  93 ++++++++-
 src/backend/utils/activity/wait_event.c       |   3 +
 src/bin/pg_dump/pg_dump.c                     |  15 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   9 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logical.h             |  18 +-
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/include/utils/wait_event.h                |   3 +-
 src/test/regress/expected/subscription.out    | 185 +++++++++++-------
 src/test/regress/sql/subscription.sql         |  29 +++
 src/test/subscription/t/001_rep_changes.pl    |  27 +++
 30 files changed, 606 insertions(+), 104 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..ce6521fe6c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7892,6 +7892,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subminsenddelay</structfield> <type>int4</type>
+      </para>
+      <para>
+       The minimum delay, in milliseconds, by the publisher before sending all
+       the changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subenabled</structfield> <type>bool</type>
diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml
index 7c01a541fe..9ede9d05f6 100644
--- a/doc/src/sgml/glossary.sgml
+++ b/doc/src/sgml/glossary.sgml
@@ -1729,6 +1729,21 @@
    </glossdef>
   </glossentry>
 
+  <glossentry id="glossary-time-delayed-replication">
+   <glossterm>Time-delayed replication</glossterm>
+   <glossdef>
+    <para>
+     Replication setup that delays the application of changes by a specified
+     minimum time-delay period.
+    </para>
+    <para>
+     For more information, see
+     <xref linkend="guc-recovery-min-apply-delay"/> for physical replication
+     and <xref linkend="sql-createsubscription"/> for logical replication.
+    </para>
+   </glossdef>
+  </glossentry>
+
   <glossentry id="glossary-toast">
    <glossterm>TOAST</glossterm>
    <glossdef>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..fe9e7f7b26 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -247,6 +247,12 @@
    target table.
   </para>
 
+  <para>
+   A subscription can delay the receipt of changes by specifying the
+   <literal>min_send_delay</literal> subscription parameter. See
+   <xref linkend="sql-createsubscription"/> for details.
+  </para>
+
   <sect2 id="logical-replication-subscription-slot">
    <title>Replication Slot Management</title>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6249bb50d0..7c47723153 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2349,6 +2349,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to acquire an exclusive lock to truncate off any
        empty pages at the end of a table vacuumed.</entry>
      </row>
+     <row>
+      <entry><literal>WalSenderSendDelay</literal></entry>
+      <entry>Waiting while sending changes for time-delayed logical replication
+      in the WAL sender process.</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 964fcbb8ff..3f238b958b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_send_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f17c7..bfb73d9990 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,7 +349,68 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_send_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the publisher sends changes as soon as possible. This
+          parameter allows the user to delay changes by the given time period.
+          If the value is specified without units, it is taken as milliseconds.
+          The default is zero (no delay). See <xref linkend="config-setting-names-values"/>
+          for details on the available valid time units.
+         </para>
+         <para>
+          The delay is effective only when the initial table synchronization
+          has been finished. However, there is a possibility that the table
+          status updated in <link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link>
+          could be delayed in getting to the "ready" state, and also two-phase
+          (if specified) could be delayed in getting to "enabled".
+         </para>
+         <para>
+          The delay does not take into account the overhead of time spent
+          transferring the transaction. Therefore, the arrival time at the
+          subscriber may be delayed more than the specified
+          <literal>min_send_delay</literal> time.
+         </para>
+         <warning>
+           <para>
+            Delaying the replication means there is a much longer time between
+            making a change on the publisher, and that change being committed
+            on the subscriber. This can impact the performance of synchronous
+            replication. See <xref linkend="guc-synchronous-commit"/>
+            parameter.
+           </para>
+         </warning>
+         <caution>
+           <para>
+            While delaying the sending of changes, there is a possibility that
+            WAL segment files cannot be recycled, which can eventually lead to
+            exceeding the available disk space on the publisher. This is
+            because the replication slot related to the subscription regards
+            these WALs as needed until they are sent to the subscriber and
+            flushed. To avoid this, the <varname>max_slot_wal_keep_size</varname>
+            on the publisher and <literal>min_send_delay</literal> must be
+            tuned, and these values depend on the machine environment and
+            workload. There are several methods to check the amount of delayed
+            WALs, and a typical way is to calculate it from the
+            <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
+            system view and <function>pg_current_wal_lsn</function>. Following
+            shows the amounts of WALs that are delayed the sending in bytes.
+<programlisting>
+SELECT pg_current_wal_lsn() - pg_replication_slots.restart_lsn
+       AS "amount of delaying WALs" FROM pg_replication_slots;
+ amount of delaying WALs
+-------------------------
+              1688964992
+(1 row)
+</programlisting>
+           </para>
+         </caution>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
@@ -420,6 +481,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    published with different column lists are not supported.
   </para>
 
+  <para>
+   A non-zero <literal>min_send_delay</literal> parameter is not allowed when
+   streaming in parallel mode.
+  </para>
+
   <para>
    We allow non-existent publications to be specified so that users can add
    those later. This means
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..63a10b06d1 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->skiplsn = subform->subskiplsn;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
+	sub->minsenddelay = subform->subminsenddelay;
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 34ca0e739f..54a705d71b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1314,9 +1314,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subminsenddelay,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..875709dcda 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_SEND_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int32		min_send_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -100,7 +102,7 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
-
+static int32 defGetMinSendDelay(DefElem *def);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY))
+		opts->min_send_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) &&
+				 strcmp(defel->defname, "min_send_delay") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_SEND_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_SEND_DELAY;
+			opts->min_send_delay = defGetMinSendDelay(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +417,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/*
+	 * The combination of parallel streaming mode and min_send_delay is not
+	 * allowed. This is because in parallel streaming mode, the walsender
+	 * starts sending the transaction stream without knowing the
+	 * prepare/commit time of the transaction. Always waiting for the full
+	 * 'min_send_delay' time to send may introduce unnecessary delay.
+	 *
+	 * The other possibility was to wait sending COMMIT record of the parallel
+	 * apply transaction but that would cause issues related to resource bloat
+	 * and locks being held for a long time.
+	 */
+	if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) &&
+		opts->min_send_delay > 0 &&
+		opts->streaming == LOGICALREP_STREAM_PARALLEL)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+
+		/*
+		 * translator: the first %s is a string of the form "parameter > 0"
+		 * and the second one is "option = value".
+		 */
+				errmsg("%s and %s are mutually exclusive options",
+					   "min_send_delay > 0", "streaming = parallel"));
 }
 
 /*
@@ -560,7 +597,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_SEND_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -628,6 +666,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
+	values[Anum_pg_subscription_subminsenddelay - 1] = Int32GetDatum(opts.min_send_delay);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
@@ -1054,7 +1093,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_SEND_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1098,6 +1137,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_send_delay is not allowed. See
+					 * parse_subscription_options.
+					 */
+					if (opts.streaming == LOGICALREP_STREAM_PARALLEL &&
+						!IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY) &&
+						sub->minsenddelay > 0)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set parallel streaming mode for subscription with %s",
+									   "min_send_delay"));
+
 					values[Anum_pg_subscription_substream - 1] =
 						CharGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
@@ -1111,6 +1163,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY))
+				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_send_delay is not allowed. See
+					 * parse_subscription_options.
+					 */
+					if (opts.min_send_delay > 0 &&
+						!IsSet(opts.specified_opts, SUBOPT_STREAMING) &&
+						sub->stream == LOGICALREP_STREAM_PARALLEL)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set %s for subscription in parallel streaming mode",
+									   "min_send_delay"));
+
+					values[Anum_pg_subscription_subminsenddelay - 1] =
+						Int32GetDatum(opts.min_send_delay);
+					replaces[Anum_pg_subscription_subminsenddelay - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
@@ -2195,3 +2267,41 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the min_send_delay value from a DefElem. This is very similar to
+ * parse_and_validate_value() for integer values, because min_send_delay
+ * accepts the same parameter format as recovery_min_apply_delay.
+ */
+static int32
+defGetMinSendDelay(DefElem *def)
+{
+	char	   *input_string;
+	int			result;
+	const char *hintmsg;
+
+	input_string = defGetString(def);
+
+	/* Parse given string as parameter which has millisecond unit */
+	if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid value for parameter \"%s\": \"%s\"",
+					   "min_send_delay", input_string),
+				hintmsg ? errhint("%s", _(hintmsg)) : 0);
+
+	/*
+	 * Check both the lower boundary for the valid min_send_delay range and
+	 * the upper boundary as the safeguard for some platforms where INT_MAX is
+	 * wider than int32 respectively. Although parse_int() has confirmed that
+	 * the result is less than or equal to INT_MAX, the value will be stored
+	 * in a catalog column of int32.
+	 */
+	if (result < 0 || result > PG_INT32_MAX)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)",
+					   result, "min_send_delay", 0, PG_INT32_MAX));
+
+	return result;
+}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..89a72c1abe 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -443,6 +443,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 140000)
 			appendStringInfoString(&cmd, ", binary 'true'");
 
+		if (options->proto.logical.min_send_delay > 0 &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfo(&cmd, ", min_send_delay '%d'",
+							 options->proto.logical.min_send_delay);
+
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..60f8f7a892 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -676,6 +676,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	/*
+	 * Delay sending the changes if required. For streaming transactions, this
+	 * means a delay in sending the last stream but that is OK because on the
+	 * downstream the changes will be applied only after receiving the last
+	 * stream.
+	 */
+	if (ctx->min_send_delay > 0 && ctx->delay_send)
+		ctx->delay_send(ctx, xid, commit_time);
+
 	/*
 	 * Send the final commit record if the transaction data is already
 	 * decoded, otherwise, process the entire transaction.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..e4dd822cdc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -156,7 +156,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   LogicalOutputPluginWriterDelay delay_send)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -293,6 +294,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
 	ctx->update_progress = update_progress;
+	ctx->delay_send = delay_send;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -316,7 +318,7 @@ StartupDecodingContext(List *output_plugin_options,
  *		marking WAL reserved beforehand.  In that scenario, it's up to the
  *		caller to guarantee that WAL remains available.
  * xl_routine -- XLogReaderRoutine for underlying XLogReader
- * prepare_write, do_write, update_progress --
+ * prepare_write, do_write, update_progress, delay_send --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -334,7 +336,8 @@ CreateInitDecodingContext(const char *plugin,
 						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  LogicalOutputPluginWriterDelay delay_send)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -435,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 xl_routine, prepare_write, do_write,
-								 update_progress);
+								 update_progress, delay_send);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -475,7 +478,7 @@ CreateInitDecodingContext(const char *plugin,
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
  *
- * prepare_write, do_write, update_progress
+ * prepare_write, do_write, update_progress, delay_send
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -493,7 +496,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  LogicalOutputPluginWriterDelay delay_send)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -547,7 +551,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, xl_routine, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, delay_send);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..960025197f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -212,7 +212,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		/*
 		 * After the sanity checks in CreateDecodingContext, make sure the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..3cde6d37fa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3898,7 +3898,8 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		newsub->minsenddelay != MySubscription->minsenddelay)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4617,9 +4618,20 @@ ApplyWorkerMain(Datum main_arg)
 
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
+	options.proto.logical.min_send_delay = 0;
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * We support time-delayed logical replication only for the apply
+		 * worker. This is because if we support delay during the initial sync
+		 * then once we reach the limit of tablesync workers it would impose a
+		 * delay for each subsequent worker. That would cause initial table
+		 * synchronization completion to take a long time.
+		 */
+		if (server_version >= 160000 && MySubscription->minsenddelay > 0)
+			options.proto.logical.min_send_delay = MySubscription->minsenddelay;
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 0df1acbb7a..02bca8d380 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -285,6 +285,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		min_send_delay_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
@@ -396,6 +397,32 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", data->origin));
 		}
+		else if (strcmp(defel->defname, "min_send_delay") == 0)
+		{
+			unsigned long delay_val;
+			char	   *endptr;
+
+			if (min_send_delay_option_given)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("conflicting or redundant options"));
+			min_send_delay_option_given = true;
+
+			errno = 0;
+			delay_val = strtoul(strVal(defel->arg), &endptr, 10);
+			if (errno != 0 || *endptr != '\0')
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("invalid min_send_delay"));
+
+			if (delay_val > PG_INT32_MAX)
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("min_send_delay \"%s\" out of range",
+							   strVal(defel->arg)));
+
+			data->min_send_delay = (int32) delay_val;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -502,6 +529,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		/*
+		 * Remember the delay time period to be used later before sending the
+		 * changes.
+		 */
+		ctx->min_send_delay = data->min_send_delay;
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c964824..522f7600a1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -148,7 +148,7 @@ create_logical_replication_slot(char *name, char *plugin,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 	/*
 	 * If caller needs us to determine the decoding start point, do so now.
@@ -481,7 +481,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..b6f4cbfd4c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,6 +252,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 								 bool skipped_xact);
+static void WalSndDelay(LogicalDecodingContext *ctx, TransactionId xid, TimestampTz delay_start);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1126,7 +1127,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 												   .segment_open = WalSndSegmentOpen,
 												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, WalSndDelay);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1285,7 +1286,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+							  WalSndUpdateProgress, WalSndDelay);
 	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -3849,3 +3850,91 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * LogicalDecodingContext 'delay' callback.
+ *
+ * Wait long enough to make sure a transaction is applied at least
+ * min_send_delay time period after it is performed at the publisher.
+ *
+ * delay_start is the transaction end time.
+ */
+static void
+WalSndDelay(LogicalDecodingContext *ctx, TransactionId xid, TimestampTz delay_start)
+{
+	/* Apply the delay by the latch mechanism */
+	while (true)
+	{
+		TimestampTz now;
+		TimestampTz delayUntil;
+		long		remaining_wait_time_ms;
+		long		timeout_sleeptime_ms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* This might change wal_sender_timeout */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/*
+		 * If we've requested to shut down, exit the process.
+		 *
+		 * This is unlike handling at other places where we allow complete WAL
+		 * to be sent before shutdown because we don't want the delayed
+		 * transactions to be applied downstream. This will allow one to use
+		 * the data from downstream in case of some unwanted operations on the
+		 * current node.
+		 */
+		if (got_STOPPING)
+		{
+			QueryCompletion qc;
+
+			/* Inform the standby that XLOG streaming is done */
+			SetQueryCompletion(&qc, CMDTAG_COPY, 0);
+			EndCommand(&qc, DestRemote, false);
+			pq_flush();
+
+			proc_exit(0);
+		}
+
+		now = GetCurrentTimestamp();
+		delayUntil = TimestampTzPlusMilliseconds(delay_start, ctx->min_send_delay);
+		remaining_wait_time_ms = TimestampDifferenceMilliseconds(now, delayUntil);
+
+		/*
+		 * Exit without arming the latch if it's already past time to send
+		 * this transaction.
+		 */
+		if (remaining_wait_time_ms <= 0)
+			break;
+
+		/* Sleep until appropriate time */
+		timeout_sleeptime_ms = WalSndComputeSleeptime(now);
+
+		elog(DEBUG2, "time-delayed replication for txid %u, delay_time = %d ms, remaining wait time: %ld ms",
+			 xid, (int) ctx->min_send_delay, remaining_wait_time_ms);
+
+		/* Sleep until we get reply from worker or we time out */
+		WalSndWait(WL_SOCKET_READABLE,
+				   Min(timeout_sleeptime_ms, remaining_wait_time_ms),
+				   WAIT_EVENT_WALSENDER_SEND_DELAY);
+	}
+}
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index cb99cc6339..76c19fe11d 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -515,6 +515,9 @@ pgstat_get_wait_timeout(WaitEventTimeout w)
 		case WAIT_EVENT_VACUUM_TRUNCATE:
 			event_name = "VacuumTruncate";
 			break;
+		case WAIT_EVENT_WALSENDER_SEND_DELAY:
+			event_name = "WalSenderSendDelay";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 24ba936332..9aea3a6103 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4493,6 +4493,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subminsenddelay;
 	int			i,
 				ntups;
 
@@ -4545,9 +4546,13 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.subminsenddelay\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n"
+						  " 0 AS subminsenddelay\n",
+						  LOGICALREP_ORIGIN_ANY);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4575,6 +4580,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subminsenddelay = PQfnumber(res, "subminsenddelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4605,6 +4611,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subminsenddelay =
+			atoi(PQgetvalue(res, i, i_subminsenddelay));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4686,6 +4694,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subminsenddelay > 0)
+		appendPQExpBuffer(query, ", min_send_delay = '%d'", subinfo->subminsenddelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index cdca0b993d..4c55f8efc4 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int			subminsenddelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..c7d303a168 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Origin and min_send_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subminsenddelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Min send delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..6643db6f55 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "min_send_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "min_send_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..69ae4314b4 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
 
+	int32		subminsenddelay;	/* Replication send delay (ms) */
+
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
@@ -122,6 +124,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	int32		minsenddelay;	/* Replication send delay (ms) */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..c389dc17e5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,6 +30,11 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
 														 bool skipped_xact
 );
 
+typedef void (*LogicalOutputPluginWriterDelay) (struct LogicalDecodingContext *lr,
+												TransactionId xid,
+												TimestampTz start_time
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -64,6 +69,7 @@ typedef struct LogicalDecodingContext
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
 	LogicalOutputPluginWriterUpdateProgress update_progress;
+	LogicalOutputPluginWriterDelay delay_send;
 
 	/*
 	 * Output buffer.
@@ -100,6 +106,12 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	/*
+	 * The minimum delay, in milliseconds, by the publisher before sending all
+	 * the changes.
+	 */
+	int32		min_send_delay;
+
 	/*
 	 * State for writing output.
 	 */
@@ -121,14 +133,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
-														 LogicalOutputPluginWriterUpdateProgress update_progress);
+														 LogicalOutputPluginWriterUpdateProgress update_progress,
+														 LogicalOutputPluginWriterDelay delay_send);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
-													 LogicalOutputPluginWriterUpdateProgress update_progress);
+													 LogicalOutputPluginWriterUpdateProgress update_progress,
+													 LogicalOutputPluginWriterDelay delay_send);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..d2fde09e00 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,6 +30,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	char	   *origin;
+	int32		min_send_delay;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..271797c7b1 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -187,6 +187,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			int32		min_send_delay; /* The minimum send delay (ms) */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 9ab23e1c4a..cc3a234eba 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -150,7 +150,8 @@ typedef enum
 	WAIT_EVENT_REGISTER_SYNC_REQUEST,
 	WAIT_EVENT_SPIN_DELAY,
 	WAIT_EVENT_VACUUM_DELAY,
-	WAIT_EVENT_VACUUM_TRUNCATE
+	WAIT_EVENT_VACUUM_TRUNCATE,
+	WAIT_EVENT_WALSENDER_SEND_DELAY
 } WaitEventTimeout;
 
 /* ----------
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..121c335f48 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                        List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,20 +404,61 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_send_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo);
+ERROR:  invalid value for parameter "min_send_delay": "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1);
+ERROR:  -1 ms is outside the valid range for parameter "min_send_delay" (0 .. 2147483647)
+-- fail - specifying streaming = parallel with positive min_send_delay is not
+-- supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123);
+ERROR:  min_send_delay > 0 and streaming = parallel are mutually exclusive options
+-- success -- min_send_delay value without units is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |            123 | off                | dbname=regress_doesnotexit | 0/0
+(1 row)
+
+-- success -- min_send_delay value with units other than ms is converted to ms
+-- and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d');
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |       86400000 | off                | dbname=regress_doesnotexit | 0/0
 (1 row)
 
+-- fail - alter subscription with streaming = parallel should fail when
+-- min_send_delay is greater than zero
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+ERROR:  cannot set parallel streaming mode for subscription with min_send_delay
+-- fail - alter subscription with min_send_delay should fail when
+-- streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 123);
+ERROR:  cannot set min_send_delay for subscription in parallel streaming mode
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..b2d49fcfcb 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,35 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_send_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1);
+
+-- fail - specifying streaming = parallel with positive min_send_delay is not
+-- supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123);
+
+-- success -- min_send_delay value without units is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123);
+\dRs+
+
+-- success -- min_send_delay value with units other than ms is converted to ms
+-- and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d');
+\dRs+
+
+-- fail - alter subscription with streaming = parallel should fail when
+-- min_send_delay is greater than zero
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+
+-- fail - alter subscription with min_send_delay should fail when
+-- streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 123);
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 91aa068c95..c4e6f11b07 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -515,6 +515,33 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after renaming SUBSCRIPTION";
 
+# Test time-delayed logical replication
+#
+# If the subscription sets min_send_delay parameter, the walsender will delay
+# the transaction for min_send_delay milliseconds. We verify this by looking
+# at the time difference between a) when tuples are inserted on the publisher,
+# and b) when those changes are replicated on the subscriber. Even on slow
+# machines, this strategy will give predictable behavior.
+
+# Set min_send_delay parameter to 3 seconds
+my $delay = 3;
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_send_delay = '${delay}s')");
+
+# Before doing the insertion, get the current timestamp that will be
+# used as a comparison base.
+my $publisher_insert_time = time();
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins VALUES (generate_series(1101, 1120))");
+
+# The publisher waits for the replication to complete
+$node_publisher->wait_for_catchup('tap_sub_renamed');
+
+# This test is successful only if at least the configured delay has elapsed.
+ok( time() - $publisher_insert_time >= $delay,
+	"subscriber applies changes only after replication delay"
+);
+
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
 
-- 
2.27.0

