I wrote:
> This is uncomfortably much in bed with the tuple table slot code,
> perhaps, but I don't see a way to do it more cleanly unless we want
> to add some new provisions to that API.  Andres, do you have any
> thoughts about that?

Oh!  After nosing around a bit more I remembered systable_recheck_tuple,
which is meant for exactly this purpose.  So v4 attached.

                        regards, tom lane

diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index c0591cffe5..0dcd45d2f0 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "common/hashfn.h"
+#include "common/pg_prng.h"
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
 #ifdef CATCACHE_STATS
@@ -90,10 +91,10 @@ static void CatCachePrintStats(int code, Datum arg);
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
 static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
-static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache,
+										HeapTuple ntp, SysScanDesc scandesc,
 										Datum *arguments,
-										uint32 hashValue, Index hashIndex,
-										bool negative);
+										uint32 hashValue, Index hashIndex);
 
 static void ReleaseCatCacheWithOwner(HeapTuple tuple, ResourceOwner resowner);
 static void ReleaseCatCacheListWithOwner(CatCList *list, ResourceOwner resowner);
@@ -1372,6 +1373,7 @@ SearchCatCacheMiss(CatCache *cache,
 	SysScanDesc scandesc;
 	HeapTuple	ntp;
 	CatCTup    *ct;
+	bool		stale;
 	Datum		arguments[CATCACHE_MAXKEYS];
 
 	/* Initialize local parameter array */
@@ -1380,16 +1382,6 @@ SearchCatCacheMiss(CatCache *cache,
 	arguments[2] = v3;
 	arguments[3] = v4;
 
-	/*
-	 * Ok, need to make a lookup in the relation, copy the scankey and fill
-	 * out any per-call fields.
-	 */
-	memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
-	cur_skey[0].sk_argument = v1;
-	cur_skey[1].sk_argument = v2;
-	cur_skey[2].sk_argument = v3;
-	cur_skey[3].sk_argument = v4;
-
 	/*
 	 * Tuple was not found in cache, so we have to try to retrieve it directly
 	 * from the relation.  If found, we will add it to the cache; if not
@@ -1404,9 +1396,28 @@ SearchCatCacheMiss(CatCache *cache,
 	 * will eventually age out of the cache, so there's no functional problem.
 	 * This case is rare enough that it's not worth expending extra cycles to
 	 * detect.
+	 *
+	 * Another case, which we *must* handle, is that the tuple could become
+	 * outdated during CatalogCacheCreateEntry's attempt to detoast it (since
+	 * AcceptInvalidationMessages can run during TOAST table access).  We do
+	 * not want to return already-stale catcache entries, so we loop around
+	 * and do the table scan again if that happens.
 	 */
 	relation = table_open(cache->cc_reloid, AccessShareLock);
 
+	do
+	{
+		/*
+		 * Ok, need to make a lookup in the relation, copy the scankey and
+		 * fill out any per-call fields.  (We must re-do this when retrying,
+		 * because systable_beginscan scribbles on the scankey.)
+		 */
+		memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
+		cur_skey[0].sk_argument = v1;
+		cur_skey[1].sk_argument = v2;
+		cur_skey[2].sk_argument = v3;
+		cur_skey[3].sk_argument = v4;
+
 	scandesc = systable_beginscan(relation,
 								  cache->cc_indexoid,
 								  IndexScanOK(cache, cur_skey),
@@ -1415,12 +1426,18 @@ SearchCatCacheMiss(CatCache *cache,
 								  cur_skey);
 
 	ct = NULL;
+	stale = false;
 
 	while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
 	{
-		ct = CatalogCacheCreateEntry(cache, ntp, arguments,
-									 hashValue, hashIndex,
-									 false);
+		ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+									 hashValue, hashIndex);
+		/* upon failure, we must start the scan over */
+		if (ct == NULL)
+		{
+			stale = true;
+			break;
+		}
 		/* immediately set the refcount to 1 */
 		ResourceOwnerEnlarge(CurrentResourceOwner);
 		ct->refcount++;
@@ -1429,6 +1446,7 @@ SearchCatCacheMiss(CatCache *cache,
 	}
 
 	systable_endscan(scandesc);
+	} while (stale);
 
 	table_close(relation, AccessShareLock);
 
@@ -1447,9 +1465,11 @@ SearchCatCacheMiss(CatCache *cache,
 		if (IsBootstrapProcessingMode())
 			return NULL;
 
-		ct = CatalogCacheCreateEntry(cache, NULL, arguments,
-									 hashValue, hashIndex,
-									 true);
+		ct = CatalogCacheCreateEntry(cache, NULL, NULL, arguments,
+									 hashValue, hashIndex);
+
+		/* Creating a negative cache entry shouldn't fail */
+		Assert(ct != NULL);
 
 		CACHE_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
 				   cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
@@ -1663,7 +1683,8 @@ SearchCatCacheList(CatCache *cache,
 	 * We have to bump the member refcounts temporarily to ensure they won't
 	 * get dropped from the cache while loading other members. We use a PG_TRY
 	 * block to ensure we can undo those refcounts if we get an error before
-	 * we finish constructing the CatCList.
+	 * we finish constructing the CatCList.  ctlist must be valid throughout
+	 * the PG_TRY block.
 	 */
 	ctlist = NIL;
 
@@ -1672,19 +1693,23 @@ SearchCatCacheList(CatCache *cache,
 		ScanKeyData cur_skey[CATCACHE_MAXKEYS];
 		Relation	relation;
 		SysScanDesc scandesc;
-
-		/*
-		 * Ok, need to make a lookup in the relation, copy the scankey and
-		 * fill out any per-call fields.
-		 */
-		memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
-		cur_skey[0].sk_argument = v1;
-		cur_skey[1].sk_argument = v2;
-		cur_skey[2].sk_argument = v3;
-		cur_skey[3].sk_argument = v4;
+		bool		stale;
 
 		relation = table_open(cache->cc_reloid, AccessShareLock);
 
+		do
+		{
+			/*
+			 * Ok, need to make a lookup in the relation, copy the scankey and
+			 * fill out any per-call fields.  (We must re-do this when
+			 * retrying, because systable_beginscan scribbles on the scankey.)
+			 */
+			memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
+			cur_skey[0].sk_argument = v1;
+			cur_skey[1].sk_argument = v2;
+			cur_skey[2].sk_argument = v3;
+			cur_skey[3].sk_argument = v4;
+
 		scandesc = systable_beginscan(relation,
 									  cache->cc_indexoid,
 									  IndexScanOK(cache, cur_skey),
@@ -1695,6 +1720,8 @@ SearchCatCacheList(CatCache *cache,
 		/* The list will be ordered iff we are doing an index scan */
 		ordered = (scandesc->irel != NULL);
 
+		stale = false;
+
 		while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
 		{
 			uint32		hashValue;
@@ -1737,9 +1764,32 @@ SearchCatCacheList(CatCache *cache,
 			if (!found)
 			{
 				/* We didn't find a usable entry, so make a new one */
-				ct = CatalogCacheCreateEntry(cache, ntp, arguments,
-											 hashValue, hashIndex,
-											 false);
+				ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+											 hashValue, hashIndex);
+				/* upon failure, we must start the scan over */
+				if (ct == NULL)
+				{
+					/*
+					 * Release refcounts on any items we already had.  We dare
+					 * not try to free them if they're now unreferenced, since
+					 * an error while doing that would result in the PG_CATCH
+					 * below doing extra refcount decrements.  Besides, we'll
+					 * likely re-adopt those items in the next iteration, so
+					 * it's not worth complicating matters to try to get rid
+					 * of them.
+					 */
+					foreach(ctlist_item, ctlist)
+					{
+						ct = (CatCTup *) lfirst(ctlist_item);
+						Assert(ct->c_list == NULL);
+						Assert(ct->refcount > 0);
+						ct->refcount--;
+					}
+					/* Reset ctlist in preparation for new try */
+					ctlist = NIL;
+					stale = true;
+					break;
+				}
 			}
 
 			/* Careful here: add entry to ctlist, then bump its refcount */
@@ -1749,6 +1799,7 @@ SearchCatCacheList(CatCache *cache,
 		}
 
 		systable_endscan(scandesc);
+		} while (stale);
 
 		table_close(relation, AccessShareLock);
 
@@ -1865,22 +1916,42 @@ ReleaseCatCacheListWithOwner(CatCList *list, ResourceOwner resowner)
  * CatalogCacheCreateEntry
  *		Create a new CatCTup entry, copying the given HeapTuple and other
  *		supplied data into it.  The new entry initially has refcount 0.
+ *
+ * To create a normal cache entry, ntp must be the HeapTuple just fetched
+ * from scandesc, and "arguments" is not used.  To create a negative cache
+ * entry, pass NULL for ntp and scandesc; then "arguments" is the cache
+ * keys to use.  In either case, hashValue/hashIndex are the hash values
+ * computed from the cache keys.
+ *
+ * Returns NULL if we attempt to detoast the tuple and observe that it
+ * became stale.  (This cannot happen for a negative entry.)  Caller must
+ * retry the tuple lookup in that case.
  */
 static CatCTup *
-CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
-						uint32 hashValue, Index hashIndex,
-						bool negative)
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, SysScanDesc scandesc,
+						Datum *arguments,
+						uint32 hashValue, Index hashIndex)
 {
 	CatCTup    *ct;
 	HeapTuple	dtp;
 	MemoryContext oldcxt;
 
-	/* negative entries have no tuple associated */
 	if (ntp)
 	{
 		int			i;
 
-		Assert(!negative);
+		/*
+		 * The visibility recheck below essentially never fails during our
+		 * regression tests, and there's no easy way to force it to fail for
+		 * testing purposes.  To ensure we have test coverage for the retry
+		 * paths in our callers, make debug builds randomly fail about 0.1% of
+		 * the times through this code path, even when there's no toasted
+		 * fields.
+		 */
+#ifdef USE_ASSERT_CHECKING
+		if (pg_prng_uint32(&pg_global_prng_state) <= (PG_UINT32_MAX / 1000))
+			return NULL;
+#endif
 
 		/*
 		 * If there are any out-of-line toasted fields in the tuple, expand
@@ -1890,7 +1961,20 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
 		 * something using a slightly stale catcache entry.
 		 */
 		if (HeapTupleHasExternal(ntp))
+		{
 			dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc);
+
+			/*
+			 * The tuple could become stale while we are doing toast table
+			 * access (since AcceptInvalidationMessages can run then), so we
+			 * must recheck its visibility afterwards.
+			 */
+			if (!systable_recheck_tuple(scandesc, ntp))
+			{
+				heap_freetuple(dtp);
+				return NULL;
+			}
+		}
 		else
 			dtp = ntp;
 
@@ -1929,7 +2013,7 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
 	}
 	else
 	{
-		Assert(negative);
+		/* Set up keys for a negative cache entry */
 		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 		ct = (CatCTup *) palloc(sizeof(CatCTup));
 
@@ -1951,7 +2035,7 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
 	ct->c_list = NULL;
 	ct->refcount = 0;			/* for the moment */
 	ct->dead = false;
-	ct->negative = negative;
+	ct->negative = (ntp == NULL);
 	ct->hash_value = hashValue;
 
 	dlist_push_head(&cache->cc_bucket[hashIndex], &ct->cache_elem);

Reply via email to