On 2016-04-15 16:53:37 -0400, Tom Lane wrote:
> Andres Freund <and...@anarazel.de> writes:
> > On 2016-04-15 15:26:17 -0400, Tom Lane wrote:
> >> I think the bottom line is that we misdesigned the WAL representation
> >> by assuming that this sort of info could always be piggybacked on a
> >> transaction commit record.  It's time to fix that.
>
> > I think we got to piggyback it onto a commit record, as long as there's
> > one.
>
> No objection to that part.  What I'm saying is that when there isn't one,
> the answer is a new record type, not forcing xid assignment.  It might
> look almost like a commit record, but it shouldn't imply that there
> was a transaction.

Here's a patch doing so.  I first put the record into RM_XACT_ID, but then
decided to make it an RM_STANDBY_ID record. I think it's likely that this
is going to be needed beyond transaction commits, and it generally seems
to fit better into standby.c, even if it's a bit awkward that commit
records contain similar data. To avoid duplicating the *desc.c code, I've
exposed standby_desc_invalidations() so that xactdesc.c can use it too.
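
For anyone skimming the patch below: the new record is a fixed header
followed by a variable number of invalidation messages, and the header
size is taken with offsetof() on the flexible array member. A rough
standalone sketch of that layout follows. All Toy* names and field
choices are made up for illustration and are not the PostgreSQL
definitions.

```c
/* Toy sketch only: hypothetical types, not the PostgreSQL definitions. */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct ToyInvalMsg
{
	int8_t		id;			/* message kind */
	uint32_t	relId;		/* affected relation, if any */
} ToyInvalMsg;

typedef struct ToyInvalRecord
{
	uint32_t	dbId;
	uint32_t	tsId;
	int			nmsgs;
	ToyInvalMsg msgs[];		/* variable-length tail */
} ToyInvalRecord;

/* Size of the fixed header, i.e. everything before the message array. */
#define TOY_MIN_SIZE offsetof(ToyInvalRecord, msgs)

/* Total size in bytes of a record carrying nmsgs messages. */
size_t
toy_record_size(int nmsgs)
{
	assert(nmsgs >= 0);
	return TOY_MIN_SIZE + (size_t) nmsgs * sizeof(ToyInvalMsg);
}

/* Build header + messages in one zeroed buffer; caller frees. NULL on OOM. */
ToyInvalRecord *
toy_build_record(uint32_t dbId, uint32_t tsId,
				 const ToyInvalMsg *msgs, int nmsgs)
{
	ToyInvalRecord *rec = calloc(1, toy_record_size(nmsgs));

	if (rec == NULL)
		return NULL;
	rec->dbId = dbId;
	rec->tsId = tsId;
	rec->nmsgs = nmsgs;
	if (nmsgs > 0)
		memcpy(rec->msgs, msgs, (size_t) nmsgs * sizeof(ToyInvalMsg));
	return rec;
}
```

The patch does not copy into one buffer like this; it registers the
header and the message array as two separate chunks of the same record.
The sketch only shows how the sizes fit together.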

It fixes the problem at hand, but essentially it's just luck that the
patch is sufficient. The first layer of the issue is that queued
invalidation messages aren't sent; but for vm_extend() there's another
side to it. Namely vm_extend() does

        /*
         * Send a shared-inval message to force other backends to close any smgr
         * references they may have for this rel, which we are about to change.
         * This is a useful optimization because it means that backends don't have
         * to keep checking for creation or extension of the file, which happens
         * infrequently.
         */
        CacheInvalidateSmgr(rel->rd_smgr->smgr_rnode);

but CacheInvalidateSmgr() is nontransactional, as its comment explains:
 *
 * Note: because these messages are nontransactional, they won't be captured
 * in commit/abort WAL entries.  Instead, calls to CacheInvalidateSmgr()
 * should happen in low-level smgr.c routines, which are executed while
 * replaying WAL as well as when creating it.
 *

As far as I can see, vm_extend() is the only current caller forgetting
that rule. I don't think it's safe to use CacheInvalidateSmgr() outside
smgr.c.

The reason this all ends up working nonetheless is that the
heap_inplace_update()s in vacuum trigger a CacheInvalidateHeapTuple(),
which queues a relcache invalidation, which in turn does a
RelationCloseSmgr() in RelationClearRelation(). That effectively
amounts to a transactional CacheInvalidateSmgr().  But that seems more
than fragile.

ISTM we should additionally replace the CacheInvalidateSmgr() with a
CacheInvalidateRelcache() and document that that implies an smgr
invalidation.  Alternatively we could log smgr (and relmapper)
invalidations as well. That's not exactly non-invasive either, but it
might be a good long-term idea to keep things simpler.
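
To make the distinction concrete, here is a toy model of the two kinds
of messages. It is only a sketch under heavy simplification, none of it
is PostgreSQL code, and every Toy*/toy_* name is invented. Transactional
messages are queued and reach the standby when the commit is logged;
nontransactional ones reach local backends immediately and never show up
in WAL, so a standby only sees them if replay runs the same low-level
routine.

```c
/* Toy model only: every name here is hypothetical, not PostgreSQL code. */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_MSGS 16

typedef enum { TOY_MSG_RELCACHE, TOY_MSG_SMGR } ToyMsgKind;

/* Messages that backends on one node (primary or standby) have received. */
typedef struct ToyNode
{
	ToyMsgKind	seen[TOY_MAX_MSGS];
	size_t		nseen;
} ToyNode;

/* Invalidations queued by a transaction until it commits. */
typedef struct ToyXact
{
	ToyMsgKind	pending[TOY_MAX_MSGS];
	size_t		npending;
} ToyXact;

static void
toy_deliver(ToyNode *node, ToyMsgKind kind)
{
	assert(node->nseen < TOY_MAX_MSGS);
	node->seen[node->nseen++] = kind;
}

/* Transactional: only queued here; sent and logged at commit. */
void
toy_queue_transactional(ToyXact *xact, ToyMsgKind kind)
{
	assert(xact->npending < TOY_MAX_MSGS);
	xact->pending[xact->npending++] = kind;
}

/* Nontransactional: reaches local backends at once, never reaches the log. */
void
toy_send_nontransactional(ToyNode *primary, ToyMsgKind kind)
{
	toy_deliver(primary, kind);
}

/* Commit: deliver the queue locally and, via the "log", on the standby. */
void
toy_commit(ToyXact *xact, ToyNode *primary, ToyNode *standby)
{
	for (size_t i = 0; i < xact->npending; i++)
	{
		toy_deliver(primary, xact->pending[i]);
		toy_deliver(standby, xact->pending[i]);	/* stands in for WAL replay */
	}
	xact->npending = 0;
}

/* Has this node received at least one message of the given kind? */
bool
toy_node_saw(const ToyNode *node, ToyMsgKind kind)
{
	for (size_t i = 0; i < node->nseen; i++)
		if (node->seen[i] == kind)
			return true;
	return false;
}
```

In this model, an smgr message sent nontransactionally on the primary
never shows up on the standby, while a queued relcache message does once
the commit is replayed. That is the asymmetry vm_extend() currently runs
into.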

Comments?

- Andres
From b2295700fb9e1d01b03bdebca3c71692941144c9 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sat, 23 Apr 2016 19:18:00 -0700
Subject: [PATCH] Emit invalidations to standby for transactions without xid.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

So far, when a transaction with pending invalidations, but without an
assigned xid, committed, we simply ignored those invalidation
messages. That's problematic, because those are actually sent for a
reason.

Known symptoms of this include that existing sessions on a hot-standby
replica sometimes fail to notice new concurrently built indexes and
visibility map updates.

The solution is to WAL-log such invalidations in transactions without an
xid. We considered force-assigning an xid instead, but that'd be
problematic for vacuum, which might be run in systems with few
remaining xids.

Important: This adds a new WAL record, but as the patch has to be
back-patched, we can't bump the WAL page magic. This means that standbys
have to be updated before primaries; otherwise
"PANIC: standby_redo: unknown op code 32" errors can be encountered.

XXX:

Reported-By: Васильев Дмитрий, Masahiko Sawada
Discussion:
    CAB-SwXY6oH=9twbkxjtgr4uc1nqt-vpyatxcseme62adwyk...@mail.gmail.com
    CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w...@mail.gmail.com
---
 src/backend/access/rmgrdesc/standbydesc.c       | 54 +++++++++++++++++++++++++
 src/backend/access/rmgrdesc/xactdesc.c          | 30 ++------------
 src/backend/access/transam/xact.c               | 18 +++++++++
 src/backend/replication/logical/decode.c        |  9 +++++
 src/backend/replication/logical/reorderbuffer.c | 53 +++++++++++++++---------
 src/backend/storage/ipc/standby.c               | 35 ++++++++++++++++
 src/backend/utils/cache/inval.c                 |  5 ++-
 src/include/replication/reorderbuffer.h         |  2 +
 src/include/storage/standby.h                   |  2 +
 src/include/storage/standbydefs.h               | 21 ++++++++++
 10 files changed, 181 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 4872cfb..e6172cc 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 
 		standby_desc_running_xacts(buf, xlrec);
 	}
+	else if (info == XLOG_INVALIDATIONS)
+	{
+		xl_invalidations *xlrec = (xl_invalidations *) rec;
+
+		standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+								   xlrec->dbId, xlrec->tsId,
+								   xlrec->relcacheInitFileInval);
+	}
 }
 
 const char *
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
 		case XLOG_RUNNING_XACTS:
 			id = "RUNNING_XACTS";
 			break;
+		case XLOG_INVALIDATIONS:
+			id = "INVALIDATIONS";
+			break;
 	}
 
 	return id;
 }
+
+/*
+ * This routine is used by both standby_desc and xact_desc, because
+ * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
+ * it seems pointless to duplicate the code.
+ */
+void
+standby_desc_invalidations(StringInfo buf,
+						   int nmsgs, SharedInvalidationMessage *msgs,
+						   Oid dbId, Oid tsId,
+						   bool relcacheInitFileInval)
+{
+	int i;
+
+	if (relcacheInitFileInval)
+		appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+						 dbId, tsId);
+
+	appendStringInfoString(buf, "; inval msgs:");
+	for (i = 0; i < nmsgs; i++)
+	{
+		SharedInvalidationMessage *msg = &msgs[i];
+
+		if (msg->id >= 0)
+			appendStringInfo(buf, " catcache %d", msg->id);
+		else if (msg->id == SHAREDINVALCATALOG_ID)
+			appendStringInfo(buf, " catalog %u", msg->cat.catId);
+		else if (msg->id == SHAREDINVALRELCACHE_ID)
+			appendStringInfo(buf, " relcache %u", msg->rc.relId);
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALSMGR_ID)
+			appendStringInfoString(buf, " smgr");
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALRELMAP_ID)
+			appendStringInfoString(buf, " relmap");
+		else if (msg->id == SHAREDINVALRELMAP_ID)
+			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else
+			appendStringInfo(buf, " unknown id %d", msg->id);
+	}
+}
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e8a334c..6f07c5c 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -18,6 +18,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "storage/sinval.h"
+#include "storage/standbydefs.h"
 #include "utils/timestamp.h"
 
 /*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 	}
 	if (parsed.nmsgs > 0)
 	{
-		if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
-			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
-							 parsed.dbId, parsed.tsId);
-
-		appendStringInfoString(buf, "; inval msgs:");
-		for (i = 0; i < parsed.nmsgs; i++)
-		{
-			SharedInvalidationMessage *msg = &parsed.msgs[i];
-
-			if (msg->id >= 0)
-				appendStringInfo(buf, " catcache %d", msg->id);
-			else if (msg->id == SHAREDINVALCATALOG_ID)
-				appendStringInfo(buf, " catalog %u", msg->cat.catId);
-			else if (msg->id == SHAREDINVALRELCACHE_ID)
-				appendStringInfo(buf, " relcache %u", msg->rc.relId);
-			/* not expected, but print something anyway */
-			else if (msg->id == SHAREDINVALSMGR_ID)
-				appendStringInfoString(buf, " smgr");
-			/* not expected, but print something anyway */
-			else if (msg->id == SHAREDINVALRELMAP_ID)
-				appendStringInfoString(buf, " relmap");
-			else if (msg->id == SHAREDINVALSNAPSHOT_ID)
-				appendStringInfo(buf, " snapshot %u", msg->sn.relId);
-			else
-				appendStringInfo(buf, " unknown id %d", msg->id);
-		}
+		standby_desc_invalidations(
+			buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+			XactCompletionRelcacheInitFileInval(parsed.xinfo));
 	}
 
 	if (XactCompletionForceSyncCommit(parsed.xinfo))
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 7e37331..95690ff 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1164,6 +1164,24 @@ RecordTransactionCommit(void)
 		Assert(nchildren == 0);
 
 		/*
+		 * Transactions without an assigned xid can contain invalidation
+		 * messages (e.g. explicit relcache invalidations or catcache
+		 * invalidations for inplace updates); standbys need to process
+		 * those. We can't emit a commit record without an xid, and we don't
+		 * want to force assigning an xid, because that'd be problematic for
+		 * e.g. vacuum.  Hence we emit a bespoke record for the
+		 * invalidations. We don't want to use that in case a commit record is
+		 * emitted, so they happen synchronously with commits (besides not
+		 * wanting to emit more WAL records).
+		 */
+		if (nmsgs != 0)
+		{
+			LogStandbyInvalidations(nmsgs, invalMessages,
+									RelcacheInitFileInval);
+			wrote_xlog = true; /* not strictly necessary */
+		}
+
+		/*
 		 * If we didn't create XLOG entries, we're done here; otherwise we
 		 * should trigger flushing those entries the same as a commit record
 		 * would.  This will primarily happen for HOT pruning and the like; we
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0cdb0b8..0c248f0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			break;
 		case XLOG_STANDBY_LOCK:
 			break;
+		case XLOG_INVALIDATIONS:
+			{
+				xl_invalidations *invalidations =
+					(xl_invalidations *) XLogRecGetData(r);
+
+				ReorderBufferImmediateInvalidation(
+					ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+			}
+			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4520708..57821c3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	 * catalog and we need to update the caches according to that.
 	 */
 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
-	{
-		bool		use_subtxn = IsTransactionOrTransactionBlock();
-
-		if (use_subtxn)
-			BeginInternalSubTransaction("replay");
-
-		/*
-		 * Force invalidations to happen outside of a valid transaction - that
-		 * way entries will just be marked as invalid without accessing the
-		 * catalog. That's advantageous because we don't need to setup the
-		 * full state necessary for catalog access.
-		 */
-		if (use_subtxn)
-			AbortCurrentTransaction();
-
-		ReorderBufferExecuteInvalidations(rb, txn);
-
-		if (use_subtxn)
-			RollbackAndReleaseCurrentSubTransaction();
-	}
+		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+										   txn->invalidations);
 	else
 		Assert(txn->ninvalidations == 0);
 
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+								   SharedInvalidationMessage *invalidations)
+{
+	bool		use_subtxn = IsTransactionOrTransactionBlock();
+	int			i;
+
+	if (use_subtxn)
+		BeginInternalSubTransaction("replay");
+
+	/*
+	 * Force invalidations to happen outside of a valid transaction - that
+	 * way entries will just be marked as invalid without accessing the
+	 * catalog. That's advantageous because we don't need to setup the
+	 * full state necessary for catalog access.
+	 */
+	if (use_subtxn)
+		AbortCurrentTransaction();
+
+	for (i = 0; i < ninvalidations; i++)
+		LocalExecuteInvalidationMessage(&invalidations[i]);
+
+	if (use_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+}
 
 /*
  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 6a9bf84..762dfa6 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
+	else if (info == XLOG_INVALIDATIONS)
+	{
+		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
+
+		ProcessCommittedInvalidationMessages(xlrec->msgs,
+											 xlrec->nmsgs,
+											 xlrec->relcacheInitFileInval,
+											 xlrec->dbId,
+											 xlrec->tsId);
+	}
 	else
 		elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
 	 */
 	(void) GetTopTransactionId();
 }
+
+/*
+ * Emit WAL for invalidations. This currently is only used for commits without
+ * an xid but which contain invalidations.
+ */
+void
+LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+						bool relcacheInitFileInval)
+{
+	xl_invalidations xlrec;
+
+	/* prepare record */
+	memset(&xlrec, 0, sizeof(xlrec));
+	xlrec.dbId = MyDatabaseId;
+	xlrec.tsId = MyDatabaseTableSpace;
+	xlrec.relcacheInitFileInval = relcacheInitFileInval;
+	xlrec.nmsgs = nmsgs;
+
+	/* perform insertion */
+	XLogBeginInsert();
+	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
+	XLogRegisterData((char *) msgs,
+					 nmsgs * sizeof(SharedInvalidationMessage));
+	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
+}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 924bebb..5803518 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
 }
 
 /*
- * ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
- * to process invalidation messages added to commit records.
+ * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
+ * standby_redo() to process invalidation messages. Currently that happens
+ * only at end-of-xact.
  *
  * Relcache init file invalidation requires processing both
  * before and after we send the SI messages. See AtEOXact_Inval()
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c54953..e070894 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
 						 CommandId cmin, CommandId cmax, CommandId combocid);
 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 							  Size nmsgs, SharedInvalidationMessage *msgs);
+void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+										SharedInvalidationMessage *invalidations);
 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index aafc9b8..5205884 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
 extern XLogRecPtr LogStandbySnapshot(void);
+extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+									bool relcacheInitFileInval);
 
 #endif   /* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 609d06e..bd3c97f 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -17,17 +17,23 @@
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "storage/lockdefs.h"
+#include "storage/sinval.h"
 
 /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
 extern void standby_redo(XLogReaderState *record);
 extern void standby_desc(StringInfo buf, XLogReaderState *record);
 extern const char *standby_identify(uint8 info);
+extern void standby_desc_invalidations(StringInfo buf,
+									   int nmsgs, SharedInvalidationMessage *msgs,
+									   Oid dbId, Oid tsId,
+									   bool relcacheInitFileInval);
 
 /*
  * XLOG message types
  */
 #define XLOG_STANDBY_LOCK			0x00
 #define XLOG_RUNNING_XACTS			0x10
+#define XLOG_INVALIDATIONS			0x20
 
 typedef struct xl_standby_locks
 {
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
 
+/*
+ * Invalidations for standby, currently only when transactions without an
+ * assigned xid commit.
+ */
+typedef struct xl_invalidations
+{
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+	bool		relcacheInitFileInval;	/* invalidate relcache init file */
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_invalidations;
+
+#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
+
 #endif   /* STANDBYDEFS_H */
-- 
2.7.0.229.g701fa7f
