On Thu, Sep 25, 2025 at 10:43 PM shveta malik <[email protected]> wrote:
>
> On Fri, Sep 26, 2025 at 12:46 AM Masahiko Sawada <[email protected]> wrote:
> >
> > On Thu, Sep 25, 2025 at 4:57 AM shveta malik <[email protected]> wrote:
> > >
> > > On Tue, Sep 23, 2025 at 3:28 AM Masahiko Sawada <[email protected]> wrote:
> > > >
> > > >
> > > > I've attached the updated patch. It incorporates all comments I got so
> > > > far and implements to lazily disable logical decoding. It's used only
> > > > when the process tries to disable logical decoding during process
> > > > exit.
> > > >
> > >
> > > I am resuming the review now. I agree with the discussion of lazily
> > > disabling logical decoding on ERROR or process-exit for temp-slot.
> > >
> > > Few initial comments:
> >
> > Thank you for the comments!
> >
> > >
> > > 1)
> > > I see that on standby too, during proc-exit, we set 'pending_disable'.
> > > But it never resets it, as DisableLogicalDecodingIfNecessary is no-op
> > > on standby. And thus the checkpoint keeps on attempting to reset it
> > > every time. Do we even need to set it on standby?
> > >
> > > Logfile has repeated: 'start completing pending logical decoding
> > > disable request'
> >
> > Ugh, I missed that part. I think that standbys should not delegate the
> > deactivation to the checkpointer unless the deactivation is actually
> > required.
> >
> > > 2)
> > > + ereport(LOG,
> > > + (errmsg("skip disabling logical decoding as during process exit")));
> > >
> > > 'as' not needed.
> >
> > I've fixed the above two points and attached the new version patch.
> >
>
> Thanks.
>
> 1)
> Currently, in the existing implementation, if a promotion is in
> progress (delay_status_change = true) and, during that time, a process
> exits (causing a temporary slot to be released), then on the standby,
> we may end up setting pending_disable. As a result, the checkpointer
> will have to wait for the transition to complete before it can proceed
> with disabling logical decoding (if needed).
>
> a)
> This means the checkpoint may be delayed further, depending on how
> long it takes for all processes to respond to ProcSignalBarrier().
>
> b)
> Additionally, consider the case where the promotion fails midway
> (after UpdateLogicalDecodingStatusEndOfRecovery). If the checkpointer
> still sees RecoveryInProgress and delay_status_change as true, could
> it end up waiting indefinitely for the transition to complete? In my
> testing, when promotion fails and the startup process exits, it
> usually causes the rest of the processes, including the checkpointer,
> to terminate as well. So, it seems that a dangling pending_disable
> state may not actually occur on standby in practice.
>
> I believe scenario (b) can't really happen, but I still wanted to
> check with you.

I think so. We don't allow the system to continue starting up if the
startup process exits with an error.

> I am not sure if (a) is a real concern; what’s your take on it?

Since the startup process sets delay_status_change=true after creating
a checkpoint by PerformRecoveryXLogAction(), I thought that it would
not be a problem in practice even if the checkpointer ends up waiting
for the recovery to complete. On the other hand, once a process has
delegated the deactivation to the checkpointer, it would also be okay
for the checkpointer not to disable logical decoding on its first
attempt. One required change would be that if the checkpointer also
skips the deactivation when delay_status_change=true, the startup
process would need to wake up the checkpointer after completing the
recovery. Otherwise, the checkpointer might not disable logical
decoding until the next checkpoint time. I wanted to avoid adding this
part but I'm open to other opinions.

> 2)
> As per discussion in [1], there was a proposal to implement lazily
> disabling decoding both in ERROR and proc-exit scenarios. But I see it
> only implemented in proc-exit scenario. Are we planning to do it for
> ERROR as well?

After more thought, I realized that I had missed the fact that we
actually write an ABORT record during process shutdown.
ShutdownPostgres(), which calls AbortOutOfAnyTransaction(), is the last
callback in the before_shmem_exit callbacks. So it's probably okay to
write a STATUS_CHANGE record to disable logical decoding even during
process shutdown.

As for the race condition at the end of recovery between the startup
process and processes updating the logical decoding status, we use the
delay_status_change flag so that any logical decoding status change
initiated in the particular window (i.e., between the startup process
setting delay_status_change and the recovery completing) has to wait
for the startup process to complete all end-of-recovery actions.

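To make that window concrete, here is a minimal standalone sketch of
the decision a process faces under the delay_status_change scheme. It
is not code from the patch: the Toy* names and the function are
hypothetical, and the two boolean arguments merely stand in for
RecoveryInProgress() and the shared delay_status_change flag.

```c
#include <stdbool.h>

/* Hypothetical outcome of a status change attempt (illustration only). */
typedef enum
{
    TOY_SKIP,       /* plain recovery: status is inherited, nothing to do */
    TOY_WAIT,       /* end-of-recovery window: wait for recovery to finish */
    TOY_PROCEED     /* normal operation: change status and write WAL */
} ToyAction;

/*
 * Assumed model of the delay_status_change behavior described above:
 *  - recovery_in_progress stands in for RecoveryInProgress()
 *  - delay_status_change stands in for the shared-memory flag that the
 *    startup process sets after its end-of-recovery status update
 */
ToyAction
toy_status_change_action(bool recovery_in_progress, bool delay_status_change)
{
    /* Once recovery has completed, WAL inserts are allowed as usual. */
    if (!recovery_in_progress)
        return TOY_PROCEED;

    /*
     * Still in recovery. If the startup process has already done its
     * end-of-recovery status update, the caller has to wait; otherwise
     * the request is a no-op.
     */
    return delay_status_change ? TOY_WAIT : TOY_SKIP;
}
```

In this model, a slot drop that lands in the window maps to TOY_WAIT,
which is the waiting that the checkpointer (or a backend) would have to
do.
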
An alternative idea would be to allow processes to write STATUS_CHANGE
records in that window even during recovery, by using
LocalSetXLogInsertAllowed(). If we implement these ideas, we can
simplify the patch considerably, as we no longer need the lazy behavior
or the wait for the recovery to complete.

I've attached a PoC patch that can be applied on top of the v15 patch.
Feedback is very welcome.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

From 767e11c048606508fd0ef251f4f2f3783c0cd677 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Mon, 29 Sep 2025 17:40:18 -0700
Subject: [PATCH v1501 2/6] POC change.

---
 src/backend/access/transam/xlog.c             |  28 ++
 src/backend/postmaster/checkpointer.c         |   6 -
 src/backend/replication/logical/logicalctl.c  | 263 +++---------------
 src/backend/replication/slot.c                |   8 +-
 src/include/access/xlog.h                     |   2 +
 src/include/replication/logicalctl.h          |   2 +-
 .../recovery/t/049_effective_wal_level.pl     |  25 +-
 7 files changed, 76 insertions(+), 258 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ec43557152a..1729302a8ed 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6224,6 +6224,10 @@ StartupXLOG(void)
     /*
      * Update logical decoding status in shared memory and write an
      * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary.
+     *
+     * Note that processes are allowed to change the logical decoding status
+     * and write the XLOG_LOGICAL_DECODING_STATUS_CHANGE records from this
+     * point.
      */
     UpdateLogicalDecodingStatusEndOfRecovery();

@@ -7509,6 +7513,30 @@ CreateEndOfRecoveryRecord(void)
     END_CRIT_SECTION();
 }

+/*
+ * Writes a XLOG_LOGICAL_DECODING_STATUS_CHANGE record with the given
+ * status.
+ *
+ * Note that this function temporary acquires an exception of writing
+ * the WAL record even during recovery to deal with the race condition
+ * (see comments in start_logical_decoding_status_change() for details).
+ */
+void
+CreateLogicalDecodingStatusChangeRecord(bool status)
+{
+    int         oldXLogAllowed;
+    XLogRecPtr  recptr;
+
+    oldXLogAllowed = LocalSetXLogInsertAllowed();
+
+    XLogBeginInsert();
+    XLogRegisterData(&status, sizeof(bool));
+    recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
+    XLogFlush(recptr);
+
+    LocalXLogInsertAllowed = oldXLogAllowed;
+}
+
 /*
  * Write an OVERWRITE_CONTRECORD message.
  *
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 4edce058f9d..e188a02e2db 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -378,12 +378,6 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len)
          */
         AbsorbSyncRequests();

-        /*
-         * Disable logical decoding if someone requested to disable logical
-         * decoding. See comments atop logicalctl.c.
-         */
-        DisableLogicalDecodingIfNecessary(true);
-
         ProcessCheckpointerInterrupts();
         if (ShutdownXLOGPending || ShutdownRequestPending)
             break;
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 9113b1cf13f..ebd34234231 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -21,15 +21,6 @@
  * decoding. Deactivation follows a similar careful, multi-step process
  * in reverse order.
  *
- * Activation and deactivation typically occur synchronously after the first
- * logical slot is created and after the last logical slot is dropped,
- * respectively. However, if deactivation is required when a process is exiting,
- * it is delegated to the checkpointer and performed lazily. This is because
- * deactivation involves waits and writing a WAL record, which could be
- * problematic during process exit when the process is holding interrupts.
- * This situation might arise when a process cleans up temporary or ephemeral
- * slots on error or at process exit without releasing them explicitly.
- *
  * Standby servers inherit the logical decoding and logical WAL writing status
  * from the primary server. Unlike normal activation and deactivation, these
  * are updated simultaneously without status change coordination, solely by
@@ -91,22 +82,11 @@ typedef struct LogicalDecodingCtlData
     bool        status_change_inprogress;

     /*
-     * Logical decoding is normally disabled after dropping the last logical
-     * slot, but if it happens during process exit time for example when
-     * releasing a temporary logical slot on an error, the process sets this
-     * flag to true, delegating the checkpointer to disable logical decoding
-     * asynchronously.
-     */
-    bool        pending_disable;
-
-    /*
-     * This flag is set to true by the startup process during recovery, to
-     * delay any logical decoding status change attempts until the recovery
-     * actually completes. The flag is set to true only during the recovery by
-     * the startup process. See comments in
-     * start_logical_decoding_status_change() for details.
+     * True if the status change is allowed even during recovery. This flag
+     * starts with false and is set to true at end of the recovery by the
+     * startup process.
      */
-    bool        delay_status_change;
+    bool        allow_status_change;

     /* Condition variable signaled when a status change completes */
     ConditionVariable cv;
@@ -124,8 +104,6 @@ bool XLogLogicalInfo = false;
 static void update_xlog_logical_info(void);
 static void abort_logical_decoding_activation(int code, Datum arg);
 static bool start_logical_decoding_status_change(bool new_status);
-static bool check_wait_for_recovery_completion(void);
-static void wait_for_recovery_completion(void);

 Size
 LogicalDecodingCtlShmemSize(void)
@@ -147,8 +125,7 @@ LogicalDecodingCtlShmemInit(void)
         LogicalDecodingCtl->xlog_logical_info = false;
         LogicalDecodingCtl->logical_decoding_enabled = false;
         LogicalDecodingCtl->status_change_inprogress = false;
-        LogicalDecodingCtl->pending_disable = false;
-        LogicalDecodingCtl->delay_status_change = false;
+        LogicalDecodingCtl->allow_status_change = false;
         ConditionVariableInit(&LogicalDecodingCtl->cv);
     }
 }
@@ -300,13 +277,31 @@ abort_logical_decoding_activation(int code, Datum arg)
  * the process of logical decoding status change. If the status change is
  * required, it ensures we can change logical decoding status, setting
  * LogicalDecodingCtl->status_change_inprogress on, and returns true.
- * Otherwise, if it's not required or not allowed (e.g., during recovery
- * or wal_level = 'logical'), it returns false.
+ * Otherwise, if it's not required returns false.
  */
 static bool
 start_logical_decoding_status_change(bool new_status)
 {
-    Assert(!RecoveryInProgress());
+    if (RecoveryInProgress())
+    {
+        bool        allow_status_change;
+
+        /*
+         * During recovery, there is a window between the startup updates the
+         * logical decoding status at the end of recovery and it actually
+         * completes the recovery (i.e., until RecoveryInProgress() returns
+         * false), where no one is basically allowed to insert WAL records but
+         * logical decoding change could happen. Therefore, we need to check
+         * if we're in this window, and if so we can proceed with the status
+         * change.
+         */
+        LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
+        allow_status_change = LogicalDecodingCtl->allow_status_change;
+        LWLockRelease(LogicalDecodingControlLock);
+
+        if (!allow_status_change)
+            return false;
+    }

 retry:
     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
@@ -317,7 +312,6 @@ retry:
      */
     if (!new_status && CheckLogicalSlotExists())
     {
-        LogicalDecodingCtl->pending_disable = false;
         LWLockRelease(LogicalDecodingControlLock);
         return false;
     }
@@ -336,11 +330,11 @@ retry:
         goto retry;
     }

-    /* Return if we don't need to change the status */
     if (LogicalDecodingCtl->logical_decoding_enabled == new_status)
     {
-        LogicalDecodingCtl->pending_disable = false;
         LWLockRelease(LogicalDecodingControlLock);
+
+        /* no need to change the status */
         return false;
     }

@@ -352,56 +346,6 @@ retry:
     return true;
 }

-/*
- * Returns true if the caller needs to wait for the recovery to complete
- * before proceeding with the status change process.
- */
-static bool
-check_wait_for_recovery_completion(void)
-{
-    bool        delay_status_change;
-
-    /*
-     * During recovery, there is a race condition with the startup process's
-     * end-of-recovery action; after the startup process updates logical
-     * decoding status at the end of recovery, it's possible that other
-     * processes try to enable or disable logical decoding status before the
-     * recovery completes but are unable to write WAL records. Therefore, if
-     * the startup process has done its end-of-recovery work, we need to wait
-     * for the recovery to actually finish.
-     */
-    LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
-    delay_status_change = LogicalDecodingCtl->delay_status_change;
-    LWLockRelease(LogicalDecodingControlLock);
-
-    return delay_status_change;
-}
-
-/*
- * Wait for the recovery to complete, i.e., until RecoveryInProgress()
- * return false.
- */
-static void
-wait_for_recovery_completion(void)
-{
-    elog(DEBUG1,
-         "waiting for recovery completion to change logical decoding status");
-
-    /*
-     * The startup process already updated logical decoding status at the end
-     * of recovery but it might not be allowed to write WAL records yet. Wait
-     * for the recovery to complete and check the status again.
-     */
-    while (RecoveryInProgress())
-    {
-        CHECK_FOR_INTERRUPTS();
-
-        pgstat_report_wait_start(WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE_DELAY);
-        pg_usleep(100000L);     /* wait for 100 msec */
-        pgstat_report_wait_end();
-    }
-}
-
 /*
  * Enable logical decoding if disabled.
  *
@@ -418,19 +362,6 @@ EnsureLogicalDecodingEnabled(void)
     if (wal_level != WAL_LEVEL_REPLICA)
         return;

-    if (RecoveryInProgress())
-    {
-        /*
-         * Check if we need to wait for the recovery completion. See the
-         * comments in check_wait_for_recovery_completion() for the reason why
-         * we check it here.
-         */
-        if (!check_wait_for_recovery_completion())
-            return;
-
-        wait_for_recovery_completion();
-    }
-
     /* Prepare and start the activation process if it's disabled */
     if (!start_logical_decoding_status_change(true))
         return;
@@ -480,18 +411,9 @@ EnsureLogicalDecodingEnabled(void)
     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     LogicalDecodingCtl->logical_decoding_enabled = true;
     LogicalDecodingCtl->status_change_inprogress = false;
-    LogicalDecodingCtl->pending_disable = false;
     LWLockRelease(LogicalDecodingControlLock);

-    {
-        XLogRecPtr  recptr;
-        bool        logical_decoding = true;
-
-        XLogBeginInsert();
-        XLogRegisterData(&logical_decoding, sizeof(bool));
-        recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
-        XLogFlush(recptr);
-    }
+    CreateLogicalDecodingStatusChangeRecord(true);

     END_CRIT_SECTION();

@@ -506,100 +428,16 @@ EnsureLogicalDecodingEnabled(void)
  * Disable logical decoding if necessary.
  *
  * This function expects to be called after dropping a possibly-last logical
- * replication slot, or to complete the pending logical decoding deactivation.
- * It disable logical decoding only if it was the last remaining logical slot
- * and wal_level = 'replica'. Otherwise, it performs no action.
-
- * If complete_pending is true, it disables logical decoding only if there
- * is a pending disable request.
+ * replication slot. It disable logical decoding only if it was the last
+ * remaining logical slot and wal_level = 'replica'. Otherwise, it performs
+ * no action.
  */
 void
-DisableLogicalDecodingIfNecessary(bool complete_pending)
+DisableLogicalDecodingIfNecessary(void)
 {
-    bool        need_wait = false;
-
-    /*
-     * Both complete_pending and proc_exit_inprogress must not be true at the
-     * same time.
-     */
-    Assert(!(complete_pending && proc_exit_inprogress));
-
     if (wal_level != WAL_LEVEL_REPLICA)
         return;

-    /* Check if there is a pending disable request */
-    if (complete_pending)
-    {
-        bool        pending_disable;
-
-        LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
-        pending_disable = LogicalDecodingCtl->pending_disable;
-        LWLockRelease(LogicalDecodingControlLock);
-
-        if (!pending_disable)
-            return;
-
-        elog(DEBUG1,
-             "start completing pending logical decoding disable request");
-    }
-
-    if (RecoveryInProgress())
-    {
-        /*
-         * Check if we need to wait for the recovery completion. See the
-         * comments in check_wait_for_recovery_completion() for the reason why
-         * we check it here.
-         */
-        need_wait = check_wait_for_recovery_completion();
-
-        if (!need_wait)
-            return;
-
-        /*
-         * We might be in the process of process exit, in which case we need
-         * to delegate to the checkpointer the deactivation. We will wait for
-         * the recovery completion after checking that.
-         */
-    }
-
-    /*
-     * If this function is called at process exit time for example when
-     * cleaning up temporary logical slots, we skip to disabling logical
-     * decoding and delegate to the checkpointer to do that since it could
-     * involve waits and writing a WAL record even though the process is
-     * holding any interrupts.
-     */
-    if (proc_exit_inprogress)
-    {
-        volatile PROC_HDR *procglobal = ProcGlobal;
-        ProcNumber  checkpointerProc = procglobal->checkpointerProc;
-
-        /*
-         * It's possible that we might not actually need to disable logical
-         * decoding if someone creates a new logical slot concurrently. We set
-         * the flag anyway and the checkpointer will check it and disable
-         * logical decoding if necessary.
-         */
-        LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
-        LogicalDecodingCtl->pending_disable = true;
-        LWLockRelease(LogicalDecodingControlLock);
-
-        if (checkpointerProc != INVALID_PROC_NUMBER)
-            SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch);
-
-        ereport(LOG,
-                (errmsg("skip disabling logical decoding during process exit")));
-
-        return;
-    }
-
-    /*
-     * Wait for the recovery to complete before entering the deactivation
-     * process
-     */
-    if (need_wait)
-        wait_for_recovery_completion();
-
     /* Prepare and start the deactivation process if it's enabled */
     if (!start_logical_decoding_status_change(false))
         return;
@@ -623,16 +461,7 @@ DisableLogicalDecodingIfNecessary(bool complete_pending)
     LWLockRelease(LogicalDecodingControlLock);

     /* Write the WAL to disable logical decoding on standbys too */
-    if (XLogStandbyInfoActive())
-    {
-        bool        logical_decoding = false;
-        XLogRecPtr  recptr;
-
-        XLogBeginInsert();
-        XLogRegisterData(&logical_decoding, sizeof(bool));
-        recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
-        XLogFlush(recptr);
-    }
+    CreateLogicalDecodingStatusChangeRecord(false);

     /* Now disable logical information WAL logging */
     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
@@ -652,7 +481,6 @@ DisableLogicalDecodingIfNecessary(bool complete_pending)
     /* Complete the transition */
     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     LogicalDecodingCtl->status_change_inprogress = false;
-    LogicalDecodingCtl->pending_disable = false;
     LWLockRelease(LogicalDecodingControlLock);

     END_CRIT_SECTION();
@@ -700,29 +528,16 @@ UpdateLogicalDecodingStatusEndOfRecovery(void)
     UpdateLogicalDecodingStatus(new_status, false);

     /*
-     * We disallow any logical decoding status change until we actually
-     * completes the recovery, i.e., RecoveryInProgress() returns false. This
-     * is necessary to deal with the race condition that could happen after
-     * this point; processes are able to create or drop logical replication
-     * slots and tries to enable or disable logical decoding accordingly, but
-     * they are not allowed to write any WAL records until the recovery
-     * completes.
+     * Now that we completes the status change on the shmem, we allow
+     * processes to write XLOG_LOGICAL_DECODING_STATUS_CHANGE records prior to
+     * completing all end-of-recovery actions.
      */
-    LogicalDecodingCtl->delay_status_change = true;
+    LogicalDecodingCtl->allow_status_change = true;

     LWLockRelease(LogicalDecodingControlLock);

     if (need_wal)
-    {
-        XLogRecPtr  recptr;
-
-        Assert(XLogStandbyInfoActive());
-
-        XLogBeginInsert();
-        XLogRegisterData(&new_status, sizeof(bool));
-        recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
-        XLogFlush(recptr);
-    }
+        CreateLogicalDecodingStatusChangeRecord(new_status);

     /*
      * Ensure all running processes have the updated status. We don't need to
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 66de1a30cec..7978e5894c3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -753,7 +753,7 @@ ReplicationSlotRelease(void)
             ReplicationSlotDropAcquired();

         if (is_logical)
-            DisableLogicalDecodingIfNecessary(false);
+            DisableLogicalDecodingIfNecessary();
     }

     /*
@@ -864,7 +864,7 @@ restart:
     LWLockRelease(ReplicationSlotControlLock);

     if (dropped_logical && nlogicalslots == 0)
-        DisableLogicalDecodingIfNecessary(false);
+        DisableLogicalDecodingIfNecessary();
 }

 /*
@@ -894,7 +894,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
     ReplicationSlotDropAcquired();

     if (is_logical)
-        DisableLogicalDecodingIfNecessary(false);
+        DisableLogicalDecodingIfNecessary();
 }

 /*
@@ -1519,7 +1519,7 @@ restart:
     LWLockRelease(ReplicationSlotControlLock);

     if (dropped && nlogicalslots == 0)
-        DisableLogicalDecodingIfNecessary(false);
+        DisableLogicalDecodingIfNecessary();
 }

 /*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e90438a4111..02aefeddc41 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -268,6 +268,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void);

 extern void SetWalWriterSleeping(bool sleeping);

+extern void CreateLogicalDecodingStatusChangeRecord(bool status);
+
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
                                TimeLineID tli);

diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h
index 907bd562c7d..85ec7217805 100644
--- a/src/include/replication/logicalctl.h
+++ b/src/include/replication/logicalctl.h
@@ -22,7 +22,7 @@ extern bool ProcessBarrierUpdateXLogLogicalInfo(void);
 extern bool IsLogicalDecodingEnabled(void);
 extern bool IsXLogLogicalInfoEnabled(void);
 extern void EnsureLogicalDecodingEnabled(void);
-extern void DisableLogicalDecodingIfNecessary(bool complete_pending);
+extern void DisableLogicalDecodingIfNecessary(void);
 extern void UpdateLogicalDecodingStatus(bool new_status, bool need_lock);
 extern void UpdateLogicalDecodingStatusEndOfRecovery(void);

diff --git a/src/test/recovery/t/049_effective_wal_level.pl b/src/test/recovery/t/049_effective_wal_level.pl
index 64c748b7c18..83051f9cbf7 100644
--- a/src/test/recovery/t/049_effective_wal_level.pl
+++ b/src/test/recovery/t/049_effective_wal_level.pl
@@ -43,20 +43,10 @@ $primary->safe_psql('postgres',
     qq[select pg_drop_replication_slot('test_phy_slot')]);

 # Create a temporarly logical slot and exits without releasing it explicitely.
-# This enables logical decoding but skips disabling it and delegetes to the
-# checkpointer.
 $primary->safe_psql('postgres',
     qq[select pg_create_logical_replication_slot('test_tmp_slot', 'test_decoding', true)]
 );

-# Check if the process skipped to disable logical decoding.
-$primary->wait_for_log(
-    "skip disabling logical decoding during process exit");
-
-# Wait for the checkpointer to disable logical decoding.
-$primary->poll_query_until('postgres',
-    qq[select current_setting('effective_wal_level') = 'replica';]);
-
 # Create a new logical slot and check if effective_wal_level must be increased
 # to 'logical'.
 $primary->safe_psql('postgres',
@@ -327,20 +317,12 @@ if ( $ENV{enable_injection_points} eq 'yes'
         "injection_point 'startup-logical-decoding-status-change-end-of-recovery' is reached"
     );

-    # Drop the logical slot in background. We can drop the logical replication slot
-    # but have to wait for the recovery to complete before disabling logical decoding.
-    my $psql = $standby5->background_psql('postgres');
-    $psql->query_until(
-        qr/drop_slot/,
-        q(\echo drop_slot
-select pg_drop_replication_slot('standby5_slot');
-\q
-));
+    $standby5->safe_psql('postgres',
+        qq[select pg_drop_replication_slot('standby5_slot');]);

     $standby5->safe_psql('postgres',
         qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')]
     );
-    $psql->quit;

     # Check if logical decoding got disabled after the recovery.
     test_wal_level($standby5, "replica|replica",
@@ -391,9 +373,6 @@ select pg_create_logical_replication_slot('test_slot', 'pgoutput');
 select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_cancelled' and pid <> pg_backend_pid()
 ]);

-    # Check if the backend aborted the activation process.
-    $primary->wait_for_log("aborting logical decoding activation process");
-
     # Wait for the logical slot 'test_slot' to be created.
     $primary->poll_query_until('postgres',
         qq[select exists (select 1 from pg_replication_slots where slot_name = 'test_slot')]
--
2.47.3
