Nathan Bossart <nathandboss...@gmail.com> writes:
> I found some additional places that should remove the last-start time from
> the hash table. I've added those in v14.
I've pushed 0001 and 0002, which seem pretty uncontroversial.  Attached
is a rebased 0003, just to keep the cfbot happy.  I'm kind of wondering
whether 0003 is worth the complexity TBH, but in any case I ran out of
time to look at it closely today.

			regards, tom lane
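PS: for anyone who doesn't want to wade through 0003 to see where the
complexity lives: it replaces the launcher's single local last_start_time
with a DSA-backed dshash table keyed by subscription OID, so each
subscription is rate-limited independently and workers can clear their own
entry to force an immediate restart.  The per-subscription gating in the
launcher loop then boils down to roughly the sketch below; consider_launch()
and the get_last_start()/set_last_start() helpers are just placeholders for
illustration, standing in for the patch's
logicalrep_launcher_get/set_last_start_time().

/*
 * Boiled-down sketch of the per-subscription check in ApplyLauncherMain.
 * consider_launch(), get_last_start() and set_last_start() do not exist in
 * the patch; they stand in for the dshash-backed helpers it adds.
 */
static long
consider_launch(Subscription *sub, TimestampTz now, long wait_time)
{
	TimestampTz last_start = get_last_start(sub->oid);

	if (TimestampDifferenceExceeds(last_start, now,
								   wal_retrieve_retry_interval))
	{
		/* Interval elapsed (or never started): launch and remember when. */
		set_last_start(sub->oid, now);
		logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
								 sub->owner, InvalidOid);
	}
	else
	{
		/* Not eligible yet: wake up again once it will be. */
		long		elapsed = TimestampDifferenceMilliseconds(last_start, now);

		wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
	}

	return wait_time;
}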
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec..8f06e234c8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1991,6 +1991,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
       <entry>Waiting to read or update information about
       <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting for logical replication launcher dynamic shared memory
+      allocator access</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting for logical replication launcher shared memory hash table
+      access</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f15a332bae..88f180c2a7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space. If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..dfe49db64f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -56,6 +57,25 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid			subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -801,8 +831,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
 
-				if (!sub->enabled)
-					continue;
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
+		{
+			Subscription *sub = (Subscription *) lfirst(lc);
+			LogicalRepWorker *w;
+			TimestampTz last_start;
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			if (!sub->enabled)
+				continue;
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
-				}
-			}
+			if (w != NULL)
+				continue;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
-		}
-		else
-		{
 			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
+			 * If the worker is eligible to start now, launch it. Otherwise,
+			 * adjust wait_time so that we wake up when it can be started.
 			 */
-			wait_time = wal_retrieve_retry_interval;
+			last_start = logicalrep_launcher_get_last_start_time(sub->oid);
+			if (TimestampDifferenceExceeds(last_start, now,
+										   wal_retrieve_retry_interval))
+			{
+				logicalrep_launcher_set_last_start_time(sub->oid, now);
+				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+										 sub->owner, InvalidOid);
+			}
+			else
+			{
+				long		elapsed;
+
+				elapsed = TimestampDifferenceMilliseconds(last_start, now);
+				wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+			}
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -996,3 +1021,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done. This must be called before using the
+ * table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+logicalrep_launcher_get_last_start_time(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start time for the subscription, if one exists.
+ */
+void
+logicalrep_launcher_delete_last_start_time(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 09b3e8b32a..61be761e03 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				ereport(LOG,
 						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 								MySubscription->name)));
+
+				/*
+				 * Clear the last-start time for this worker so that the
+				 * launcher will restart it immediately.
+				 */
+				logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 				should_exit = true;
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8649e142c..1e4d94d359 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -164,6 +164,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3089,6 +3090,8 @@ maybe_reread_subscription(void)
 						"stop because the subscription was removed",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3100,6 +3103,8 @@ maybe_reread_subscription(void)
 						"stop because the subscription was disabled",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3126,6 +3131,13 @@ maybe_reread_subscription(void)
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
 
+		/*
+		 * Clear the last-start time for the apply worker so that the launcher
+		 * will restart it immediately, bypassing wal_retrieve_retry_interval.
+		 */
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 		proc_exit(0);
 	}
 
@@ -3671,6 +3683,8 @@ ApplyWorkerMain(Datum main_arg)
 				(errmsg("logical replication apply worker for subscription %u will not "
 						"start because the subscription was removed during startup",
 						MyLogicalRepWorker->subid)));
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3684,6 +3698,8 @@ ApplyWorkerMain(Datum main_arg)
 						"start because the subscription was disabled during startup",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3884,6 +3900,8 @@ DisableSubscriptionAndExit(void)
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
 
+	if (!am_tablesync_worker())
+		logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 	proc_exit(0);
 }
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3..d2ec396045 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index e1661b6c91..00f6f89d72 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -26,4 +26,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern void logicalrep_launcher_delete_last_start_time(Oid subid);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613..d2c7afb8f4 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;