On 2017-04-30 17:59:21 -0700, Andres Freund wrote:
> ISTM, we need a xip_status array in SnapBuild->running.  Then, whenever
> a xl_running_xacts is encountered while builder->running.xcnt > 0, loop
> for i in 0 .. xcnt_space, and call SnapBuildEndTxn() if xip_status[i] but
> the xid is not in xl_running_xacts?  Does that make sense?

A hasty implementation, untested besides check-world, of that approach
is attached.  I'm going out for dinner now, will subject it to mean
things afterwards.

Needs more testing, comment and commit message policing.  But I think
the idea is sound?

- Andres
>From 6160eda5177e7f538ff13eacc46acb2f2959c257 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 30 Apr 2017 18:24:06 -0700
Subject: [PATCH] Fix initial logical decoding snapshot race condition.

LogStandbySnapshot() has no interlock preventing
RecordTransactionCommit() from logging its commit record (the abort
situation is similar) before the xl_running_xacts record is
generated.  That can lead to logical decoding waiting forever for a
commit/abort record that was already logged in the past.

To fix, check for such "missed" transactions when xl_running_xacts are
observed, while in SNAPBUILD_FULL_SNAPSHOT.

This also reverts changes made to GetRunningTransactionData() and
LogStandbySnapshot() by b89e151 as the additional locking does not
solve the problem, and is not required anymore.

Author: Petr Jelinek and Andres Freund
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb...@2ndquadrant.com
Backpatch: 9.4, where logical decoding was introduced
---
 src/backend/replication/logical/snapbuild.c | 98 +++++++++++++++++++++++------
 src/backend/storage/ipc/procarray.c         |  5 +-
 src/backend/storage/ipc/standby.c           | 19 ------
 3 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 068d214fa1..8605625555 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -203,6 +203,7 @@ struct SnapBuild
 		size_t		xcnt;		/* number of used xip entries */
 		size_t		xcnt_space; /* allocated size of xip */
 		TransactionId *xip;		/* running xacts array, xidComparator-sorted */
+		bool	   *xip_running; /* xid in ->xip still running? */
 	}			running;
 
 	/*
@@ -253,7 +254,7 @@ static bool ExportInProgress = false;
 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
+static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid, int *off);
 
 /* ->committed manipulation */
 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
@@ -700,7 +701,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 	 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
 	 */
 	if (builder->state < SNAPBUILD_CONSISTENT &&
-		SnapBuildTxnIsRunning(builder, xid))
+		SnapBuildTxnIsRunning(builder, xid, NULL))
 		return false;
 
 	/*
@@ -776,7 +777,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
  * only exist after we freshly started from an < CONSISTENT snapshot.
  */
 static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
+SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid, int *off)
 {
 	Assert(builder->state < SNAPBUILD_CONSISTENT);
 	Assert(TransactionIdIsNormal(builder->running.xmin));
@@ -793,8 +794,17 @@ SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
 		if (search != NULL)
 		{
 			Assert(*search == xid);
+
+			if (off)
+			{
+				*off = search - builder->running.xip;
+				Assert(builder->running.xip[*off] == xid);
+			}
+
 			return true;
 		}
+
+
 	}
 
 	return false;
@@ -928,6 +938,8 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 static void
 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
+	int off;
+
 	if (builder->state == SNAPBUILD_CONSISTENT)
 		return;
 
@@ -938,10 +950,13 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 	 * subxids and since they end at the same time it's sufficient to deal
 	 * with them here.
 	 */
-	if (SnapBuildTxnIsRunning(builder, xid))
+	if (SnapBuildTxnIsRunning(builder, xid, &off))
 	{
 		Assert(builder->running.xcnt > 0);
 
+		Assert(builder->running.xip_running[off]);
+		builder->running.xip_running[off] = false;
+
 		if (!--builder->running.xcnt)
 		{
 			/*
@@ -1250,9 +1265,15 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *
 	 * a) There were no running transactions when the xl_running_xacts record
 	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
-	 *	  state we were waiting for b) or c).
+	 *	  state while waiting for b) or c).
 	 *
-	 * b) Wait for all toplevel transactions that were running to end. We
+	 * b) This (in a previous run) or another decoding slot serialized a
+	 *	  snapshot to disk that we can use.  Can't use this method for the
+	 *	  initial snapshot when slot is being created and needs full snapshot
+	 *	  for export or direct use, as that snapshot will only contain catalog
+	 *	  modifying transactions.
+	 *
+	 * c) Wait for all toplevel transactions that were running to end. We
 	 *	  simply track the number of in-progress toplevel transactions and
 	 *	  lower it whenever one commits or aborts. When that number
 	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
@@ -1264,11 +1285,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *	  subtransactions - and by extension suboverflowed xl_running_xacts -
 	 *	  at all.
 	 *
-	 * c) This (in a previous run) or another decoding slot serialized a
-	 *	  snapshot to disk that we can use.  Can't use this method for the
-	 *	  initial snapshot when slot is being created and needs full snapshot
-	 *	  for export or direct use, as that snapshot will only contain catalog
-	 *	  modifying transactions.
+	 *    Unfortunately there's a race condition around LogStandbySnapshot(),
+	 *    where transactions might have logged their commit record, before
+	 *    xl_running_xacts itself is logged. In that case the decoding logic
+	 *    would have missed that fact.  Thus
+	 *
+	 * d) xl_running_xacts shows us that transaction(s) assumed to be still
+	 *    running have actually already finished.  Adjust their status.
 	 * ---
 	 */
 
@@ -1323,16 +1346,15 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
 		return false;
 	}
-	/* c) valid on disk state and not building full snapshot */
+	/* b) valid on disk state and not building full snapshot */
 	else if (!builder->building_full_snapshot &&
 			 SnapBuildRestore(builder, lsn))
 	{
 		/* there won't be any state to cleanup */
 		return false;
 	}
-
 	/*
-	 * b) first encounter of a useable xl_running_xacts record. If we had
+	 * c) first encounter of a useable xl_running_xacts record. If we had
 	 * found one earlier we would either track running transactions (i.e.
 	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
 	 * get called).
@@ -1367,6 +1389,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 							   builder->running.xcnt * sizeof(TransactionId));
 		memcpy(builder->running.xip, running->xids,
 			   builder->running.xcnt * sizeof(TransactionId));
+		builder->running.xip_running =
+			MemoryContextAlloc(builder->context,
+							   builder->running.xcnt * sizeof(bool));
+		memset(builder->running.xip_running, 1,
+			   builder->running.xcnt * sizeof(bool));
 
 		/* sort so we can do a binary search */
 		qsort(builder->running.xip, builder->running.xcnt,
@@ -1414,13 +1441,44 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		/* nothing could have built up so far, so don't perform cleanup */
 		return false;
 	}
+	/* d) already tracking running xids, check whether any were missed */
+	else
+	{
+		size_t xcnt = running->xcnt;
+		TransactionId *xip =
+			MemoryContextAlloc(builder->context, xcnt * sizeof(TransactionId));
+		int i;
 
-	/*
-	 * We already started to track running xacts and need to wait for all
-	 * in-progress ones to finish. We fall through to the normal processing of
-	 * records so incremental cleanup can be performed.
-	 */
-	return true;
+		memcpy(xip, running->xids, xcnt * sizeof(TransactionId));
+
+		/* sort so we can do a binary search */
+		qsort(xip, xcnt, sizeof(TransactionId), xidComparator);
+
+		/*
+		 * Mark all transactions as finished that we assumed were running, but
+		 * actually aren't according to the xl_running_xacts record.
+		 */
+		for (i = 0; i < builder->running.xcnt_space; i++)
+		{
+			TransactionId still_running = builder->running.xip[i];
+			void *test;
+
+			if (!builder->running.xip_running[i])
+				continue;
+
+			test = bsearch(&still_running, xip, xcnt,
+						   sizeof(TransactionId), xidComparator);
+			if (!test)
+				SnapBuildEndTxn(builder, lsn, still_running);
+		}
+
+		/*
+		 * We already started to track running xacts and need to wait for all
+		 * in-progress ones to finish. We fall through to the normal processing of
+		 * records so incremental cleanup can be performed.
+		 */
+		return true;
+	}
 }
 
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a71536791..de3ae92dd7 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2060,12 +2060,13 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
+	/* We don't release XidGenLock here, the caller is responsible for that */
+	LWLockRelease(ProcArrayLock);
+
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
 
-	/* We don't release the locks here, the caller is responsible for that */
-
 	return CurrentRunningXacts;
 }
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f933ca..ddb279e274 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -929,27 +929,8 @@ LogStandbySnapshot(void)
 	 */
 	running = GetRunningTransactionData();
 
-	/*
-	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
-	 * For Hot Standby this can be done before inserting the WAL record
-	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
-	 * the clog. For logical decoding, though, the lock can't be released
-	 * early because the clog might be "in the future" from the POV of the
-	 * historic snapshot. This would allow for situations where we're waiting
-	 * for the end of a transaction listed in the xl_running_xacts record
-	 * which, according to the WAL, has committed before the xl_running_xacts
-	 * record. Fortunately this routine isn't executed frequently, and it's
-	 * only a shared lock.
-	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	recptr = LogCurrentRunningXacts(running);
 
-	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
-- 
2.12.0.264.gd6db3f2165.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to