rebased for cfbot
-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index cf220c3bcb..7661c0c86e 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1996,6 +1996,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser <entry>Waiting to read or update information about <quote>heavyweight</quote> locks.</entry> </row> + <row> + <entry><literal>LogicalRepLauncherDSA</literal></entry> + <entry>Waiting for logical replication launcher dynamic shared memory + allocator access</entry> + </row> + <row> + <entry><literal>LogicalRepLauncherHash</literal></entry> + <entry>Waiting for logical replication launcher shared memory hash table + access</entry> + </row> <row> <entry><literal>LogicalRepWorker</literal></entry> <entry>Waiting to read or update the state of logical replication diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index baff00dd74..468e5e0bf1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + /* + * Clear the last-start time for the apply worker to free up space. If + * this transaction rolls back, the launcher might restart the apply worker + * before wal_retrieve_retry_interval milliseconds have elapsed, but that's + * probably okay. + */ + logicalrep_launcher_delete_last_start_time(subid); + /* * Cleanup of tablesync replication origins. 
* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index afb7acddaa..466917569a 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -25,6 +25,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" +#include "lib/dshash.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -57,6 +58,25 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +/* an entry in the last-start times hash table */ +typedef struct LauncherLastStartTimesEntry +{ + Oid subid; + TimestampTz last_start_time; +} LauncherLastStartTimesEntry; + +/* parameters for the last-start times hash table */ +static const dshash_parameters dsh_params = { + sizeof(Oid), + sizeof(LauncherLastStartTimesEntry), + dshash_memcmp, + dshash_memhash, + LWTRANCHE_LAUNCHER_HASH +}; + +static dsa_area *last_start_times_dsa = NULL; +static dshash_table *last_start_times = NULL; + LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct @@ -64,6 +84,10 @@ typedef struct LogicalRepCtxStruct /* Supervisor process. */ pid_t launcher_pid; + /* hash table for last-start times */ + dsa_handle last_start_dsa; + dshash_table_handle last_start_dsh; + /* Background workers. 
*/ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -76,6 +100,9 @@ static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_launcher_attach_dshmem(void); +static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time); +static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid); static bool on_commit_launcher_wakeup = false; @@ -902,6 +929,9 @@ ApplyLauncherShmemInit(void) memset(worker, 0, sizeof(LogicalRepWorker)); SpinLockInit(&worker->relmutex); } + + LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID; + LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID; } } @@ -947,8 +977,6 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; - ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -983,58 +1011,56 @@ ApplyLauncherMain(Datum main_arg) now = GetCurrentTimestamp(); - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (TimestampDifferenceExceeds(last_start_time, now, - wal_retrieve_retry_interval)) - { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. */ - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + /* Use temporary context for the database list and worker info. 
*/ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); - if (!sub->enabled) - continue; + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); + if (!sub->enabled) + continue; - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, DSM_HANDLE_INVALID); - } - } + if (w != NULL) + continue; - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); - } - else - { /* - * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, this - * usually means crash of the worker, so we should retry in - * wal_retrieve_retry_interval again. + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we wake up when it can be started. 
*/ - wait_time = wal_retrieve_retry_interval; + last_start = logicalrep_launcher_get_last_start_time(sub->oid); + if (TimestampDifferenceExceeds(last_start, now, + wal_retrieve_retry_interval)) + { + logicalrep_launcher_set_last_start_time(sub->oid, now); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + long elapsed; + + elapsed = TimestampDifferenceMilliseconds(last_start, now); + wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed); + } } + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + /* Wait for more work. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -1146,3 +1172,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) return (Datum) 0; } + +/* + * Initialize or attach to the dynamic shared hash table that stores the + * last-start times, if not already done. This must be called before using the + * table. + */ +static void +logicalrep_launcher_attach_dshmem(void) +{ + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for last-start times. */ + last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA); + dsa_pin(last_start_times_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0); + + /* Store handles in shared memory for other backends to use. */ + LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa); + LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times); + } + else if (!last_start_times) + { + /* Attach to existing dynamic shared hash table. 
*/ + last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, + LogicalRepCtx->last_start_dsh, 0); + } + + LWLockRelease(LogicalRepWorkerLock); + MemoryContextSwitchTo(oldcontext); +} + +/* + * Set the last-start time for the subscription. + */ +static void +logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time) +{ + LauncherLastStartTimesEntry *entry; + bool found; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find_or_insert(last_start_times, &subid, &found); + entry->last_start_time = start_time; + dshash_release_lock(last_start_times, entry); +} + +/* + * Return the last-start time for the subscription, or 0 if there isn't one. + */ +static TimestampTz +logicalrep_launcher_get_last_start_time(Oid subid) +{ + LauncherLastStartTimesEntry *entry; + TimestampTz ret; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find(last_start_times, &subid, false); + if (entry == NULL) + return 0; + + ret = entry->last_start_time; + dshash_release_lock(last_start_times, entry); + + return ret; +} + +/* + * Remove the last-start time for the subscription, if one exists. + */ +void +logicalrep_launcher_delete_last_start_time(Oid subid) +{ + logicalrep_launcher_attach_dshmem(); + + (void) dshash_delete_key(last_start_times, &subid); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 38dfce7129..aa5ee089f7 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } if (should_exit) + { + /* + * Clear the last-start time for this worker so that the launcher will + * restart it immediately. 
+ */ + logicalrep_launcher_delete_last_start_time(MySubscription->oid); + proc_exit(0); + } } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 79cda39445..0c41d5fdd5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -174,6 +174,7 @@ #include "postmaster/walwriter.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" #include "replication/logicalworker.h" @@ -3809,6 +3810,13 @@ apply_worker_exit(void) return; } + /* + * Clear the last-start time for the apply worker so that the launcher + * will restart it immediately, bypassing wal_retrieve_retry_interval. + */ + if (!am_tablesync_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); + proc_exit(0); } @@ -3849,6 +3857,8 @@ maybe_reread_subscription(void) (errmsg("%s for subscription \"%s\" will stop because the subscription was removed", get_worker_name(), MySubscription->name))); + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4429,6 +4439,8 @@ InitializeApplyWorker(void) (errmsg("%s for subscription %u will not start because the subscription was removed during startup", get_worker_name(), MyLogicalRepWorker->subid))); + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4691,6 +4703,8 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid); proc_exit(0); } diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c 
index 196bece0a3..d2ec396045 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_LAUNCHER_DSA: */ + "LogicalRepLauncherDSA", + /* LWTRANCHE_LAUNCHER_HASH: */ + "LogicalRepLauncherHash", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index c85593ad11..923f96256a 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); +extern void logicalrep_launcher_delete_last_start_time(Oid subid); + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e4162db613..d2c7afb8f4 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_LAUNCHER_DSA, + LWTRANCHE_LAUNCHER_HASH, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds;