On Wed, Sep 22, 2021, at 1:57 PM, Euler Taveira wrote: > On Wed, Sep 22, 2021, at 1:18 AM, Amit Kapila wrote: >> On Tue, Sep 21, 2021 at 4:21 PM Marcos Pegoraro <mar...@f10.com.br> wrote: >>> No, I'm talking about that configuration you can have on standby servers >>> recovery_min_apply_delay = '8h' >>> >> >> oh okay, I think this can be useful in some cases where we want to avoid >> data loss similar to its use for physical standby. For example, if the user >> has by mistake truncated the table (or deleted some required data) on the >> publisher, we can always recover it from the subscriber if we have such a feature. >> >> Having said that, I am not sure if we can call it a restriction. It is more >> of a TODO kind of thing. It doesn't sound advisable to me to keep growing >> the current Restrictions page [1]. > It is a new feature. pglogical supports it and it is useful for delayed > secondary server and if, for some business reason, you have to delay when data > is available. There might be other use cases but these are the ones I > regularly > heard from customers. > > BTW, I have a WIP patch for this feature. I didn't have enough time to post it > because it lacks documentation and tests. I'm planning to do it as soon as > this > CF ends. Long time, no patch. Here it is. I will provide documentation in the next version. I would appreciate some feedback.
-- Euler Taveira EDB https://www.enterprisedb.com/
From 9635fec1a031b82ec5d67cdfe16aa1f553ffa936 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Sat, 6 Nov 2021 11:31:10 -0300 Subject: [PATCH v1] Time-delayed logical replication subscriber Similar to physical replication, a time-delayed copy of the data is useful for some scenarios (especially to fix errors that might cause data loss). If the subscriber sets the apply_delay parameter, the logical replication worker will delay the transaction commit for apply_delay milliseconds. Discussion: https://postgr.es/m/CAB-JLwYOYwL=xtyaxkih5ctm_vm8kjkh7aaitckvmch4rzr...@mail.gmail.com --- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 44 +++++++++- src/backend/replication/logical/worker.c | 48 +++++++++++ src/backend/utils/adt/timestamp.c | 8 ++ src/bin/pg_dump/pg_dump.c | 16 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/include/catalog/pg_subscription.h | 3 + src/include/utils/timestamp.h | 2 + src/test/regress/expected/subscription.out | 96 +++++++++++----------- src/test/subscription/t/029_apply_delay.pl | 71 ++++++++++++++++ 12 files changed, 248 insertions(+), 53 deletions(-) create mode 100644 src/test/subscription/t/029_apply_delay.pl diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ca65a8bd20..0788384579 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->binary = subform->subbinary; sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; + sub->applydelay = subform->subapplydelay; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..1cc0d86f2e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql 
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subslotname, subsynccommit, subpublications) + substream, subtwophasestate, subslotname, subsynccommit, + subapplydelay, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_workers AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3ef6607d24..19916f04a8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -46,6 +46,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/timestamp.h" /* * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION @@ -61,6 +62,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_APPLY_DELAY 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -82,6 +84,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + int64 apply_delay; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -249,12 +252,34 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "apply_delay") == 0) + { + char *val; + Interval *interval; + + if (IsSet(opts->specified_opts, SUBOPT_APPLY_DELAY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_APPLY_DELAY; + val = defGetString(defel); + + interval = DatumGetIntervalP(DirectFunctionCall3(interval_in, + CStringGetDatum(val), + ObjectIdGetDatum(InvalidOid), + Int32GetDatum(-1))); + 
opts->apply_delay = interval_to_ms(interval); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); } + if (opts->apply_delay < 0) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("option \"%s\" must not be negative", "apply_delay")); + /* * We've been explicitly asked to not connect, that requires some * additional processing. @@ -390,7 +415,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -464,6 +490,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + values[Anum_pg_subscription_subapplydelay - 1] = Int64GetDatum(opts.apply_delay); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -913,6 +940,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_APPLY_DELAY)) + { + values[Anum_pg_subscription_subapplydelay - 1] = + Int64GetDatum(opts.apply_delay); + replaces[Anum_pg_subscription_subapplydelay - 1] = true; + } + update_tuple = true; break; } @@ -935,6 +969,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (opts.enabled) ApplyLauncherWakeupAtCommit(); + /* + * If this subscription has been disabled and it has an apply + * delay set, wake up the logical replication worker to finish + * it as soon as possible. 
+ */ + if (!opts.enabled && sub->applydelay > 0) + logicalrep_worker_wakeup(sub->oid, InvalidOid); + update_tuple = true; break; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5d9acc6173..39231d464e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -248,6 +248,7 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; +TimestampTz MySubscriptionApplyDelayUntil = 0; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -303,6 +304,8 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +static void apply_delay(void); + /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); @@ -373,6 +376,9 @@ begin_replication_step(void) { SetCurrentStatementStartTimestamp(); + /* delay the current transaction? */ + apply_delay(); + if (!IsTransactionState()) { StartTransactionCommand(); @@ -778,6 +784,43 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ExecStoreVirtualTuple(slot); } +static void +apply_delay(void) +{ + /* nothing to do if no delay set */ + if (MySubscription->applydelay <= 0) + return; + + while (true) + { + int diffms; + + diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), MySubscriptionApplyDelayUntil); + + elog(DEBUG2, "logical replication apply delay: %u ms", diffms); + + /* + * Exit without arming the latch if it's already past time to apply + * this transaction. + */ + if (diffms <= 0) + break; + + WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + diffms, + WAIT_EVENT_RECOVERY_APPLY_DELAY); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } + + /* + * Delay applied. Reset state. + */ + MySubscriptionApplyDelayUntil = 0; +} + /* * Handle BEGIN message. 
*/ @@ -789,6 +832,11 @@ apply_handle_begin(StringInfo s) logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.committime); + /* set apply delay */ + if (MySubscription->applydelay > 0) + MySubscriptionApplyDelayUntil = TimestampTzPlusMilliseconds(TimestampTzGetDatum(begin_data.committime), + MySubscription->applydelay); + remote_final_lsn = begin_data.final_lsn; in_remote_transaction = true; diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index ae36ff3328..1eda3ed57c 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -2379,6 +2379,14 @@ interval_cmp_internal(Interval *interval1, Interval *interval2) return int128_compare(span1, span2); } +int64 +interval_to_ms(const Interval *interval) +{ + INT128 span = interval_cmp_value(interval) / 1000; + + return span; +} + Datum interval_eq(PG_FUNCTION_ARGS) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e69dcf8a48..08fc4068cb 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4298,6 +4298,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_subapplydelay; int i, ntups; @@ -4340,12 +4341,17 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " false AS substream,\n"); if (fout->remoteVersion >= 150000) - appendPQExpBufferStr(query, " s.subtwophasestate\n"); + appendPQExpBufferStr(query, " s.subtwophasestate,\n"); else appendPQExpBuffer(query, - " '%c' AS subtwophasestate\n", + " '%c' AS subtwophasestate,\n", LOGICALREP_TWOPHASE_STATE_DISABLED); + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, " s.subapplydelay\n"); + else + appendPQExpBufferStr(query, " 0 AS subapplydelay\n"); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4366,6 +4372,7 @@ getSubscriptions(Archive *fout) i_subbinary = PQfnumber(res, "subbinary"); 
i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); + i_subapplydelay = PQfnumber(res, "subapplydelay"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4393,6 +4400,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_substream)); subinfo[i].subtwophasestate = pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); + subinfo[i].subapplydelay = + strtoi64(PQgetvalue(res, i, i_subapplydelay), NULL, 10); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4466,6 +4475,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); + if (subinfo->subapplydelay > 0) + appendPQExpBuffer(query, ", apply_delay = '" INT64_FORMAT " ms'", subinfo->subapplydelay); + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 997a3b6071..d3d7ae4587 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -658,6 +658,7 @@ typedef struct _SubscriptionInfo char *substream; char *subtwophasestate; char *subsynccommit; + int64 subapplydelay; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index e3382933d9..99ec85db2d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false}; + false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6124,6 +6124,12 @@ describeSubscriptions(const char *pattern, bool verbose) ", subtwophasestate AS \"%s\"\n", gettext_noop("Two phase commit")); + /* apply_delay is only 
supported in v15 and higher */ + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ", subapplydelay AS \"%s\"\n", + gettext_noop("Apply delay")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 18c291289f..57d8472b6e 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + int64 subapplydelay; /* Replication apply delay */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +105,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. */ char twophasestate; /* Allow streaming two-phase transactions */ + int64 applydelay; /* Replication apply delay */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index c1a74f8e2b..58a6e6b6da 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -78,6 +78,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec); +extern int64 interval_to_ms(const Interval *interval); + extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern pg_time_t timestamptz_to_time_t(TimestampTz t); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 80aae83562..bb6866d160 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; 
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -94,10 +94,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+-------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | 0 | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+-------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | 0 | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -165,19 +165,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -188,19 +188,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION 
regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | 
regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) -- fail - publication already exists @@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) -- fail - publication used more then once @@ -233,10 +233,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | 
dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -270,10 +270,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | 0 | off | dbname=regress_doesnotexist (1 row) --fail - alter of two_phase option not supported. 
@@ -282,10 +282,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -294,10 +294,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Apply delay | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+-------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/subscription/t/029_apply_delay.pl b/src/test/subscription/t/029_apply_delay.pl new file mode 100644 index 0000000000..bf9bf2b22d --- /dev/null +++ b/src/test/subscription/t/029_apply_delay.pl @@ -0,0 +1,71 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test replication apply delay +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 3; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug2"); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); 
+$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (apply_delay = '10s')" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM test_tab"); +is($result, qq(2|1|2), 'check initial data was copied to subscriber'); + +# new row to trigger apply delay +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (3, 'baz')"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM test_tab"); +is($result, qq(3|1|3), 'check the new row was applied to subscriber'); + +my $logfile = slurp_file($node_subscriber->logfile()); +ok( $logfile =~ + qr/logical replication apply delay/, + 'check if replication apply delay is triggered'); -- 2.30.2