On Wed, Sep 22, 2021, at 1:57 PM, Euler Taveira wrote:
> On Wed, Sep 22, 2021, at 1:18 AM, Amit Kapila wrote:
>> On Tue, Sep 21, 2021 at 4:21 PM Marcos Pegoraro <mar...@f10.com.br> wrote:
>>> No, I'm talking about that configuration you can have on standby servers
>>> recovery_min_apply_delay = '8h'
>>> 
>> 
>> oh okay, I think this can be useful in some cases where we want to avoid 
>> data loss similar to its use for physical standby. For example, if the user 
>> has by mistake truncated the table (or deleted some required data) on the 
>> publisher, we can always recover it from the subscriber if we have such a feature.
>> 
>> Having said that, I am not sure if we can call it a restriction. It is more 
>> of a TODO kind of thing. It doesn't sound advisable to me to keep growing 
>> the current Restrictions page [1].
> It is a new feature. pglogical supports it, and it is useful for a delayed
> secondary server or when, for some business reason, you have to delay when
> data becomes available. There might be other use cases, but these are the
> ones I regularly hear from customers.
> 
> BTW, I have a WIP patch for this feature. I didn't have enough time to post it
> because it lacks documentation and tests. I'm planning to do it as soon as
> this CF ends.
Long time, no patch. Here it is. I will provide documentation in the next
version. I would appreciate some feedback.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 9635fec1a031b82ec5d67cdfe16aa1f553ffa936 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sat, 6 Nov 2021 11:31:10 -0300
Subject: [PATCH v1] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data is
useful for some scenarios (especially to fix errors that might cause data
loss).

If the subscription sets the apply_delay parameter, the logical replication
worker will delay applying each transaction until apply_delay milliseconds
have passed since the commit time reported for that transaction.

Discussion: https://postgr.es/m/CAB-JLwYOYwL=xtyaxkih5ctm_vm8kjkh7aaitckvmch4rzr...@mail.gmail.com
---
 src/backend/catalog/pg_subscription.c      |  1 +
 src/backend/catalog/system_views.sql       |  3 +-
 src/backend/commands/subscriptioncmds.c    | 44 +++++++++-
 src/backend/replication/logical/worker.c   | 48 +++++++++++
 src/backend/utils/adt/timestamp.c          |  8 ++
 src/bin/pg_dump/pg_dump.c                  | 16 +++-
 src/bin/pg_dump/pg_dump.h                  |  1 +
 src/bin/psql/describe.c                    |  8 +-
 src/include/catalog/pg_subscription.h      |  3 +
 src/include/utils/timestamp.h              |  2 +
 src/test/regress/expected/subscription.out | 96 +++++++++++-----------
 src/test/subscription/t/029_apply_delay.pl | 71 ++++++++++++++++
 12 files changed, 248 insertions(+), 53 deletions(-)
 create mode 100644 src/test/subscription/t/029_apply_delay.pl

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..0788384579 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->applydelay = subform->subapplydelay;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87..1cc0d86f2e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subslotname, subsynccommit,
+              subapplydelay, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_workers AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..19916f04a8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
+#include "utils/timestamp.h"
 
 /*
  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
@@ -61,6 +62,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_APPLY_DELAY			0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +84,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	int64		apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -249,12 +252,34 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "apply_delay") == 0)
+		{
+			char	   *val;
+			Interval   *interval;
+
+			if (IsSet(opts->specified_opts, SUBOPT_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_APPLY_DELAY;
+			val = defGetString(defel);
+
+			interval = DatumGetIntervalP(DirectFunctionCall3(interval_in,
+																	CStringGetDatum(val),
+																	ObjectIdGetDatum(InvalidOid),
+																	Int32GetDatum(-1)));
+			opts->apply_delay = interval_to_ms(interval);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
 	}
 
+	if (opts->apply_delay < 0)
+		ereport(ERROR,
+				errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				errmsg("option \"%s\" must not be negative", "apply_delay"));
+
 	/*
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
@@ -390,7 +415,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -464,6 +490,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	values[Anum_pg_subscription_subapplydelay - 1] = Int64GetDatum(opts.apply_delay);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -913,6 +940,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_APPLY_DELAY))
+				{
+					values[Anum_pg_subscription_subapplydelay - 1] =
+						Int64GetDatum(opts.apply_delay);
+					replaces[Anum_pg_subscription_subapplydelay - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -935,6 +969,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
 
+				/*
+				 * If this subscription has been disabled and it has an apply
+				 * delay set, wake up the logical replication worker to finish
+				 * it as soon as possible.
+				 */
+				if (!opts.enabled && sub->applydelay > 0)
+					logicalrep_worker_wakeup(sub->oid, InvalidOid);
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..39231d464e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -248,6 +248,7 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool		MySubscriptionValid = false;
+TimestampTz	MySubscriptionApplyDelayUntil = 0;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@@ -303,6 +304,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void apply_delay(void);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -373,6 +376,9 @@ begin_replication_step(void)
 {
 	SetCurrentStatementStartTimestamp();
 
+	/* delay the current transaction? */
+	apply_delay();
+
 	if (!IsTransactionState())
 	{
 		StartTransactionCommand();
@@ -778,6 +784,43 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+static void
+apply_delay(void)
+{
+	/* nothing to do if no delay set */
+	if (MySubscription->applydelay <= 0)
+		return;
+
+	while (true)
+	{
+		int			diffms;
+
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), MySubscriptionApplyDelayUntil);
+
+		elog(DEBUG2, "logical replication apply delay: %u ms", diffms);
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		WaitLatch(MyLatch,
+				  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+				  diffms,
+				  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * Delay applied. Reset state.
+	 */
+	MySubscriptionApplyDelayUntil = 0;
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -789,6 +832,11 @@ apply_handle_begin(StringInfo s)
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.committime);
 
+	/* set apply delay */
+	if (MySubscription->applydelay > 0)
+		MySubscriptionApplyDelayUntil = TimestampTzPlusMilliseconds(TimestampTzGetDatum(begin_data.committime),
+													MySubscription->applydelay);
+
 	remote_final_lsn = begin_data.final_lsn;
 
 	in_remote_transaction = true;
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index ae36ff3328..1eda3ed57c 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -2379,6 +2379,14 @@ interval_cmp_internal(Interval *interval1, Interval *interval2)
 	return int128_compare(span1, span2);
 }
 
+int64
+interval_to_ms(const Interval *interval)
+{
+	INT128		span = interval_cmp_value(interval) / 1000;
+
+	return span;
+}
+
 Datum
 interval_eq(PG_FUNCTION_ARGS)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e69dcf8a48..08fc4068cb 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4298,6 +4298,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subapplydelay;
 	int			i,
 				ntups;
 
@@ -4340,12 +4341,17 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query, " false AS substream,\n");
 
 	if (fout->remoteVersion >= 150000)
-		appendPQExpBufferStr(query, " s.subtwophasestate\n");
+		appendPQExpBufferStr(query, " s.subtwophasestate,\n");
 	else
 		appendPQExpBuffer(query,
-						  " '%c' AS subtwophasestate\n",
+						  " '%c' AS subtwophasestate,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, " s.subapplydelay\n");
+	else
+		appendPQExpBufferStr(query, " 0 AS subapplydelay\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4366,6 +4372,7 @@ getSubscriptions(Archive *fout)
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+	i_subapplydelay = PQfnumber(res, "subapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4393,6 +4400,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_substream));
 		subinfo[i].subtwophasestate =
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+		subinfo[i].subapplydelay =
+			strtoi64(PQgetvalue(res, i, i_subapplydelay), NULL, 10);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4466,6 +4475,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subapplydelay > 0)
+		appendPQExpBuffer(query, ", apply_delay = '" INT64_FORMAT " ms'", subinfo->subapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 997a3b6071..d3d7ae4587 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -658,6 +658,7 @@ typedef struct _SubscriptionInfo
 	char	   *substream;
 	char	   *subtwophasestate;
 	char	   *subsynccommit;
+	int64		subapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e3382933d9..99ec85db2d 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false};
+	false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6124,6 +6124,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  ", subtwophasestate AS \"%s\"\n",
 							  gettext_noop("Two phase commit"));
 
+		/* apply_delay is only supported in v15 and higher */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", subapplydelay AS \"%s\"\n",
+							  gettext_noop("Apply delay"));
+
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..57d8472b6e 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	int64		subapplydelay;		/* Replication apply delay */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +105,7 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	int64		applydelay;		/* Replication apply delay */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index c1a74f8e2b..58a6e6b6da 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -78,6 +78,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 									   TimestampTz stop_time,
 									   int msec);
 
+extern int64 interval_to_ms(const Interval *interval);
+
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..bb6866d160 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+-------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                   List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+-------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                |           0 | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/029_apply_delay.pl b/src/test/subscription/t/029_apply_delay.pl
new file mode 100644
index 0000000000..bf9bf2b22d
--- /dev/null
+++ b/src/test/subscription/t/029_apply_delay.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test that a subscription created with apply_delay copies initial data, applies new rows, and logs the delay
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 3;
+
+# Create and start the publisher node, initialized with allows_streaming set to logical
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node; debug2 logging is presumably needed for the log check at the end (confirm message level)
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"log_min_messages = debug2");
+$node_subscriber->start;
+
+# Create a table with two rows on the publisher before the subscription exists
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Create the target table on the subscriber; b is text there and it has two extra defaulted columns (c, d)
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Set up logical replication: publish test_tab, then subscribe with apply_delay set to 10s
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (apply_delay = '10s')"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish: poll until no pg_subscription_rel row is outside states r and s
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(2|1|2), 'check initial data was copied to subscriber');
+
+# Insert a new row on the publisher; the subscriber is expected to apply it only after the apply delay
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (3, 'baz')");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(3|1|3), 'check the new row was applied to subscriber');
+# Read the whole subscriber server log and look for the apply-delay message in it
+my $logfile = slurp_file($node_subscriber->logfile());
+ok( $logfile =~
+	  qr/logical replication apply delay/,
+	'check if replication apply delay is triggered');
-- 
2.30.2

Reply via email to