Hi Shveta, Here is v4 of the patch, with the pg_replication_slots view modified to display the new allow_overwrite field. The documentation has also been updated.
Regards, Fabrice On Tue, Sep 30, 2025 at 11:45 AM shveta malik <[email protected]> wrote: > On Tue, Sep 30, 2025 at 3:11 PM shveta malik <[email protected]> > wrote: > > > > On Tue, Sep 30, 2025 at 12:17 PM Fabrice Chapuis > > <[email protected]> wrote: > > > > > > Hi, > > > > > > Here the generated v2 of the Patch. > > > > > > > Thanks. I have refactored the code for synchronize_one_slot() as there > > was some code-repetition. Please take it if you find it okay. > > > > Also I felt that when we create a slot through slot-synchronization, > > we should create it with allow_overwrite as false. And thus in the > > attached patch, I have changed that part as well. > > > > It is a top up patch. Attached it as txt, please rename before > > applying atop your changes. Attached the steps for your reference. > > > > Next, pg_replication_slots (pg_get_replication_slots) can be modified > to display the new property. And the corresponding doc for > pg_replication_slots (doc/src/sgml/system-views.sgml) can be updated. > > thanks > Shveta >
From 25e513bf49f3afe4b6ed7333ae46705c357eeb47 Mon Sep 17 00:00:00 2001 From: Fabrice Chapuis <[email protected]> Date: Mon, 29 Sep 2025 13:10:38 +0200 Subject: [PATCH v4] Add allow_overwrite option to logical replication slot creation This patch adds a new parameter allow_overwrite to pg_create_logical_replication_slot. When true, an existing failover slot on standby will be dropped and replaced automatically. Signed-off-by: Fabrice Chapuis <[email protected]> --- doc/src/sgml/system-views.sgml | 12 ++++++++ src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/slotsync.c | 34 ++++++++++++++++++---- src/backend/replication/slot.c | 3 +- src/backend/replication/slotfuncs.c | 13 ++++++--- src/backend/replication/walsender.c | 4 +-- src/include/catalog/pg_proc.dat | 14 ++++----- src/include/replication/slot.h | 8 ++++- 10 files changed, 72 insertions(+), 22 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 4187191ea74..0d94622cd0c 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -3036,6 +3036,18 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>allow_overwrite</structfield> <type>bool</type> + </para> + <para> + The parameter controls whether existing logical replication slot metadata may + be overwritten on the standby instance. The default is <literal>false</literal>. + When true, an existing failover slot on standby instance will be dropped and replaced + automatically. 
+ </para></entry> + </row> + </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 2d946d6d9e9..75ea84fd837 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN allow_overwrite boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c77fa0234bb..7fc42ddbe66 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1046,7 +1046,8 @@ CREATE VIEW pg_replication_slots AS L.conflicting, L.invalidation_reason, L.failover, - L.synced + L.synced, + L.allow_overwrite FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..a26af527775 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1519,7 +1519,7 @@ CreateConflictDetectionSlot(void) errmsg("creating replication conflict detection slot")); ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + false, false, false); init_conflict_slot_xmin(); } diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8c061d55bdb..a8d44fe6b45 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -649,22 +649,45 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } +retry: /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { bool synced; + bool 
allow_overwrite; SpinLockAcquire(&slot->mutex); synced = slot->data.synced; + allow_overwrite = slot->data.allow_overwrite; SpinLockRelease(&slot->mutex); /* User-created slot with the same name exists, raise ERROR. */ if (!synced) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("exiting from slot synchronization because same" - " name slot \"%s\" already exists on the standby", - remote_slot->name)); + { + /* Check if we need to overwrite an existing logical slot */ + if (allow_overwrite) + { + ereport(LOG, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot \"%s\" already exists" + " on the standby but will be overwritten as" + " allow_overwrite is set to true", + remote_slot->name)); + + /* Get rid of a replication slot that is no longer wanted */ + ReplicationSlotAcquire(remote_slot->name, true, false); + ReplicationSlotDropAcquired(); + goto retry; + } + else + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization because same" + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + } + } /* * The slot has been synchronized before. @@ -760,6 +783,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, + false, true); /* For shorter lines. 
*/ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fd0fdb96d42..fcba4d4cff9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -351,7 +351,7 @@ IsSlotForConflictCheck(const char *name) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool failover, bool allow_overwrite, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -445,6 +445,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.allow_overwrite = allow_overwrite; slot->data.synced = synced; /* and then data only present in shared memory */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index b8f21153e7b..6a6a03e9a4d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -41,7 +41,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false, false); + false, false, false); if (immediately_reserve) { @@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool allow_overwrite, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + failover, allow_overwrite, false); /* * Create logical decoding context to find start point or, if we don't @@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool allow_overwrite = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + allow_overwrite, InvalidXLogRecPtr, true); @@ -235,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 20 +#define PG_GET_REPLICATION_SLOTS_COLS 21 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -443,6 +445,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.synced); + values[i++] = BoolGetDatum(slot_contents.data.allow_overwrite); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -726,6 +730,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..40586b273ba 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1202,7 +1202,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, false); if (reserve_wal) { @@ -1233,7 +1233,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, failover, false, false); /* * Do options check early so that we can bail before calling the diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 01eba3b5a19..2f980c0dd4f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11503,17 +11503,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced,allow_overwrite}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => 
'{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index fe62162cde3..ca50b79daaa 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -141,6 +141,12 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* + * Allow Postgres to drop logical replication slot on standby server to ensure + * creation of new failover slot when remote sync_replication_slots is set to true. + */ + bool allow_overwrite; } ReplicationSlotPersistentData; /* @@ -301,7 +307,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, + bool two_phase, bool failover, bool allow_overwrite, bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -- 2.39.5
