On Wed, Jan 04, 2023 at 08:12:37PM -0800, Nathan Bossart wrote:
> On Thu, Jan 05, 2023 at 09:09:12AM +0530, Amit Kapila wrote:
>> But there doesn't appear to be any guarantee that the result for
>> AllTablesyncsReady() will change between the time it is invoked
>> earlier in the function and at the place you have it in the patch.
>> This is because the value of 'table_states_valid' may not have
>> changed.  So, how is this supposed to work?
>
> The call to CommandCounterIncrement() should set table_states_valid to
> false if needed.
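To spell that out a bit: table_states_valid is only ever cleared by the
syscache invalidation callback in tablesync.c, and the SYNCDONE -> READY
promotions earlier in process_syncing_tables_for_apply() are made by the
apply worker itself in the same transaction.  CommandCounterIncrement()
processes the pending local invalidations (via
CommandEndInvalidationMessages()), so the flag is up to date by the time
AllTablesyncsReady() runs again.  Roughly, the existing callback looks like
this (condensed from tablesync.c; not part of the patch):

	/*
	 * Syscache callback for pg_subscription_rel changes.  Once it fires,
	 * the next AllTablesyncsReady() call rebuilds the cached table states.
	 */
	void
	invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
	{
		table_states_valid = false;
	}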
In v12, I moved the restart for two_phase mode to the end of
process_syncing_tables_for_apply() so that we don't need to rely on another
iteration of the loop.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
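For ease of review, the reworked tail of process_syncing_tables_for_apply()
in 0001 boils down to this (condensed from the patch below; the ereport and
most comments are elided):

	if (started_tx)
	{
		/*
		 * CommandCounterIncrement() makes this transaction's own
		 * SYNCDONE -> READY promotions visible (letting the syscache
		 * callback clear table_states_valid) before re-checking.
		 */
		CommandCounterIncrement();
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
			AllTablesyncsReady())
			should_exit = true;

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	/* Exit only after the surrounding transaction has committed. */
	if (should_exit)
		proc_exit(0);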
>From 57bfee9615e25d62dc975325695fdf445c002a65 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v12 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c           |  3 ++
 src/backend/commands/alter.c                |  7 +++
 src/backend/commands/subscriptioncmds.c     |  4 ++
 src/backend/replication/logical/tablesync.c | 49 ++++++++++++---------
 src/backend/replication/logical/worker.c    | 46 +++++++++++++++++++
 src/include/replication/logicalworker.h     |  3 ++
 6 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 	AtEOXact_HashTables(false);
 	AtEOXact_PgStat(false, is_parallel_worker);
 	AtEOXact_ApplyLauncher(false);
+	AtEOXact_LogicalRepWorkers(false);
 	pgstat_report_xact_timestamp(0);
 }
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..b9bbb2cf4e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..85e2a2861f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-	 * work.
-	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
-	{
-		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-						MySubscription->name)));
-
-		proc_exit(0);
-	}
-
 	/*
 	 * Process all tables that are being synchronized.
 	 */
@@ -619,9 +598,35 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * Even when the two_phase mode is requested by the user, it remains as
+		 * 'pending' until all tablesyncs have reached READY state.
+		 *
+		 * When this happens, we restart the apply worker and (if the conditions
+		 * are still ok) then the two_phase tri-state will become 'enabled' at
+		 * that time.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+		{
+			ereport(LOG,
+					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+							MySubscription->name)));
+
+			should_exit = true;
+		}
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
+
+	if (should_exit)
+		proc_exit(0);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 446f84fa97..3e2ea32e1e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wake up the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a348..3b2084f81f 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1
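To make the intended flow of 0001 concrete, here is a sketch of a
hypothetical DDL caller (illustration only; the real call sites are the ones
patched in alter.c and subscriptioncmds.c above):

	/* Hypothetical caller, not part of the patch. */
	void
	some_subscription_ddl(Oid subid)
	{
		/* ... update the subscription's catalog entries ... */

		/*
		 * Record the subscription OID in TopTransactionContext.  At commit,
		 * CommitTransaction() calls AtEOXact_LogicalRepWorkers(true), which
		 * finds the subscription's workers under LogicalRepWorkerLock and
		 * sets their latches; on abort the list is simply discarded.
		 */
		LogicalRepWorkersWakeupAtCommit(subid);
	}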
>From f2df8156500f55a4db433dc385da5b4020fe8fa9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Wed, 4 Jan 2023 11:38:20 -0800
Subject: [PATCH v12 2/2] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

---
 src/backend/commands/subscriptioncmds.c     |   9 +-
 src/backend/replication/logical/launcher.c  | 155 ++++++++++++++++------
 src/backend/replication/logical/tablesync.c |   1 +
 src/backend/replication/logical/worker.c    |   2 +-
 src/include/replication/worker_internal.h   |   3 +
 5 files changed, 128 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9bbb2cf4e..3293ec2043 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1137,8 +1137,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+				/*
+				 * Even if the subscription is disabled, we wake up the
+				 * launcher so that it clears its last start time. This
+				 * ensures the workers will be able to start up right away when
+				 * the subscription is enabled again.
+				 */
+				ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
 				break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..544e73dd62 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -297,7 +297,7 @@ retry:
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (!w->in_use)
+		if (!w->in_use && !w->restart_immediately)
 		{
 			worker = w;
 			slot = i;
@@ -620,8 +620,14 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
 	worker->userid = InvalidOid;
-	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+
+	/*
+	 * If restart_immediately was set, retain the subscription ID so that the
+	 * launcher knows which worker it should restart right away.
+	 */
+	if (!worker->restart_immediately)
+		worker->subid = InvalidOid;
 }
 
 /*
@@ -801,7 +807,13 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
+	struct launcher_start_time_mapping
+	{
+		Oid			subid;
+		TimestampTz last_start_time;
+	};
+	HTAB	   *last_start_times = NULL;
+	HASHCTL		ctl;
 
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
@@ -822,6 +834,18 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	/*
+	 * Prepare a hash table for tracking last start times of workers, to avoid
+	 * immediate restarts. Ideally, this hash table would be created in shared
+	 * memory so that other backends could adjust it directly to avoid race
+	 * conditions, but it would need to be a fixed size, and the number of
+	 * subscriptions is virtually unbounded.
+	 */
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(struct launcher_start_time_mapping);
+	last_start_times = hash_create("Logical replication apply worker start times",
+								   256, &ctl, HASH_ELEM | HASH_BLOBS);
+
 	/* Enter main loop */
 	for (;;)
 	{
@@ -832,63 +856,116 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+		HASH_SEQ_STATUS status;
+		struct launcher_start_time_mapping *hentry;
 
 		CHECK_FOR_INTERRUPTS();
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
+
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
+			Subscription *sub = (Subscription *) lfirst(lc);
+			bool		bypass_retry_interval = false;
+			LogicalRepWorker *w;
 
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
+			/*
+			 * Bypass wal_retrieve_retry_interval if the worker set
+			 * restart_immediately. We do this before checking if the
+			 * subscription is enabled so that the slot can be freed
+			 * regardless.
+			 */
+			for (int i = 0; i < max_logical_replication_workers; i++)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+				w = &LogicalRepCtx->workers[i];
 
-				if (!sub->enabled)
-					continue;
+				if (!w->in_use && !w->proc && w->subid == sub->oid &&
+					w->relid == InvalidOid && w->restart_immediately)
+				{
+					w->restart_immediately = false;
+					logicalrep_worker_cleanup(w);
+					bypass_retry_interval = true;
+					break;
+				}
+			}
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			/*
+			 * If the subscription isn't enabled, clear its entry in
+			 * last_start_times so that its apply worker is restarted right
+			 * away when it is enabled again. There is a chance that the
+			 * subscription could be enabled again before we've had a chance to
+			 * clear its entry, in which case we'll wait a little bit before
+			 * starting the worker.
+			 */
+			if (!sub->enabled)
+			{
+				hash_search(last_start_times, &sub->oid, HASH_REMOVE, NULL);
+				continue;
+			}
+
+			/*
+			 * If it's okay to start the worker now, do so. Otherwise, adjust
+			 * wait_time so that we wake up when we can start it.
+			 */
+			if (w == NULL)
+			{
+				bool		found;
+				hentry = hash_search(last_start_times, &sub->oid,
+									 HASH_ENTER, &found);
+
+				if (bypass_retry_interval || !found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+											   wal_retrieve_retry_interval))
+				{
+					hentry->last_start_time = now;
 					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 											 sub->owner, InvalidOid);
 				}
-			}
+				else
+				{
+					long		elapsed;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+					elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now);
+					wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+				}
+			}
 		}
-		else
+
+		/*
+		 * Do garbage collection on the last_start_times hash table. In
+		 * theory, a subscription OID could be reused before its entry is
+		 * removed, but the risk of that seems low, and at worst the launcher
+		 * will wait a bit longer before starting the new subscription's apply
+		 * worker. This risk could be reduced by removing entries for
+		 * subscriptions that aren't in sublist, but it doesn't seem worth the
+		 * trouble.
+		 */
+		hash_seq_init(&status, last_start_times);
+		while ((hentry = (struct launcher_start_time_mapping *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
-			 */
-			wait_time = wal_retrieve_retry_interval;
+			if (TimestampDifferenceExceeds(hentry->last_start_time, now,
+										   wal_retrieve_retry_interval))
+				hash_search(last_start_times, &hentry->subid, HASH_REMOVE, NULL);
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 85e2a2861f..d6213c55fc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 							MySubscription->name)));
 
+			MyLogicalRepWorker->restart_immediately = true;
 			should_exit = true;
 		}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3e2ea32e1e..0521459898 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3130,7 +3130,7 @@ maybe_reread_subscription(void)
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
-
+		MyLogicalRepWorker->restart_immediately = true;
 		proc_exit(0);
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2a3ec5c2d8..68a56c175b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,9 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/* Should the launcher restart the worker immediately? */
+	bool		restart_immediately;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
-- 
2.25.1
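To summarize 0002: the launcher now tracks a last start time per
subscription, so one subscription's recent start no longer delays the
others, and restart_immediately lets a worker that exits on purpose (for
two_phase enablement or a parameter change) come back without waiting.  The
per-subscription decision reduces to roughly this (condensed from the patch;
locking and the hash table garbage collection elided):

	if (w == NULL)						/* no apply worker running */
	{
		if (bypass_retry_interval ||	/* worker set restart_immediately */
			!found ||					/* we have never started it */
			TimestampDifferenceExceeds(hentry->last_start_time, now,
									   wal_retrieve_retry_interval))
		{
			hentry->last_start_time = now;
			logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
									 sub->owner, InvalidOid);
		}
		else
		{
			/* Sleep only until this subscription's interval expires. */
			long		elapsed;

			elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now);
			wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
		}
	}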