I found some additional places that should remove the last-start time from the hash table. I've added those in v14.
On Fri, Jan 06, 2023 at 10:30:18AM +0530, Amit Kapila wrote: > On Thu, Jan 5, 2023 at 10:49 PM Nathan Bossart <nathandboss...@gmail.com> > wrote: >> On Thu, Jan 05, 2023 at 11:34:37AM +0530, Amit Kapila wrote: >> > On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart <nathandboss...@gmail.com> >> > wrote: >> >> In v12, I moved the restart for two_phase mode to the end of >> >> process_syncing_tables_for_apply() so that we don't need to rely on >> >> another >> >> iteration of the loop. >> > >> > This should work but it is better to add a comment before calling >> > CommandCounterIncrement() to indicate that this is for making changes >> > to the relation state visible. >> >> Will do. > > Isn't it better to move this part into a separate patch as this is > useful even without the main patch to improve wakeups? І moved it to a separate patch in v14. >> > Thinking along similar lines, won't apply worker need to be notified >> > of SUBREL_STATE_SYNCWAIT state change by the tablesync worker? >> >> wait_for_worker_state_change() should notify the apply worker in this case. > > I think this is yet to be included in the patch, right? This is already present on HEAD. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From 955111b35d9d416b8d895649525b25c03a15e653 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Thu, 5 Jan 2023 21:17:42 -0800 Subject: [PATCH v14 1/3] move restart for two_phase mode to end of process_syncing_tables_for_apply() --- src/backend/replication/logical/tablesync.c | 48 +++++++++++---------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2fdfeb5b4c..280fed8cdb 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + bool should_exit = false; Assert(!IsTransactionState()); @@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = NULL; } - /* - * Even when the two_phase mode is requested by the user, it remains as - * 'pending' until all tablesyncs have reached READY state. - * - * When this happens, we restart the apply worker and (if the conditions - * are still ok) then the two_phase tri-state will become 'enabled' at - * that time. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); - - proc_exit(0); - } - /* * Process all tables that are being synchronized. */ @@ -619,9 +598,34 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (started_tx) { + /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + CommandCounterIncrement(); /* make updates visible */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + should_exit = true; + } + CommitTransactionCommand(); pgstat_report_stat(true); } + + if (should_exit) + proc_exit(0); } /* -- 2.25.1
>From 3018a4029b573c69f6674b86735d0d94c1350eb4 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Mon, 21 Nov 2022 16:01:01 -0800 Subject: [PATCH v14 2/3] wake up logical workers as needed instead of relying on periodic wakeups --- src/backend/access/transam/xact.c | 3 ++ src/backend/commands/alter.c | 7 ++++ src/backend/commands/subscriptioncmds.c | 7 ++++ src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++ src/include/replication/logicalworker.h | 3 ++ 5 files changed, 66 insertions(+) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 24221542e7..54145bf805 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -47,6 +47,7 @@ #include "pgstat.h" #include "replication/logical.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -2360,6 +2361,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_LogicalRepWorkers(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2860,6 +2862,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_LogicalRepWorkers(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 70d359eb6a..4e8102b59f 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -59,6 +59,7 @@ #include "commands/user.h" #include "miscadmin.h" #include "parser/parse_func.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) if (strncmp(new_name, "regress_", 8) != 0) elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + + /* + * Wake up the logical replication workers to handle this change + * quickly. + */ + LogicalRepWorkersWakeupAtCommit(objectId); } else if (nameCacheId >= 0) { diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b9c5df796f..c0fd82cdf2 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "pgstat.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); + /* Wake up the logical replication workers to handle this change quickly. */ + LogicalRepWorkersWakeupAtCommit(subid); + return myself; } @@ -1733,6 +1737,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->oid, 0); ApplyLauncherWakeupAtCommit(); + + /* Wake up the logical replication workers to handle this change quickly. */ + LogicalRepWorkersWakeupAtCommit(form->oid); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f8e8cf71eb..cf38ed821e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +static List *on_commit_wakeup_workers_subids = NIL; + bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Wakeup the stored subscriptions' workers on commit if requested. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && on_commit_wakeup_workers_subids != NIL) + { + ListCell *subid; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + foreach(subid, on_commit_wakeup_workers_subids) + { + List *workers; + ListCell *worker; + + workers = logicalrep_workers_find(lfirst_oid(subid), true); + foreach(worker, workers) + logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker)); + } + LWLockRelease(LogicalRepWorkerLock); + } + + on_commit_wakeup_workers_subids = NIL; +} + +/* + * Request wakeup of the workers for the given subscription ID on commit of the + * transaction. + * + * This is used to ensure that the workers process assorted changes as soon as + * possible. + */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids, + subid); + MemoryContextSwitchTo(oldcxt); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index f1e7e8a348..3b2084f81f 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void LogicalRepWorkersWakeupAtCommit(Oid subid); +extern void AtEOXact_LogicalRepWorkers(bool isCommit); + #endif /* LOGICALWORKER_H */ -- 2.25.1
>From a1354698bb06ed645595a1c233e506a16ae3108c Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Thu, 5 Jan 2023 14:22:53 -0800 Subject: [PATCH v14 3/3] bypass wal_retrieve_retry_interval for logical workers as appropriate --- doc/src/sgml/monitoring.sgml | 10 + src/backend/commands/subscriptioncmds.c | 8 + src/backend/replication/logical/launcher.c | 201 +++++++++++++++----- src/backend/replication/logical/tablesync.c | 7 + src/backend/replication/logical/worker.c | 18 ++ src/backend/storage/lmgr/lwlock.c | 4 + src/include/replication/logicallauncher.h | 2 + src/include/storage/lwlock.h | 2 + 8 files changed, 207 insertions(+), 45 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5bcba0fdec..8f06e234c8 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1991,6 +1991,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser <entry>Waiting to read or update information about <quote>heavyweight</quote> locks.</entry> </row> + <row> + <entry><literal>LogicalRepLauncherDSA</literal></entry> + <entry>Waiting for logical replication launcher dynamic shared memory + allocator access</entry> + </row> + <row> + <entry><literal>LogicalRepLauncherHash</literal></entry> + <entry>Waiting for logical replication launcher shared memory hash table + access</entry> + </row> <row> <entry><literal>LogicalRepWorker</literal></entry> <entry>Waiting to read or update the state of logical replication diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c0fd82cdf2..6c2c10712e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + /* + * Clear the last-start time for the apply worker to free up space. If + * this transaction rolls back, the launcher might restart the apply worker + * before wal_retrieve_retry_interval milliseconds have elapsed, but that's + * probably okay. + */ + logicalrep_launcher_delete_last_start_time(subid); + /* * Cleanup of tablesync replication origins. * diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index a69e371c05..dfe49db64f 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -25,6 +25,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" +#include "lib/dshash.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -56,6 +57,25 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; +/* an entry in the last-start times hash table */ +typedef struct LauncherLastStartTimesEntry +{ + Oid subid; + TimestampTz last_start_time; +} LauncherLastStartTimesEntry; + +/* parameters for the last-start times hash table */ +static const dshash_parameters dsh_params = { + sizeof(Oid), + sizeof(LauncherLastStartTimesEntry), + dshash_memcmp, + dshash_memhash, + LWTRANCHE_LAUNCHER_HASH +}; + +static dsa_area *last_start_times_dsa = NULL; +static dshash_table *last_start_times = NULL; + LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct @@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct /* Supervisor process. */ pid_t launcher_pid; + /* hash table for last-start times */ + dsa_handle last_start_dsa; + dshash_table_handle last_start_dsh; + /* Background workers. */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static void logicalrep_launcher_attach_dshmem(void); +static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time); +static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid); static bool on_commit_launcher_wakeup = false; @@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void) memset(worker, 0, sizeof(LogicalRepWorker)); SpinLockInit(&worker->relmutex); } + + LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID; + LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID; } } @@ -801,8 +831,6 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; - ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg) now = GetCurrentTimestamp(); - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (TimestampDifferenceExceeds(last_start_time, now, - wal_retrieve_retry_interval)) - { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. */ - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + /* Use temporary context for the database list and worker info. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); - if (!sub->enabled) - continue; + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); + if (!sub->enabled) + continue; - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); - } - } + if (w != NULL) + continue; - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); - } - else - { /* - * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, this - * usually means crash of the worker, so we should retry in - * wal_retrieve_retry_interval again. + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we wake up when it can be started. */ - wait_time = wal_retrieve_retry_interval; + last_start = logicalrep_launcher_get_last_start_time(sub->oid); + if (TimestampDifferenceExceeds(last_start, now, + wal_retrieve_retry_interval)) + { + logicalrep_launcher_set_last_start_time(sub->oid, now); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); + } + else + { + long elapsed; + + elapsed = TimestampDifferenceMilliseconds(last_start, now); + wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed); + } } + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + /* Wait for more work. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -996,3 +1021,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) return (Datum) 0; } + +/* + * Initialize or attach to the dynamic shared hash table that stores the + * last-start times, if not already done. This must be called before using the + * table. + */ +static void +logicalrep_launcher_attach_dshmem(void) +{ + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for last-start times. */ + last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA); + dsa_pin(last_start_times_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0); + + /* Store handles in shared memory for other backends to use. */ + LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa); + LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times); + } + else if (!last_start_times) + { + /* Attach to existing dynamic shared hash table. */ + last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, + LogicalRepCtx->last_start_dsh, 0); + } + + LWLockRelease(LogicalRepWorkerLock); + MemoryContextSwitchTo(oldcontext); +} + +/* + * Set the last-start time for the subscription. + */ +static void +logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time) +{ + LauncherLastStartTimesEntry *entry; + bool found; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find_or_insert(last_start_times, &subid, &found); + entry->last_start_time = start_time; + dshash_release_lock(last_start_times, entry); +} + +/* + * Return the last-start time for the subscription, or 0 if there isn't one. + */ +static TimestampTz +logicalrep_launcher_get_last_start_time(Oid subid) +{ + LauncherLastStartTimesEntry *entry; + TimestampTz ret; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find(last_start_times, &subid, false); + if (entry == NULL) + return 0; + + ret = entry->last_start_time; + dshash_release_lock(last_start_times, entry); + + return ret; +} + +/* + * Remove the last-start time for the subscription, if one exists. + */ +void +logicalrep_launcher_delete_last_start_time(Oid subid) +{ + logicalrep_launcher_attach_dshmem(); + + (void) dshash_delete_key(last_start_times, &subid); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 280fed8cdb..adc1043211 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -617,6 +617,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", MySubscription->name))); + + /* + * Clear the last-start time for the worker so that the launcher + * restarts it immediately, bypassing wal_retrieve_retry_interval. + */ + logicalrep_launcher_delete_last_start_time(MySubscription->oid); + should_exit = true; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cf38ed821e..ac125daad5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -164,6 +164,7 @@ #include "postmaster/walwriter.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" #include "replication/logicalworker.h" @@ -3089,6 +3090,8 @@ maybe_reread_subscription(void) "stop because the subscription was removed", MySubscription->name))); + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -3100,6 +3103,8 @@ maybe_reread_subscription(void) "stop because the subscription was disabled", MySubscription->name))); + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -3126,6 +3131,13 @@ maybe_reread_subscription(void) (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", MySubscription->name))); + /* + * Clear the last-start time for the apply worker so that the launcher + * will restart it immediately, bypassing wal_retrieve_retry_interval. + */ + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MySubscription->oid); + proc_exit(0); } @@ -3671,6 +3683,8 @@ ApplyWorkerMain(Datum main_arg) (errmsg("logical replication apply worker for subscription %u will not " "start because the subscription was removed during startup", MyLogicalRepWorker->subid))); + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -3684,6 +3698,8 @@ ApplyWorkerMain(Datum main_arg) "start because the subscription was disabled during startup", MySubscription->name))); + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -3884,6 +3900,8 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 196bece0a3..d2ec396045 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_LAUNCHER_DSA: */ + "LogicalRepLauncherDSA", + /* LWTRANCHE_LAUNCHER_HASH: */ + "LogicalRepLauncherHash", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index e1661b6c91..00f6f89d72 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -26,4 +26,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); +extern void logicalrep_launcher_delete_last_start_time(Oid subid); + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e4162db613..d2c7afb8f4 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_LAUNCHER_DSA, + LWTRANCHE_LAUNCHER_HASH, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; -- 2.25.1