On Wed, Sep 13, 2017 at 12:48 AM, Arseny Sher <a.s...@postgrespro.ru> wrote:
> Masahiko Sawada <sawada.m...@gmail.com> writes:
>
>> FWIW, perhaps we can change the replication origin management so that
>> DROP SUBSCRIPTION doesn't drop the replication origin and the apply
>> worker itself removes it when exit. When an apply worker exits it
>> removes the replication origin if the corresponding subscription had
>> been removed.
>

After further thought, I think we can change it as follows.
* If the replication origin is not acquired, DROP SUBSCRIPTION can drop it.
* If the replication origin is acquired by someone, DROP SUBSCRIPTION
hands over the job of dropping it to the apply worker.
* When the apply worker exits, it drops the replication origin if it
has been asked to do so.

> I don't think this is reliable -- what if worker suddenly dies without
> accomplishing the job?

The apply worker will be relaunched by the launcher later. If DROP
SUBSCRIPTION is issued before the apply worker is launched again, DROP
SUBSCRIPTION itself can remove the replication origin.

Attached is a very rough patch for reference. It's quite ugly, but it
handles this case.

Regards,

--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2ef414e..9ed773e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -940,7 +940,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
 	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid, false);
+		replorigin_drop(originid, true, true, true);
 
 	/*
 	 * If there is no slot associated with the subscription, we can finish
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index edc6efb..05423a7 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -129,6 +129,8 @@ typedef struct ReplicationState
 	 */
 	ConditionVariable origin_cv;
 
+	bool	drop_by_worker;
+
 	/*
 	 * Lock protecting remote_lsn and local_lsn.
 	 */
@@ -329,7 +331,7 @@ replorigin_create(char *roname)
  * Needs to be called in a transaction.
  */
 void
-replorigin_drop(RepOriginId roident, bool nowait)
+replorigin_drop(RepOriginId roident, bool nowait, bool need_lock, bool takeover)
 {
 	HeapTuple	tuple;
 	Relation	rel;
@@ -342,7 +344,8 @@ replorigin_drop(RepOriginId roident, bool nowait)
 restart:
 	tuple = NULL;
 	/* cleanup the slot state info */
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+	if (need_lock)
+		LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -355,6 +358,22 @@ restart:
 			{
 				ConditionVariable *cv;
 
+				if (takeover)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_OBJECT_IN_USE),
+							 errmsg("could not drop replication origin with OID %d, in use by PID %d, takeover",
+									state->roident,
+									state->acquired_by)));
+					state->drop_by_worker = true;
+					if (need_lock)
+						LWLockRelease(ReplicationOriginLock);
+
+					/* now release lock again */
+					heap_close(rel, ExclusiveLock);
+					return;
+				}
+
 				if (nowait)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_IN_USE),
@@ -363,7 +382,8 @@ restart:
 									state->acquired_by)));
 				cv = &state->origin_cv;
 
-				LWLockRelease(ReplicationOriginLock);
+				if (need_lock)
+					LWLockRelease(ReplicationOriginLock);
 				ConditionVariablePrepareToSleep(cv);
 				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
 				ConditionVariableCancelSleep();
@@ -384,10 +404,12 @@ restart:
 			state->roident = InvalidRepOriginId;
 			state->remote_lsn = InvalidXLogRecPtr;
 			state->local_lsn = InvalidXLogRecPtr;
+			state->drop_by_worker = false;
 			break;
 		}
 	}
-	LWLockRelease(ReplicationOriginLock);
+	if (need_lock)
+		LWLockRelease(ReplicationOriginLock);
 
 	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
 	if (!HeapTupleIsValid(tuple))
@@ -785,6 +807,7 @@ replorigin_redo(XLogReaderState *record)
 						state->roident = InvalidRepOriginId;
 						state->remote_lsn = InvalidXLogRecPtr;
 						state->local_lsn = InvalidXLogRecPtr;
+						state->drop_by_worker = false;
 						break;
 					}
 				}
@@ -987,6 +1010,15 @@ ReplicationOriginExitCleanup(int code, Datum arg)
 		cv = &session_replication_state->origin_cv;
 
 		session_replication_state->acquired_by = 0;
+
+		if (session_replication_state->drop_by_worker)
+		{
+			replorigin_session_origin = InvalidRepOriginId;
+			StartTransactionCommand();
+			replorigin_drop(session_replication_state->roident, false, false, false);
+			CommitTransactionCommand();
+		}
+
 		session_replication_state = NULL;
 	}
 
@@ -1075,6 +1107,7 @@ replorigin_session_setup(RepOriginId node)
 		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
 		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
 		session_replication_state->roident = node;
+		session_replication_state->drop_by_worker = false;
 	}
 
 
@@ -1109,6 +1142,7 @@ replorigin_session_reset(void)
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
 	session_replication_state->acquired_by = 0;
+	session_replication_state->drop_by_worker = false;
 	cv = &session_replication_state->origin_cv;
 	session_replication_state = NULL;
 
@@ -1205,7 +1239,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
 	roident = replorigin_by_name(name, false);
 	Assert(OidIsValid(roident));
 
-	replorigin_drop(roident, true);
+	replorigin_drop(roident, true, true, false);
 
 	pfree(name);
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index a9595c3..6db5d8b 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -41,7 +41,8 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
 /* API for querying & manipulating replication origins */
 extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
 extern RepOriginId replorigin_create(char *name);
-extern void replorigin_drop(RepOriginId roident, bool nowait);
+extern void replorigin_drop(RepOriginId roident, bool nowait, bool need_lock,
+							bool takeover);
 extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
 				  char **roname);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to