On 2017-04-30 17:59:21 -0700, Andres Freund wrote:
> ISTM, we need a xip_status array in SnapBuild->running. Then, whenever
> a xl_running_xacts is encountered while builder->running.xcnt > 0, loop
> for i in 0 .. xcnt_space, and call SnapBuildEndTxn() if xip_status[i] but
> the xid is not in xl_running_xacts? Does that make sense?
A hasty implementation of that approach, untested beyond check-world, is attached. I'm going out for dinner now and will subject it to mean things afterwards. It still needs more testing, plus comment and commit message policing. But I think the idea is sound?

- Andres
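The bookkeeping in the quoted proposal can be sketched outside PostgreSQL as follows. This is a hypothetical stand-alone model, not the patch itself: `TrackedXacts`, `tracked_init()`, `tracked_end()`, `tracked_reconcile()` and `tracked_free()` are made-up names, `uint32_t` stands in for `TransactionId`, xids are compared as plain integers (wraparound ignored), and `malloc` replaces `MemoryContextAlloc`. The sorted `xip` array with a parallel `still_running` flag array plays the role of `xip`/`xip_running`, and the offset returned from `bsearch()` mirrors the new `*off` out-parameter of `SnapBuildTxnIsRunning()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t Xid;               /* stand-in for TransactionId */

/* Hypothetical stand-in for SnapBuild->running. */
typedef struct TrackedXacts
{
    size_t  xcnt;                   /* xids still being waited for */
    size_t  xcnt_space;             /* allocated length of xip/still_running */
    Xid    *xip;                    /* sorted xids from the first record */
    bool   *still_running;          /* parallel to xip, like xip_running */
} TrackedXacts;

/* Plain numeric comparison; wraparound is ignored in this sketch. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid     x = *(const Xid *) a;
    Xid     y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/* Start waiting for the n (> 0) xids listed in the first usable record. */
static void
tracked_init(TrackedXacts *t, const Xid *xids, size_t n)
{
    assert(n > 0);                  /* n == 0 means consistent immediately */
    t->xcnt = t->xcnt_space = n;
    t->xip = malloc(n * sizeof(Xid));
    t->still_running = malloc(n * sizeof(bool));
    if (t->xip == NULL || t->still_running == NULL)
        abort();
    memcpy(t->xip, xids, n * sizeof(Xid));
    qsort(t->xip, n, sizeof(Xid), xid_cmp);
    for (size_t i = 0; i < n; i++)
        t->still_running[i] = true;
}

/* A commit/abort of xid was decoded or inferred; mirrors SnapBuildEndTxn(). */
static void
tracked_end(TrackedXacts *t, Xid xid)
{
    Xid    *hit = bsearch(&xid, t->xip, t->xcnt_space, sizeof(Xid), xid_cmp);
    size_t  off;

    if (hit == NULL)
        return;                     /* not one of the initially running xids */
    off = (size_t) (hit - t->xip);  /* like *off in SnapBuildTxnIsRunning() */
    if (!t->still_running[off])
        return;                     /* already ended; the patch Asserts instead */
    t->still_running[off] = false;
    assert(t->xcnt > 0);
    t->xcnt--;
}

/*
 * A later record lists the xids running when it was logged: every xid we
 * still wait for that it omits has already ended. Returns true once
 * nothing is left to wait for.
 */
static bool
tracked_reconcile(TrackedXacts *t, const Xid *running, size_t n)
{
    Xid    *sorted = malloc((n > 0 ? n : 1) * sizeof(Xid));

    if (sorted == NULL)
        abort();
    if (n > 0)
        memcpy(sorted, running, n * sizeof(Xid));
    qsort(sorted, n, sizeof(Xid), xid_cmp);

    for (size_t i = 0; i < t->xcnt_space; i++)
    {
        if (t->still_running[i] &&
            bsearch(&t->xip[i], sorted, n, sizeof(Xid), xid_cmp) == NULL)
            tracked_end(t, t->xip[i]);
    }
    free(sorted);                   /* scratch copy is released right away */
    return t->xcnt == 0;
}

static void
tracked_free(TrackedXacts *t)
{
    free(t->xip);
    free(t->still_running);
}
```

For instance, after tracking {102, 100, 101} and decoding the commit of 100, a record listing {102, 105} leaves only 102 outstanding: 101 is retired by the reconciliation even though its commit record was never decoded. Iterating over `xcnt_space` rather than `xcnt` matters because `xcnt` shrinks as transactions end while the arrays keep their original length.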
From 6160eda5177e7f538ff13eacc46acb2f2959c257 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 30 Apr 2017 18:24:06 -0700
Subject: [PATCH] Fix initial logical decoding snapshot race condition.

LogStandbySnapshot() has no interlock preventing
RecordTransactionCommit() from logging its commit record (the abort
situation is similar) before the xl_running_xacts record is
generated. That can lead to logical decoding waiting forever for a
commit/abort record that was logged in the past.

To fix, check for such "missed" transactions when xl_running_xacts
records are observed while in SNAPBUILD_FULL_SNAPSHOT.

This also reverts the changes made to GetRunningTransactionData() and
LogStandbySnapshot() by b89e151, as the additional locking does not
solve the problem and is no longer required.

Author: Petr Jelinek and Andres Freund
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb...@2ndquadrant.com
Backpatch: 9.4, where logical decoding was introduced
---
 src/backend/replication/logical/snapbuild.c | 98 +++++++++++++++++++++++------
 src/backend/storage/ipc/procarray.c         |  5 +-
 src/backend/storage/ipc/standby.c           | 19 ------
 3 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 068d214fa1..8605625555 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -203,6 +203,7 @@ struct SnapBuild
 		size_t		xcnt;			/* number of used xip entries */
 		size_t		xcnt_space;		/* allocated size of xip */
 		TransactionId *xip;		/* running xacts array, xidComparator-sorted */
+		bool	   *xip_running;	/* xid in ->xip still running? */
 	}			running;
 
 	/*
@@ -253,7 +254,7 @@ static bool ExportInProgress = false;
 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
+static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid, int *off);
 
 /* ->committed manipulation */
 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
@@ -700,7 +701,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 	 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
 	 */
 	if (builder->state < SNAPBUILD_CONSISTENT &&
-		SnapBuildTxnIsRunning(builder, xid))
+		SnapBuildTxnIsRunning(builder, xid, NULL))
 		return false;
 
 	/*
@@ -776,7 +777,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
  * only exist after we freshly started from an < CONSISTENT snapshot.
  */
 static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
+SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid, int *off)
 {
 	Assert(builder->state < SNAPBUILD_CONSISTENT);
 	Assert(TransactionIdIsNormal(builder->running.xmin));
@@ -793,8 +794,17 @@ SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
 		if (search != NULL)
 		{
 			Assert(*search == xid);
+
+			if (off)
+			{
+				*off = search - builder->running.xip;
+				Assert(builder->running.xip[*off] == xid);
+			}
+
 			return true;
 		}
+
+
 	}
 
 	return false;
@@ -928,6 +938,8 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 static void
 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
+	int			off;
+
 	if (builder->state == SNAPBUILD_CONSISTENT)
 		return;
 
@@ -938,10 +950,13 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 	 * subxids and since they end at the same time it's sufficient to deal
 	 * with them here.
 	 */
-	if (SnapBuildTxnIsRunning(builder, xid))
+	if (SnapBuildTxnIsRunning(builder, xid, &off))
 	{
 		Assert(builder->running.xcnt > 0);
 
+		Assert(builder->running.xip_running[off]);
+		builder->running.xip_running[off] = false;
+
 		if (!--builder->running.xcnt)
 		{
 			/*
@@ -1250,9 +1265,15 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *
 	 * a) There were no running transactions when the xl_running_xacts record
 	 *    was inserted, jump to CONSISTENT immediately. We might find such a
-	 *    state we were waiting for b) or c).
+	 *    state while waiting for b) or c).
 	 *
-	 * b) Wait for all toplevel transactions that were running to end. We
+	 * b) This (in a previous run) or another decoding slot serialized a
+	 *    snapshot to disk that we can use. Can't use this method for the
+	 *    initial snapshot when slot is being created and needs full snapshot
+	 *    for export or direct use, as that snapshot will only contain catalog
+	 *    modifying transactions.
+	 *
+	 * c) Wait for all toplevel transactions that were running to end. We
 	 *    simply track the number of in-progress toplevel transactions and
 	 *    lower it whenever one commits or aborts. When that number
 	 *    (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
@@ -1264,11 +1285,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *    subtransactions - and by extension suboverflowed xl_running_xacts -
 	 *    at all.
 	 *
-	 * c) This (in a previous run) or another decoding slot serialized a
-	 *    snapshot to disk that we can use. Can't use this method for the
-	 *    initial snapshot when slot is being created and needs full snapshot
-	 *    for export or direct use, as that snapshot will only contain catalog
-	 *    modifying transactions.
+	 * Unfortunately there's a race condition around LogStandbySnapshot(),
+	 * where transactions might have logged their commit record, before
+	 * xl_running_xacts itself is logged. In that case the decoding logic
+	 * would have missed that fact. Thus
+	 *
+	 * d) xl_running_xacts shows us that transaction(s) assumed to be still
+	 *    running have actually already finished. Adjust their status.
 	 * ---
 	 */
 
@@ -1323,16 +1346,15 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		return false;
 	}
 
-	/* c) valid on disk state and not building full snapshot */
+	/* b) valid on disk state and not building full snapshot */
 	else if (!builder->building_full_snapshot &&
 			 SnapBuildRestore(builder, lsn))
 	{
 		/* there won't be any state to cleanup */
 		return false;
 	}
-
 	/*
-	 * b) first encounter of a useable xl_running_xacts record. If we had
+	 * c) first encounter of a useable xl_running_xacts record. If we had
 	 * found one earlier we would either track running transactions (i.e.
 	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
 	 * get called).
@@ -1367,6 +1389,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 							   builder->running.xcnt * sizeof(TransactionId));
 		memcpy(builder->running.xip, running->xids,
 			   builder->running.xcnt * sizeof(TransactionId));
+		builder->running.xip_running =
+			MemoryContextAlloc(builder->context,
+							   builder->running.xcnt * sizeof(bool));
+		memset(builder->running.xip_running, 1,
+			   builder->running.xcnt * sizeof(bool));
 
 		/* sort so we can do a binary search */
 		qsort(builder->running.xip, builder->running.xcnt,
@@ -1414,13 +1441,44 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		/* nothing could have built up so far, so don't perform cleanup */
 		return false;
 	}
+	/* d) already tracking running xids, check whether any were missed */
+	else
+	{
+		size_t		xcnt = running->xcnt;
+		TransactionId *xip =
+			MemoryContextAlloc(builder->context, xcnt * sizeof(TransactionId));
+		int			i;
 
-	/*
-	 * We already started to track running xacts and need to wait for all
-	 * in-progress ones to finish. We fall through to the normal processing of
-	 * records so incremental cleanup can be performed.
-	 */
-	return true;
+		memcpy(xip, running->xids, xcnt * sizeof(TransactionId));
+
+		/* sort so we can do a binary search */
+		qsort(xip, xcnt, sizeof(TransactionId), xidComparator);
+
+		/*
+		 * Mark all transactions as finished that we assumed were running, but
+		 * actually aren't according to the xl_running_xacts record.
+		 */
+		for (i = 0; i < builder->running.xcnt_space; i++)
+		{
+			TransactionId still_running = builder->running.xip[i];
+			void	   *test;
+
+			if (!builder->running.xip_running[i])
+				continue;
+
+			test = bsearch(&still_running, xip, xcnt,
+						   sizeof(TransactionId), xidComparator);
+			if (!test)
+				SnapBuildEndTxn(builder, lsn, still_running);
+		}
+
+		/*
+		 * We already started to track running xacts and need to wait for all
+		 * in-progress ones to finish. We fall through to the normal processing of
+		 * records so incremental cleanup can be performed.
+		 */
+		return true;
+	}
 }
 
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a71536791..de3ae92dd7 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2060,12 +2060,13 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
+	/* We don't release XidGenLock here, the caller is responsible for that */
+	LWLockRelease(ProcArrayLock);
+
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
 
-	/* We don't release the locks here, the caller is responsible for that */
-
 	return CurrentRunningXacts;
 }
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f933ca..ddb279e274 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -929,27 +929,8 @@ LogStandbySnapshot(void)
 	 */
 	running = GetRunningTransactionData();
 
-	/*
-	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
-	 * For Hot Standby this can be done before inserting the WAL record
-	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
-	 * the clog. For logical decoding, though, the lock can't be released
-	 * early because the clog might be "in the future" from the POV of the
-	 * historic snapshot. This would allow for situations where we're waiting
-	 * for the end of a transaction listed in the xl_running_xacts record
-	 * which, according to the WAL, has committed before the xl_running_xacts
-	 * record. Fortunately this routine isn't executed frequently, and it's
-	 * only a shared lock.
-	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	recptr = LogCurrentRunningXacts(running);
 
-	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
-- 
2.12.0.264.gd6db3f2165.dirty
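The race described in the commit message can be reproduced in miniature. The toy replay below is not PostgreSQL code: WAL is modeled as an ordered array of hypothetical `Rec` records, and it assumes, as the commit message states, that a commit record can be written before an xl_running_xacts record that still lists the same xid. With `reconcile == false` the decoder only counts down on commit records that follow its starting xl_running_xacts and therefore waits forever. With `reconcile == true`, a later xl_running_xacts that no longer lists the xid retires it, which is the idea behind case d).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_XIDS 8

/* Hypothetical, simplified WAL records (not PostgreSQL's structs). */
typedef enum { REC_COMMIT, REC_RUNNING_XACTS } RecKind;

typedef struct
{
    RecKind  kind;
    unsigned xid;               /* REC_COMMIT: xid that committed */
    size_t   nxids;             /* REC_RUNNING_XACTS: number of listed xids */
    unsigned xids[MAX_XIDS];    /* REC_RUNNING_XACTS: xids still running */
} Rec;

static bool
listed(const Rec *r, unsigned xid)
{
    for (size_t i = 0; i < r->nxids; i++)
        if (r->xids[i] == xid)
            return true;
    return false;
}

/*
 * Replay wal[0..n) in order. The first REC_RUNNING_XACTS fixes the set of
 * xids to wait for; a later commit of one of them removes it. With
 * reconcile, a later REC_RUNNING_XACTS also removes any waited-for xid it
 * no longer lists. Returns the index of the record at which nothing is
 * left to wait for, or -1 if that never happens.
 */
static int
replay(const Rec *wal, size_t n, bool reconcile)
{
    unsigned waiting[MAX_XIDS];
    size_t   nwaiting = 0;
    bool     started = false;

    for (size_t i = 0; i < n; i++)
    {
        const Rec *r = &wal[i];

        assert(r->nxids <= MAX_XIDS);
        if (!started)
        {
            if (r->kind != REC_RUNNING_XACTS)
                continue;       /* records before the start point are ignored */
            started = true;
            for (size_t j = 0; j < r->nxids; j++)
                waiting[nwaiting++] = r->xids[j];
        }
        else if (r->kind == REC_COMMIT)
        {
            for (size_t j = 0; j < nwaiting; j++)
                if (waiting[j] == r->xid)
                {
                    waiting[j] = waiting[--nwaiting];   /* swap-remove */
                    break;
                }
        }
        else if (reconcile)
        {
            for (size_t j = 0; j < nwaiting;)
                if (!listed(r, waiting[j]))
                    waiting[j] = waiting[--nwaiting];   /* re-check slot j */
                else
                    j++;
        }

        if (nwaiting == 0)
            return (int) i;
    }
    return -1;
}
```

With a WAL of `COMMIT 101`, `RUNNING {100, 101}`, `COMMIT 100`, `RUNNING {102}`, the commit of 101 precedes the first xl_running_xacts that still lists it: `replay(wal, 4, false)` returns -1, while `replay(wal, 4, true)` returns 3, i.e. it becomes consistent at the second xl_running_xacts.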
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)