On Mon, 30 Jan 2023 at 13:00, houzj.f...@fujitsu.com <houzj.f...@fujitsu.com> wrote:
>
> On Monday, January 30, 2023 2:32 PM Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> >
> > On Mon, Jan 30, 2023 at 9:20 AM vignesh C <vignes...@gmail.com> wrote:
> > >
> > > On Sat, 28 Jan 2023 at 11:26, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > >
> > > > One thing that looks a bit odd is that we will anyway have a similar
> > > > check in replorigin_drop_guts() which is a static function and
> > > > called from only one place, so, will it be required to check at both
> > > > places?
> > >
> > > There is a possibility that the initial check to verify if replication
> > > origin exists in replorigin_drop_by_name was successful but later one
> > > of either table sync worker or apply worker process might have dropped
> > > the replication origin,
> > >
> >
> > Won't locking on the particular origin prevent concurrent drops? IIUC, the
> > drop happens after the patch acquires the lock on the origin.
>
> Yes, I think the existence check in replorigin_drop_guts is unnecessary as we
> already lock the origin before that. I think the check in replorigin_drop_guts
> is a custom check after calling SearchSysCache1 to get the tuple, but the error
> should not happen as no concurrent drop can be performed.
>
> To make it simpler, one idea is to move the code that getting the tuple from
> system cache to the replorigin_drop_by_name(). After locking the origin, we
> can try to get the tuple and do the existence check, and we can reuse
> this tuple to perform origin delete. In this approach we only need to check
> origin existence once after locking. BTW, if we do this, then we'd better rename the
> replorigin_drop_guts() to something like replorigin_state_clear() as the function
> only clear the in-memory information after that.
>
> The code could be like:
>
> -------
> replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
> ...
>     /*
>      * Lock the origin to prevent concurrent drops. We keep the lock until the
>      * end of transaction.
>      */
>     LockSharedObject(ReplicationOriginRelationId, roident, 0,
>                      AccessExclusiveLock);
>
>     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
>     if (!HeapTupleIsValid(tuple))
>     {
>         if (!missing_ok)
>             elog(ERROR, "cache lookup failed for replication origin with ID %d",
>                  roident);
>
>         return;
>     }
>
>     replorigin_state_clear(rel, roident, nowait);
>
>     /*
>      * Now, we can delete the catalog entry.
>      */
>     CatalogTupleDelete(rel, &tuple->t_self);
>     ReleaseSysCache(tuple);
>
>     CommandCounterIncrement();
The attached updated patch implements the suggested approach.

Regards,
Vignesh
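As a quick way to see the concurrent-drop scenario discussed above from SQL,
here is a minimal sketch with two sessions racing to drop the same origin
(the origin name 'test_origin' is only an example, not something the patch
introduces):

-------
-- Session 1: create an origin and start dropping it inside a transaction,
-- which takes the per-origin lock until commit.
SELECT pg_replication_origin_create('test_origin');
BEGIN;
SELECT pg_replication_origin_drop('test_origin');

-- Session 2: a concurrent drop of the same origin. With the patch this
-- blocks on the lock held by session 1 instead of racing, and then fails
-- once session 1 commits because the origin is already gone.
SELECT pg_replication_origin_drop('test_origin');

-- Session 1:
COMMIT;
-------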
From dc2e9acc6a55772896117cf3b88e4189f994a82d Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 27 Jan 2023 15:17:09 +0530
Subject: [PATCH v2] Lock the replication origin record instead of locking the
 pg_replication_origin relation.

Lock the replication origin record instead of locking the
pg_replication_origin relation.
---
 src/backend/replication/logical/origin.c | 56 ++++++++++++------------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index b754c43840..3e360cf41e 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -338,16 +338,14 @@ replorigin_create(const char *roname)
  * Helper function to drop a replication origin.
  */
 static void
-replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
+replorigin_state_clear(RepOriginId roident, bool nowait)
 {
-	HeapTuple	tuple;
 	int			i;
 
 	/*
-	 * First, clean up the slot state info, if there is any matching slot.
+	 * Clean up the slot state info, if there is any matching slot.
 	 */
 restart:
-	tuple = NULL;
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
 	for (i = 0; i < max_replication_slots; i++)
@@ -402,19 +400,6 @@ restart:
 	}
 	LWLockRelease(ReplicationOriginLock);
 	ConditionVariableCancelSleep();
-
-	/*
-	 * Now, we can delete the catalog entry.
-	 */
-	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
-	if (!HeapTupleIsValid(tuple))
-		elog(ERROR, "cache lookup failed for replication origin with ID %d",
-			 roident);
-
-	CatalogTupleDelete(rel, &tuple->t_self);
-	ReleaseSysCache(tuple);
-
-	CommandCounterIncrement();
 }
 
 /*
@@ -427,24 +412,37 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 {
 	RepOriginId roident;
 	Relation	rel;
+	HeapTuple	tuple;
 
 	Assert(IsTransactionState());
 
-	/*
-	 * To interlock against concurrent drops, we hold ExclusiveLock on
-	 * pg_replication_origin till xact commit.
-	 *
-	 * XXX We can optimize this by acquiring the lock on a specific origin by
-	 * using LockSharedObject if required. However, for that, we first to
-	 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
-	 * the specific origin and then re-check if the origin still exists.
-	 */
-	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
 
 	roident = replorigin_by_name(name, missing_ok);
 
-	if (OidIsValid(roident))
-		replorigin_drop_guts(rel, roident, nowait);
+	/* Lock the origin to prevent concurrent drops */
+	LockSharedObject(ReplicationOriginRelationId, roident, 0,
+					 AccessExclusiveLock);
+
+	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
+	if (!HeapTupleIsValid(tuple))
+	{
+		if (!missing_ok)
+			elog(ERROR, "cache lookup failed for replication origin with ID %d",
+				 roident);
+
+		return;
+	}
+
+	replorigin_state_clear(roident, nowait);
+
+	/*
+	 * Now, we can delete the catalog entry.
+	 */
+	CatalogTupleDelete(rel, &tuple->t_self);
+	ReleaseSysCache(tuple);
+
+	CommandCounterIncrement();
 
 	/* We keep the lock on pg_replication_origin until commit */
 	table_close(rel, NoLock);
-- 
2.34.1
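A note on verifying the locking behaviour: while session 2 is blocked as in
the earlier sketch, the per-origin lock taken via LockSharedObject should be
visible in pg_locks as an object lock on pg_replication_origin. A query along
these lines (only an illustration, not part of the patch) can be used to
confirm it:

-------
SELECT locktype, classid::regclass AS catalog, objid, mode, granted
FROM pg_locks
WHERE locktype = 'object'
  AND classid = 'pg_replication_origin'::regclass;
-------

Both sessions should show an AccessExclusiveLock entry for the origin's OID,
with granted = false for the waiting session.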