On 12/18/21 07:00, Tomas Vondra wrote:
On 12/18/21 05:52, Tom Lane wrote:
Tomas Vondra <tomas.von...@enterprisedb.com> writes:
The problem is exactly the same as in [1] - the aborted transaction
generated WAL, but RecordTransactionAbort() ignores that and does not
update LogwrtResult.Write, with the reasoning that aborted transactions
do not matter. But sequences violate that, because we only write WAL
once every 32 increments, so the following nextval() gets "committed"
without waiting for the replica (because it did not produce WAL).
Ugh.
I'm not sure this is a clear data corruption bug, but it surely walks
and quacks like one. My proposal is to fix this by tracking the LSN of
the last WAL-logged sequence increment, and then checking that LSN in
RecordTransactionCommit() before calling XLogFlush().
(1) Does that work if the aborted increment was in a different
session? I think it is okay but I'm tired enough to not be sure.
Good point - it doesn't :-( At least not by simply storing LSN in a
global variable or something like that.
The second backend needs to know the LSN of the last WAL-logged sequence
increment, but only the first backend knows that. So we'd need to share
that between backends somehow. I doubt we want to track LSN for every
individual sequence (because for clusters with many dbs / sequences that
may be a lot).
Perhaps we could track just a fixed number of LSN values in shared memory
(say, 1024), and update/read just the element determined by hash(oid).
That is, the backend WAL-logging a sequence with a given oid would store the
current LSN in array[hash(oid) % 1024], and a backend doing nextval()
would simply remember the LSN in that slot. Yes, if there are conflicts
that'll flush more than needed.
Here's a PoC demonstrating this idea. I'm not convinced it's the right
way to deal with this - it surely seems more like a duct tape fix than a
clean solution. But it does the trick.
I wonder if storing this in shmem is good enough - we lose the LSN info
on restart, but the checkpoint should trigger FPI which makes it OK.
A bigger question is whether sequences are the only thing affected by
this. If you look at RecordTransactionCommit() then we skip flush/wait
in two cases:
1) !wrote_xlog - if the xact did not produce WAL
2) !markXidCommitted - if the xact does not have a valid XID
Both apply to sequences, and the PoC patch tweaks them. But maybe there
are other places where we don't generate WAL and/or assign XID in some
cases, to save time?
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e7b0bc804d..c6a0c07846 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -35,6 +35,7 @@
#include "catalog/pg_enum.h"
#include "catalog/storage.h"
#include "commands/async.h"
+#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "common/pg_prng.h"
@@ -1289,6 +1290,13 @@ RecordTransactionCommit(void)
bool RelcacheInitFileInval = false;
bool wrote_xlog;
+ /*
+ * Force flush and synchronous replication even if no XID was assigned.
+ * This is needed when depending on WAL produced by another transaction
+ * (possibly in a different backend). Sequences need this, for example.
+ */
+ bool force_sync = false;
+
/*
* Log pending invalidations for logical decoding of in-progress
* transactions. Normally for DDLs, we log this at each command end,
@@ -1299,6 +1307,24 @@ RecordTransactionCommit(void)
if (XLogLogicalInfoActive())
LogLogicalInvalidations();
+ /*
+ * Check the LSN of increments for sequences we touched in this transaction.
+ * If it's higher than XactLastRecEnd, we need to force flush/sync.
+ */
+ {
+ /* Separate call, so that we can compare to XactLastRecEnd. */
+ XLogRecPtr tmpptr = SequenceGetLastLSN();
+
+ /*
+ * If higher than XactLastRecEnd, make sure we flush even without
+ * a XID assigned.
+ */
+ force_sync = (tmpptr > XactLastRecEnd);
+
+ /* Override the XactLastRecEnd value. */
+ XactLastRecEnd = Max(XactLastRecEnd, tmpptr);
+ }
+
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
@@ -1446,7 +1472,7 @@ RecordTransactionCommit(void)
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
- if ((wrote_xlog && markXidCommitted &&
+ if ((wrote_xlog && (markXidCommitted || force_sync) &&
synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
forceSyncCommit || nrels > 0)
{
@@ -1504,7 +1530,7 @@ RecordTransactionCommit(void)
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
*/
- if (wrote_xlog && markXidCommitted)
+ if (wrote_xlog && (markXidCommitted || force_sync))
SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */
@@ -1512,6 +1538,8 @@ RecordTransactionCommit(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
+ SequenceResetLastLSN();
+
cleanup:
/* Clean up local data */
if (rels)
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 72bfdc07a4..43fe19831e 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -109,6 +109,19 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
static void do_setval(Oid relid, int64 next, bool iscalled);
static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
+/* handle flush/sync for sequences */
+#define SEQUENCE_LSN_COUNT 1024
+
+typedef struct SequenceLsnState { /* LSN slots kept in shared memory */
+ slock_t lock; /* taken around every read/write of lsn[] */
+ XLogRecPtr lsn[SEQUENCE_LSN_COUNT]; /* slot = sequence OID % SEQUENCE_LSN_COUNT */
+} SequenceLsnState;
+
+static SequenceLsnState *SequenceShmemInfo = NULL;
+static XLogRecPtr lastSequenceLSN = InvalidXLogRecPtr;
+
+static void SequenceSetLSN(Oid relid, XLogRecPtr recptr);
+static XLogRecPtr SequenceGetLSN(Oid relid);
/*
* DefineSequence
@@ -406,7 +419,12 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
PageSetLSN(page, recptr);
+
+ SequenceSetLSN(rel->rd_id, recptr);
+ lastSequenceLSN = recptr;
}
+ else
+ lastSequenceLSN = SequenceGetLSN(rel->rd_id);
END_CRIT_SECTION();
@@ -810,7 +828,12 @@ nextval_internal(Oid relid, bool check_permissions)
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
PageSetLSN(page, recptr);
+
+ SequenceSetLSN(relid, recptr);
+ lastSequenceLSN = recptr;
}
+ else
+ lastSequenceLSN = SequenceGetLSN(relid);
/* Now update sequence tuple to the intended final state */
seq->last_value = last; /* last fetched number */
@@ -1005,7 +1028,12 @@ do_setval(Oid relid, int64 next, bool iscalled)
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
PageSetLSN(page, recptr);
+
+ SequenceSetLSN(relid, recptr);
+ lastSequenceLSN = recptr;
}
+ else
+ lastSequenceLSN = SequenceGetLSN(relid);
END_CRIT_SECTION();
@@ -1927,3 +1955,98 @@ seq_mask(char *page, BlockNumber blkno)
mask_unused_space(page);
}
+
+
+/*
+ * Report the shared memory size needed for the sequence LSN positions.
+ *
+ * To track the last LSN for each sequence we use a fixed-length array, and
+ * use the OID of the sequence to select the element. This lets us handle an
+ * unknown number of sequences in a limited amount of space, but it may force
+ * us to sync a bit too much if two sequences conflict (pick the same array
+ * element). That seems like an acceptable tradeoff.
+ */
+Size
+SequenceShmemSize(void)
+{
+ return sizeof(SequenceLsnState);
+}
+
+/*
+ * Initialize the shared LSN array (zeroed, i.e. InvalidXLogRecPtr) and its lock.
+ */
+void
+SequenceShmemInit(void)
+{
+ bool found; /* filled in by ShmemInitStruct() */
+ SequenceShmemInfo
+ = (SequenceLsnState *) ShmemInitStruct("Sequences Shmem", SequenceShmemSize(), &found);
+
+ if (!found)
+ {
+ /* First time through: set up the spinlock and clear every LSN slot */
+ SpinLockInit(&(SequenceShmemInfo->lock));
+ MemSet(SequenceShmemInfo->lsn, 0, SEQUENCE_LSN_COUNT * sizeof(XLogRecPtr));
+ }
+}
+
+/*
+ * Update the LSN in the slot for the given sequence OID; it never goes back.
+ *
+ * XXX Maybe hash the OID instead of using it modulo the slot count directly.
+ */
+static void
+SequenceSetLSN(Oid relid, XLogRecPtr recptr)
+{
+ int idx = (relid % SEQUENCE_LSN_COUNT); /* OIDs equal modulo the count share a slot */
+
+ Assert(SequenceShmemInfo);
+
+ SpinLockAcquire(&(SequenceShmemInfo->lock));
+ SequenceShmemInfo->lsn[idx] = Max(SequenceShmemInfo->lsn[idx], recptr); /* keep the larger value */
+ SpinLockRelease(&(SequenceShmemInfo->lock));
+}
+
+/*
+ * Get the LSN tracked in the slot for the sequence with the given OID.
+ *
+ * XXX What if the LSN is 0/0? Presumably that means we don't have any value
+ * (e.g. after a restart). Maybe we should use the current insert LSN then?
+ * Although, a shutdown forces a checkpoint, and we write an FPI or something?
+ */
+static XLogRecPtr
+SequenceGetLSN(Oid relid)
+{
+ int idx = (relid % SEQUENCE_LSN_COUNT); /* same slot choice as SequenceSetLSN() */
+ XLogRecPtr lsn;
+
+ Assert(SequenceShmemInfo);
+
+ SpinLockAcquire(&(SequenceShmemInfo->lock));
+ lsn = SequenceShmemInfo->lsn[idx]; /* may have been stored for another OID in this slot */
+ SpinLockRelease(&(SequenceShmemInfo->lock));
+
+ return lsn;
+}
+
+/*
+ * Return the LSN remembered by the last sequence operation in this backend
+ * (WAL we wrote ourselves, or WAL of another transaction we depend on).
+ */
+XLogRecPtr
+SequenceGetLastLSN(void)
+{
+ return lastSequenceLSN; /* NOTE(review): assigned, not Max()ed, where it is set - confirm intended */
+}
+
+/*
+ * Reset the remembered sequence LSN to InvalidXLogRecPtr after we flush.
+ *
+ * XXX Not sure we actually need to do this. Maybe we don't need to reset and
+ * keeping the value forever is fine.
+ */
+void
+SequenceResetLastLSN(void)
+{
+ lastSequenceLSN = InvalidXLogRecPtr;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 9fa3e0631e..517cca450b 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -23,6 +23,7 @@
#include "access/syncscan.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -143,6 +144,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, SequenceShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -293,6 +295,7 @@ CreateSharedMemoryAndSemaphores(void)
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
+ SequenceShmemInit();
#ifdef EXEC_BACKEND
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..85fbc3789d 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -66,4 +66,10 @@ extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
extern const char *seq_identify(uint8 info);
extern void seq_mask(char *pagedata, BlockNumber blkno);
+
+extern Size SequenceShmemSize(void);
+extern void SequenceShmemInit(void);
+extern XLogRecPtr SequenceGetLastLSN(void);
+extern void SequenceResetLastLSN(void);
+
#endif /* SEQUENCE_H */