On Tuesday, September 15, 2015 12:07 PM, Alvaro Herrera 
<alvhe...@2ndquadrant.com> wrote:
> Kevin Grittner wrote:
>> Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:

>>> I would place it inside src/test/modules, so that buildfarm
>>> runs it automatically; I'm not sure it will pick up things in
>>> src/test.
>>
>> As it stands now, the test is getting run as part of `make
>> check-world`, and it seems like src/test/modules is about testing
>> separate executables, so I don't think it makes sense to move the
>> tests -- but I could be convinced that I'm missing something.
>
> It's not conceived just as a way to test separate executables; maybe it
> is at the moment (though I don't think it is?) but if so that's just an
> accident.  The intention is to have modules that get tested without them
> being installed, which wasn't the case when they were in contrib.
>
> The problem with check-world is that buildfarm doesn't run it.  We don't
> want to set up separate buildfarm modules for each subdir in src/test;
> that would be pretty tedious.

OK, moved.

All other issues raised by Álvaro and Steve have been addressed,
except for this one, which I will argue against:

> So if I understand correctly, every call to BufferGetPage needs to have
> a TestForOldSnapshot immediately afterwards?  It seems easy to later
> introduce places that fail to test for old snapshots.  What happens if
> they do?  Does vacuum remove tuples anyway and then the query returns
> wrong results?  That seems pretty dangerous.  Maybe the snapshot could
> be an argument of BufferGetPage?

There are 486 occurrences of BufferGetPage in the source code, and
this patch follows 36 of them with TestForOldSnapshot.  The check is
only needed when a page is going to be used for a scan that produces
user-visible results.  That is, it is *not* needed when positioning
within indexes to add or vacuum away entries, nor for heap inserts
or deletions (assuming the row to be deleted has already been
found).  It seems wrong to modify about 450 BufferGetPage
references just to add a NULL parameter; and if we do want to make
that no-op change as insurance, it seems like it should be a
separate patch, since the substance of this patch would be buried
under the volume of that one.
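
For clarity, here is a minimal sketch (mine, not code from the
patch) of the calling pattern the hunks below follow; the
identifiers are the ones the patch uses.  A scan that produces
user-visible results passes its snapshot down and tests the page
right after fetching it:

static void
example_read_page(Relation rel, Buffer buf, Snapshot snapshot)
{
	Page		page;

	LockBuffer(buf, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buf);

	/*
	 * May throw the "snapshot too old" error if the snapshot has
	 * exceeded old_snapshot_threshold and the page has been modified
	 * since the snapshot was taken.
	 */
	TestForOldSnapshot(snapshot, rel, page);

	/* ... examine user-visible tuples on the page ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}

Code that is only positioning for an insert, delete, or vacuum
passes a NULL snapshot, which makes the check a no-op.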

I will add this to the November CF.

--
Kevin Grittner
EDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5081da0..33b225a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1944,6 +1944,42 @@ include_dir 'conf.d'
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
+       <term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>old_snapshot_threshold</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the minimum time that a snapshot can be used without risk of a
+         <literal>snapshot too old</> error occurring when using the snapshot.
+         This parameter can only be set at server start.
+        </para>
+
+        <para>
+         Beyond the threshold, old data may be vacuumed away.  This can help
+         prevent bloat in the face of snapshots which remain in use for a
+         long time.  To prevent incorrect results due to cleanup of data which
+         would otherwise be visible to the snapshot, an error is generated
+         when the snapshot is older than this threshold and the snapshot is
+         used to read a page which has been modified since the snapshot was
+         built.
+        </para>
+
+        <para>
+         A value of <literal>-1</> disables this feature, and is the default.
+         Useful values for production work probably range from a small number
+         of hours to a few days.  The setting will be coerced to a granularity
+         of minutes, and small numbers (such as <literal>0</> or
+         <literal>1min</>) are only allowed because they may sometimes be
+         useful for testing.  While a setting as high as <literal>60d</> is
+         allowed, please note that in many workloads extreme bloat or
+         transaction ID wraparound may occur in much shorter time frames.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </sect2>
    </sect1>
@@ -2869,6 +2905,10 @@ include_dir 'conf.d'
         You should also consider setting <varname>hot_standby_feedback</>
         on standby server(s) as an alternative to using this parameter.
        </para>
+       <para>
+        This does not prevent cleanup of dead rows which have reached the age
+        specified by <varname>old_snapshot_threshold</>.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -3016,6 +3056,16 @@ include_dir 'conf.d'
         until it eventually reaches the primary.  Standbys make no other use
         of feedback they receive other than to pass upstream.
        </para>
+       <para>
+        This setting does not override the behavior of
+        <varname>old_snapshot_threshold</> on the primary; a snapshot on the
+        standby which exceeds the primary's age threshold can become invalid,
+        resulting in cancellation of transactions on the standby.  This is
+        because <varname>old_snapshot_threshold</> is intended to provide an
+        absolute limit on the time which dead rows can contribute to bloat,
+        which would otherwise be violated because of the configuration of a
+        standby.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 99337b0..6e65a8d 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -96,7 +96,7 @@ brininsert(PG_FUNCTION_ARGS)
 	MemoryContext tupcxt = NULL;
 	MemoryContext oldcxt = NULL;
 
-	revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
+	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
 
 	for (;;)
 	{
@@ -113,7 +113,7 @@ brininsert(PG_FUNCTION_ARGS)
 		/* normalize the block number to be the first block in the range */
 		heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
 		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-										 BUFFER_LOCK_SHARE);
+										 BUFFER_LOCK_SHARE, NULL);
 
 		/* if range is unsummarized, there's nothing to do */
 		if (!brtup)
@@ -248,7 +248,8 @@ brinbeginscan(PG_FUNCTION_ARGS)
 	scan = RelationGetIndexScan(r, nkeys, norderbys);
 
 	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
-	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
+	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
+											   scan->xs_snapshot);
 	opaque->bo_bdesc = brin_build_desc(r);
 	scan->opaque = opaque;
 
@@ -333,7 +334,8 @@ bringetbitmap(PG_FUNCTION_ARGS)
 		MemoryContextResetAndDeleteChildren(perRangeCxt);
 
 		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
-									   &off, &size, BUFFER_LOCK_SHARE);
+									   &off, &size, BUFFER_LOCK_SHARE,
+									   scan->xs_snapshot);
 		if (tup)
 		{
 			tup = brin_copy_tuple(tup, size);
@@ -637,7 +639,7 @@ brinbuild(PG_FUNCTION_ARGS)
 	/*
 	 * Initialize our state, including the deformed tuple state.
 	 */
-	revmap = brinRevmapInitialize(index, &pagesPerRange);
+	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
 	/*
@@ -1007,7 +1009,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
 		 * the same.)
 		 */
 		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
-										 &offset, &phsz, BUFFER_LOCK_SHARE);
+										 &offset, &phsz, BUFFER_LOCK_SHARE,
+										 NULL);
 		/* the placeholder tuple must exist */
 		if (phtup == NULL)
 			elog(ERROR, "missing placeholder tuple");
@@ -1042,7 +1045,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
 	BlockNumber pagesPerRange;
 	Buffer		buf;
 
-	revmap = brinRevmapInitialize(index, &pagesPerRange);
+	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 
 	/*
 	 * Scan the revmap to find unsummarized items.
@@ -1057,7 +1060,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
 		CHECK_FOR_INTERRUPTS();
 
 		tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-									   BUFFER_LOCK_SHARE);
+									   BUFFER_LOCK_SHARE, NULL);
 		if (tup == NULL)
 		{
 			/* no revmap entry for this heap range. Summarize it. */
diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c
index 6ddcfda..6e42881 100644
--- a/src/backend/access/brin/brin_revmap.c
+++ b/src/backend/access/brin/brin_revmap.c
@@ -68,15 +68,19 @@ static void revmap_physical_extend(BrinRevmap *revmap);
  * brinRevmapTerminate when caller is done with it.
  */
 BrinRevmap *
-brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
+brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
+					 Snapshot snapshot)
 {
 	BrinRevmap *revmap;
 	Buffer		meta;
 	BrinMetaPageData *metadata;
+	Page		page;
 
 	meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
 	LockBuffer(meta, BUFFER_LOCK_SHARE);
-	metadata = (BrinMetaPageData *) PageGetContents(BufferGetPage(meta));
+	page = BufferGetPage(meta);
+	TestForOldSnapshot(snapshot, idxrel, page);
+	metadata = (BrinMetaPageData *) PageGetContents(page);
 
 	revmap = palloc(sizeof(BrinRevmap));
 	revmap->rm_irel = idxrel;
@@ -185,7 +189,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
  */
 BrinTuple *
 brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
-						 Buffer *buf, OffsetNumber *off, Size *size, int mode)
+						 Buffer *buf, OffsetNumber *off, Size *size, int mode,
+						 Snapshot snapshot)
 {
 	Relation	idxRel = revmap->rm_irel;
 	BlockNumber mapBlk;
@@ -262,6 +267,7 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
 		}
 		LockBuffer(*buf, mode);
 		page = BufferGetPage(*buf);
+		TestForOldSnapshot(snapshot, idxRel, page);
 
 		/* If we land on a revmap page, start over */
 		if (BRIN_IS_REGULAR_PAGE(page))
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index f0ff91a..f68629e 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode)
  * is share-locked, and stack->parent is NULL.
  */
 GinBtreeStack *
-ginFindLeafPage(GinBtree btree, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
 {
 	GinBtreeStack *stack;
 
@@ -90,6 +90,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 		stack->off = InvalidOffsetNumber;
 
 		page = BufferGetPage(stack->buffer);
+		TestForOldSnapshot(snapshot, btree->index, page);
 
 		access = ginTraverseLock(stack->buffer, searchMode);
 
@@ -116,6 +117,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 			stack->buffer = ginStepRight(stack->buffer, btree->index, access);
 			stack->blkno = rightlink;
 			page = BufferGetPage(stack->buffer);
+			TestForOldSnapshot(snapshot, btree->index, page);
 
 			if (!searchMode && GinPageIsIncompleteSplit(page))
 				ginFinishSplit(btree, stack, false, NULL);
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index ec8c94b..597f335 100644
--- a/src/backend/access/gin/gindatapage.c
+++ b/src/backend/access/gin/gindatapage.c
@@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 	{
 		/* search for the leaf page where the first item should go to */
 		btree.itemptr = insertdata.items[insertdata.curitem];
-		stack = ginFindLeafPage(&btree, false);
+		stack = ginFindLeafPage(&btree, false, NULL);
 
 		ginInsertValue(&btree, stack, &insertdata, buildStats);
 	}
@@ -1830,7 +1830,8 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
  * Starts a new scan on a posting tree.
  */
 GinBtreeStack *
-ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
+ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
+						Snapshot snapshot)
 {
 	GinBtreeStack *stack;
 
@@ -1838,7 +1839,7 @@ ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
 
 	btree->fullScan = TRUE;
 
-	stack = ginFindLeafPage(btree, TRUE);
+	stack = ginFindLeafPage(btree, TRUE, snapshot);
 
 	return stack;
 }
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 54b2db8..7e3be27 100644
--- a/src/backend/access/gin/ginget.c
+++ b/src/backend/access/gin/ginget.c
@@ -19,6 +19,7 @@
 #include "miscadmin.h"
 #include "utils/datum.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 
 /* GUC parameter */
 int			GinFuzzySearchLimit = 0;
@@ -63,7 +64,7 @@ moveRightIfItNeeded(GinBtreeData *btree, GinBtreeStack *stack)
  */
 static void
 scanPostingTree(Relation index, GinScanEntry scanEntry,
-				BlockNumber rootPostingTree)
+				BlockNumber rootPostingTree, Snapshot snapshot)
 {
 	GinBtreeData btree;
 	GinBtreeStack *stack;
@@ -71,7 +72,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
 	Page		page;
 
 	/* Descend to the leftmost leaf page */
-	stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
+	stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
 	buffer = stack->buffer;
 	IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */
 
@@ -114,7 +115,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
  */
 static bool
 collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
-				   GinScanEntry scanEntry)
+				   GinScanEntry scanEntry, Snapshot snapshot)
 {
 	OffsetNumber attnum;
 	Form_pg_attribute attr;
@@ -145,6 +146,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
 			return true;
 
 		page = BufferGetPage(stack->buffer);
+		TestForOldSnapshot(snapshot, btree->index, page);
 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
 
 		/*
@@ -224,7 +226,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
 			LockBuffer(stack->buffer, GIN_UNLOCK);
 
 			/* Collect all the TIDs in this entry's posting tree */
-			scanPostingTree(btree->index, scanEntry, rootPostingTree);
+			scanPostingTree(btree->index, scanEntry, rootPostingTree, snapshot);
 
 			/*
 			 * We lock again the entry page and while it was unlocked insert
@@ -291,7 +293,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
  * Start* functions setup beginning state of searches: finds correct buffer and pins it.
  */
 static void
-startScanEntry(GinState *ginstate, GinScanEntry entry)
+startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
 {
 	GinBtreeData btreeEntry;
 	GinBtreeStack *stackEntry;
@@ -316,7 +318,7 @@ restartScanEntry:
 	ginPrepareEntryScan(&btreeEntry, entry->attnum,
 						entry->queryKey, entry->queryCategory,
 						ginstate);
-	stackEntry = ginFindLeafPage(&btreeEntry, true);
+	stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
 	page = BufferGetPage(stackEntry->buffer);
 	needUnlock = TRUE;
 
@@ -333,7 +335,7 @@ restartScanEntry:
 		 * for the entry type.
 		 */
 		btreeEntry.findItem(&btreeEntry, stackEntry);
-		if (collectMatchBitmap(&btreeEntry, stackEntry, entry) == false)
+		if (!collectMatchBitmap(&btreeEntry, stackEntry, entry, snapshot))
 		{
 			/*
 			 * GIN tree was seriously restructured, so we will cleanup all
@@ -381,7 +383,7 @@ restartScanEntry:
 			needUnlock = FALSE;
 
 			stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
-											rootPostingTree);
+											rootPostingTree, snapshot);
 			entry->buffer = stack->buffer;
 
 			/*
@@ -533,7 +535,7 @@ startScan(IndexScanDesc scan)
 	uint32		i;
 
 	for (i = 0; i < so->totalentries; i++)
-		startScanEntry(ginstate, so->entries[i]);
+		startScanEntry(ginstate, so->entries[i], scan->xs_snapshot);
 
 	if (GinFuzzySearchLimit > 0)
 	{
@@ -578,7 +580,8 @@ startScan(IndexScanDesc scan)
  * keep it pinned to prevent interference with vacuum.
  */
 static void
-entryLoadMoreItems(GinState *ginstate, GinScanEntry entry, ItemPointerData advancePast)
+entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
+				   ItemPointerData advancePast, Snapshot snapshot)
 {
 	Page		page;
 	int			i;
@@ -622,7 +625,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry, ItemPointerData advan
 			entry->btree.itemptr.ip_posid++;
 		}
 		entry->btree.fullScan = false;
-		stack = ginFindLeafPage(&entry->btree, true);
+		stack = ginFindLeafPage(&entry->btree, true, snapshot);
 
 		/* we don't need the stack, just the buffer. */
 		entry->buffer = stack->buffer;
@@ -732,7 +735,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry, ItemPointerData advan
  */
 static void
 entryGetItem(GinState *ginstate, GinScanEntry entry,
-			 ItemPointerData advancePast)
+			 ItemPointerData advancePast, Snapshot snapshot)
 {
 	Assert(!entry->isFinished);
 
@@ -855,7 +858,7 @@ entryGetItem(GinState *ginstate, GinScanEntry entry,
 			/* If we've processed the current batch, load more items */
 			while (entry->offset >= entry->nlist)
 			{
-				entryLoadMoreItems(ginstate, entry, advancePast);
+				entryLoadMoreItems(ginstate, entry, advancePast, snapshot);
 
 				if (entry->isFinished)
 				{
@@ -894,7 +897,7 @@ entryGetItem(GinState *ginstate, GinScanEntry entry,
  */
 static void
 keyGetItem(GinState *ginstate, MemoryContext tempCtx, GinScanKey key,
-		   ItemPointerData advancePast)
+		   ItemPointerData advancePast, Snapshot snapshot)
 {
 	ItemPointerData minItem;
 	ItemPointerData curPageLossy;
@@ -941,7 +944,7 @@ keyGetItem(GinState *ginstate, MemoryContext tempCtx, GinScanKey key,
 		 */
 		if (ginCompareItemPointers(&entry->curItem, &advancePast) <= 0)
 		{
-			entryGetItem(ginstate, entry, advancePast);
+			entryGetItem(ginstate, entry, advancePast, snapshot);
 			if (entry->isFinished)
 				continue;
 		}
@@ -999,7 +1002,7 @@ keyGetItem(GinState *ginstate, MemoryContext tempCtx, GinScanKey key,
 
 		if (ginCompareItemPointers(&entry->curItem, &advancePast) <= 0)
 		{
-			entryGetItem(ginstate, entry, advancePast);
+			entryGetItem(ginstate, entry, advancePast, snapshot);
 			if (entry->isFinished)
 				continue;
 		}
@@ -1208,7 +1211,8 @@ scanGetItem(IndexScanDesc scan, ItemPointerData advancePast,
 			GinScanKey	key = so->keys + i;
 
 			/* Fetch the next item for this key that is > advancePast. */
-			keyGetItem(&so->ginstate, so->tempCtx, key, advancePast);
+			keyGetItem(&so->ginstate, so->tempCtx, key, advancePast,
+					   scan->xs_snapshot);
 
 			if (key->isFinished)
 				return false;
@@ -1330,6 +1334,7 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos)
 	for (;;)
 	{
 		page = BufferGetPage(pos->pendingBuffer);
+		TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
 
 		maxoff = PageGetMaxOffsetNumber(page);
 		if (pos->firstOffset > maxoff)
@@ -1510,6 +1515,7 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
 			   sizeof(bool) * (pos->lastOffset - pos->firstOffset));
 
 		page = BufferGetPage(pos->pendingBuffer);
+		TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
 
 		for (i = 0; i < so->nkeys; i++)
 		{
@@ -1696,12 +1702,15 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
 	int			i;
 	pendingPosition pos;
 	Buffer		metabuffer = ReadBuffer(scan->indexRelation, GIN_METAPAGE_BLKNO);
+	Page		page;
 	BlockNumber blkno;
 
 	*ntids = 0;
 
 	LockBuffer(metabuffer, GIN_SHARE);
-	blkno = GinPageGetMeta(BufferGetPage(metabuffer))->head;
+	page = BufferGetPage(metabuffer);
+	TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
+	blkno = GinPageGetMeta(page)->head;
 
 	/*
 	 * fetch head of list before unlocking metapage. head page must be pinned
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 49e9185..9c999c1 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate,
 
 	ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
 
-	stack = ginFindLeafPage(&btree, false);
+	stack = ginFindLeafPage(&btree, false, NULL);
 	page = BufferGetPage(stack->buffer);
 
 	if (btree.findItem(&btree, stack))
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index ce8e582..e41756a 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -337,6 +337,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
 	LockBuffer(buffer, GIST_SHARE);
 	gistcheckpage(scan->indexRelation, buffer);
 	page = BufferGetPage(buffer);
+	TestForOldSnapshot(scan->xs_snapshot, r, page);
 	opaque = GistPageGetOpaque(page);
 
 	/*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 24b06a5..356dc38 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -252,6 +252,7 @@ hashgettuple(PG_FUNCTION_ARGS)
 		buf = so->hashso_curbuf;
 		Assert(BufferIsValid(buf));
 		page = BufferGetPage(buf);
+		TestForOldSnapshot(scan->xs_snapshot, rel, page);
 		maxoffnum = PageGetMaxOffsetNumber(page);
 		for (offnum = ItemPointerGetOffsetNumber(current);
 			 offnum <= maxoffnum;
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index e9d7b7f..3c57170 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -188,7 +188,9 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
 	/* Read the metapage */
 	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-	metap = HashPageGetMeta(BufferGetPage(metabuf));
+	page = BufferGetPage(metabuf);
+	TestForOldSnapshot(scan->xs_snapshot, rel, page);
+	metap = HashPageGetMeta(page);
 
 	/*
 	 * Loop until we get a lock on the correct target bucket.
@@ -241,6 +243,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	/* Fetch the primary bucket page for the bucket */
 	buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
 	page = BufferGetPage(buf);
+	TestForOldSnapshot(scan->xs_snapshot, rel, page);
 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	Assert(opaque->hasho_bucket == bucket);
 
@@ -347,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					_hash_readnext(rel, &buf, &page, &opaque);
 					if (BufferIsValid(buf))
 					{
+						TestForOldSnapshot(scan->xs_snapshot, rel, page);
 						maxoff = PageGetMaxOffsetNumber(page);
 						offnum = _hash_binsearch(page, so->hashso_sk_hash);
 					}
@@ -388,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					_hash_readprev(rel, &buf, &page, &opaque);
 					if (BufferIsValid(buf))
 					{
+						TestForOldSnapshot(scan->xs_snapshot, rel, page);
 						maxoff = PageGetMaxOffsetNumber(page);
 						offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
 					}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bcf9871..bd9cc4b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -382,6 +382,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
 	LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
 	dp = (Page) BufferGetPage(buffer);
+	TestForOldSnapshot(snapshot, scan->rs_rd, dp);
 	lines = PageGetMaxOffsetNumber(dp);
 	ntup = 0;
 
@@ -512,6 +513,7 @@ heapgettup(HeapScanDesc scan,
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
 		lines = PageGetMaxOffsetNumber(dp);
 		/* page and lineoff now reference the physically next tid */
 
@@ -554,6 +556,7 @@ heapgettup(HeapScanDesc scan,
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
 		lines = PageGetMaxOffsetNumber(dp);
 
 		if (!scan->rs_inited)
@@ -588,6 +591,7 @@ heapgettup(HeapScanDesc scan,
 
 		/* Since the tuple was previously fetched, needn't lock page here */
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
 		Assert(ItemIdIsNormal(lpp));
@@ -712,6 +716,7 @@ heapgettup(HeapScanDesc scan,
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
 		lines = PageGetMaxOffsetNumber((Page) dp);
 		linesleft = lines;
 		if (backward)
@@ -786,6 +791,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 		}
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
 		lines = scan->rs_ntuples;
 		/* page and lineindex now reference the next visible tid */
 
@@ -826,6 +832,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 		}
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
 		lines = scan->rs_ntuples;
 
 		if (!scan->rs_inited)
@@ -859,6 +866,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 
 		/* Since the tuple was previously fetched, needn't lock page here */
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
 		Assert(ItemIdIsNormal(lpp));
@@ -973,6 +981,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 		heapgetpage(scan, page);
 
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
 		lines = scan->rs_ntuples;
 		linesleft = lines;
 		if (backward)
@@ -1648,6 +1657,7 @@ heap_fetch(Relation relation,
 	 */
 	LockBuffer(buffer, BUFFER_LOCK_SHARE);
 	page = BufferGetPage(buffer);
+	TestForOldSnapshot(snapshot, relation, page);
 
 	/*
 	 * We'd better check for out-of-range offnum in case of VACUUM since the
@@ -1977,6 +1987,7 @@ heap_get_latest_tid(Relation relation,
 		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 		page = BufferGetPage(buffer);
+		TestForOldSnapshot(snapshot, relation, page);
 
 		/*
 		 * Check for bogus item number.  This is not treated as an error
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 563e5c3..3218fd7 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -92,12 +92,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 	 * need to use the horizon that includes slots, otherwise the data-only
 	 * horizon can be used. Note that the toast relation of user defined
 	 * relations are *not* considered catalog relations.
+	 *
+	 * It is OK to apply the old snapshot limit before acquiring the cleanup
+	 * lock because the worst that can happen is that we are not quite as
+	 * aggressive about the cleanup (by however many transaction IDs are
+	 * consumed between this point and acquiring the lock).  This allows us to
+	 * save significant overhead in the case where the page is found not to be
+	 * prunable.
 	 */
 	if (IsCatalogRelation(relation) ||
 		RelationIsAccessibleInLogicalDecoding(relation))
 		OldestXmin = RecentGlobalXmin;
 	else
-		OldestXmin = RecentGlobalDataXmin;
+		OldestXmin =
+				TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
+													relation);
 
 	Assert(TransactionIdIsValid(OldestXmin));
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 77c2fdf..8b9b1b2 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -119,7 +119,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
 
 	offset = InvalidOffsetNumber;
 
@@ -135,7 +135,7 @@ top:
 	 * precise description.
 	 */
 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
-						true, stack, BT_WRITE);
+						true, stack, BT_WRITE, NULL);
 
 	/*
 	 * If we're not allowing duplicates, make sure the key isn't already in
@@ -1671,7 +1671,8 @@ _bt_insert_parent(Relation rel,
 			elog(DEBUG2, "concurrent ROOT page split");
 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 			/* Find the leftmost page at the next level up */
-			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
+			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
+									NULL);
 			/* Set up a phony stack entry pointing there */
 			stack = &fakestack;
 			stack->bts_blkno = BufferGetBlockNumber(pbuf);
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 6e65db9..0ef27ef 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1255,7 +1255,7 @@ _bt_pagedel(Relation rel, Buffer buf)
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
 				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-								   false, &lbuf, BT_READ);
+								   false, &lbuf, BT_READ, NULL);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index d69a057..bb0bf72 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -30,7 +30,7 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
 			 OffsetNumber offnum, IndexTuple itup);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
-static Buffer _bt_walk_left(Relation rel, Buffer buf);
+static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
 
@@ -79,6 +79,10 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * address of the leaf-page buffer, which is read-locked and pinned.
  * No locks are held on the parent pages, however!
  *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
+ *
  * NOTE that the returned buffer is read-locked regardless of the access
  * parameter.  However, access = BT_WRITE will allow an empty root page
  * to be created and returned.  When access = BT_READ, an empty index
@@ -87,7 +91,7 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  */
 BTStack
 _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
-		   Buffer *bufP, int access)
+		   Buffer *bufP, int access, Snapshot snapshot)
 {
 	BTStack		stack_in = NULL;
 
@@ -96,7 +100,9 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 
 	/* If index is empty and access = BT_READ, no root page is created. */
 	if (!BufferIsValid(*bufP))
+	{
 		return (BTStack) NULL;
+	}
 
 	/* Loop iterates once per level descended in the tree */
 	for (;;)
@@ -124,7 +130,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 		 */
 		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
 							  (access == BT_WRITE), stack_in,
-							  BT_READ);
+							  BT_READ, snapshot);
 
 		/* if this is a leaf page, we're done */
 		page = BufferGetPage(*bufP);
@@ -197,6 +203,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  * On entry, we have the buffer pinned and a lock of the type specified by
  * 'access'.  If we move right, we release the buffer and lock and acquire
  * the same on the right sibling.  Return value is the buffer we stop at.
+ *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
  */
 Buffer
 _bt_moveright(Relation rel,
@@ -206,7 +216,8 @@ _bt_moveright(Relation rel,
 			  bool nextkey,
 			  bool forupdate,
 			  BTStack stack,
-			  int access)
+			  int access,
+			  Snapshot snapshot)
 {
 	Page		page;
 	BTPageOpaque opaque;
@@ -232,6 +243,7 @@ _bt_moveright(Relation rel,
 	for (;;)
 	{
 		page = BufferGetPage(buf);
+		TestForOldSnapshot(snapshot, rel, page);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 		if (P_RIGHTMOST(opaque))
@@ -970,7 +982,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * Use the manufactured insertion scan key to descend the tree and
 	 * position ourselves on the target leaf page.
 	 */
-	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
+	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
+					   scan->xs_snapshot);
 
 	/* don't need to keep the stack around... */
 	_bt_freestack(stack);
@@ -1363,6 +1376,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
 			/* check for deleted page */
 			page = BufferGetPage(so->currPos.buf);
+			TestForOldSnapshot(scan->xs_snapshot, rel, page);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
@@ -1421,7 +1435,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			}
 
 			/* Step to next physical page */
-			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf);
+			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf,
+											scan->xs_snapshot);
 
 			/* if we're physically at end of index, return failure */
 			if (so->currPos.buf == InvalidBuffer)
@@ -1436,6 +1451,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			 * and do it all again.
 			 */
 			page = BufferGetPage(so->currPos.buf);
+			TestForOldSnapshot(scan->xs_snapshot, rel, page);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
@@ -1469,7 +1485,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
  * again if it's important.
  */
 static Buffer
-_bt_walk_left(Relation rel, Buffer buf)
+_bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
 {
 	Page		page;
 	BTPageOpaque opaque;
@@ -1499,6 +1515,7 @@ _bt_walk_left(Relation rel, Buffer buf)
 		CHECK_FOR_INTERRUPTS();
 		buf = _bt_getbuf(rel, blkno, BT_READ);
 		page = BufferGetPage(buf);
+		TestForOldSnapshot(snapshot, rel, page);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 		/*
@@ -1525,12 +1542,14 @@ _bt_walk_left(Relation rel, Buffer buf)
 			blkno = opaque->btpo_next;
 			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
 			page = BufferGetPage(buf);
+			TestForOldSnapshot(snapshot, rel, page);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		}
 
 		/* Return to the original page to see what's up */
 		buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
 		page = BufferGetPage(buf);
+		TestForOldSnapshot(snapshot, rel, page);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		if (P_ISDELETED(opaque))
 		{
@@ -1548,6 +1567,7 @@ _bt_walk_left(Relation rel, Buffer buf)
 				blkno = opaque->btpo_next;
 				buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
 				page = BufferGetPage(buf);
+				TestForOldSnapshot(snapshot, rel, page);
 				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 				if (!P_ISDELETED(opaque))
 					break;
@@ -1584,7 +1604,8 @@ _bt_walk_left(Relation rel, Buffer buf)
  * The returned buffer is pinned and read-locked.
  */
 Buffer
-_bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
+_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+				 Snapshot snapshot)
 {
 	Buffer		buf;
 	Page		page;
@@ -1607,6 +1628,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
 		return InvalidBuffer;
 
 	page = BufferGetPage(buf);
+	TestForOldSnapshot(snapshot, rel, page);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 	for (;;)
@@ -1626,6 +1648,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
 					 RelationGetRelationName(rel));
 			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
 			page = BufferGetPage(buf);
+			TestForOldSnapshot(snapshot, rel, page);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		}
 
@@ -1678,7 +1701,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 	 * version of _bt_search().  We don't maintain a stack since we know we
 	 * won't need it.
 	 */
-	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
+	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
 
 	if (!BufferIsValid(buf))
 	{
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 8a0d909..5aa6740 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -319,7 +319,7 @@ spgLeafTest(Relation index, SpGistScanOpaque so,
  */
 static void
 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
-		storeRes_func storeRes)
+		storeRes_func storeRes, Snapshot snapshot)
 {
 	Buffer		buffer = InvalidBuffer;
 	bool		reportedSome = false;
@@ -360,6 +360,7 @@ redirect:
 		/* else new pointer points to the same page, no work needed */
 
 		page = BufferGetPage(buffer);
+		TestForOldSnapshot(snapshot, index, page);
 
 		isnull = SpGistPageStoresNulls(page) ? true : false;
 
@@ -584,7 +585,7 @@ spggetbitmap(PG_FUNCTION_ARGS)
 	so->tbm = tbm;
 	so->ntids = 0;
 
-	spgWalk(scan->indexRelation, so, true, storeBitmap);
+	spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
 
 	PG_RETURN_INT64(so->ntids);
 }
@@ -645,7 +646,8 @@ spggettuple(PG_FUNCTION_ARGS)
 		}
 		so->iPtr = so->nPtrs = 0;
 
-		spgWalk(scan->indexRelation, so, false, storeGettuple);
+		spgWalk(scan->indexRelation, so, false, storeGettuple,
+				scan->xs_snapshot);
 
 		if (so->nPtrs == 0)
 			break;				/* must have completed scan */
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 6d55148..ca41ecf 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -489,7 +489,8 @@ vacuum_set_xid_limits(Relation rel,
 	 * working on a particular table at any time, and that each vacuum is
 	 * always an independent transaction.
 	 */
-	*oldestXmin = GetOldestXmin(rel, true);
+	*oldestXmin =
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index a01cfb4..c091d14 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -270,7 +270,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
 	if (possibly_freeable > 0 &&
 		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
-		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
+		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
+		old_snapshot_threshold < 0)
 		lazy_truncate_heap(onerel, vacrelstats);
 
 	/* Vacuum the Free Space Map */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..d73ffd3 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -43,6 +43,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/snapmgr.h"
 
 
 shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -136,6 +137,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, ReplicationOriginShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
+		size = add_size(size, SnapMgrShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
@@ -247,6 +249,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	/*
 	 * Set up other modules that need some shared memory space
 	 */
+	SnapMgrInit();
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index b13fe03..ffe01c5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1749,6 +1749,15 @@ GetSnapshotData(Snapshot snapshot)
 	snapshot->regd_count = 0;
 	snapshot->copied = false;
 
+	/*
+	 * Capture the current time and WAL stream location in case this snapshot
+	 * becomes old enough to need to fall back on the special "old snapshot"
+	 * logic.
+	 */
+	snapshot->lsn = GetXLogInsertRecPtr();
+	snapshot->whenTaken = GetSnapshotCurrentTimestamp();
+	MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
+
 	return snapshot;
 }
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index c557cb6..f8996cd 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -46,3 +46,4 @@ CommitTsControlLock					38
 CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
+OldSnapshotTimeMapLock				42
diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt
index 7b97d45..241998d 100644
--- a/src/backend/utils/errcodes.txt
+++ b/src/backend/utils/errcodes.txt
@@ -413,6 +413,10 @@ Section: Class 58 - System Error (errors external to PostgreSQL itself)
 58P01    E    ERRCODE_UNDEFINED_FILE                                         undefined_file
 58P02    E    ERRCODE_DUPLICATE_FILE                                         duplicate_file
 
+Section: Class 72 - Snapshot Failure
+# (class borrowed from Oracle)
+72000    E    ERRCODE_SNAPSHOT_TOO_OLD                                       snapshot_too_old
+
 Section: Class F0 - Configuration File Error
 
 # (PostgreSQL-specific error class)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 71090f2..5fd6fe7 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2558,6 +2558,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
+			gettext_noop("A value of -1 disables this feature."),
+			GUC_UNIT_MIN
+		},
+		&old_snapshot_threshold,
+		-1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
 			gettext_noop("Time between issuing TCP keepalives."),
 			gettext_noop("A value of 0 uses the system default."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dcf929f..ba27169 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -165,6 +165,8 @@
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
 #max_worker_processes = 8
 #max_parallel_degree = 0		# max number of worker processes per node
+#old_snapshot_threshold = -1		# 1min-60d; -1 disables; 0 is immediate
+									# (change requires restart)
 
 
 #------------------------------------------------------------------------------
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 074935c..13c58d2 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,14 +46,18 @@
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinval.h"
+#include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -61,6 +65,62 @@
 
 
 /*
+ * GUC parameters
+ */
+int			old_snapshot_threshold;		/* number of minutes, -1 disables */
+
+/*
+ * Structure for dealing with old_snapshot_threshold implementation.
+ */
+typedef struct OldSnapshotControlData
+{
+	/*
+	 * Variables for old snapshot handling are shared among processes and are
+	 * only allowed to move forward.
+	 */
+	slock_t		mutex_current;			/* protect current timestamp */
+	int64		current_timestamp;		/* latest snapshot timestamp */
+	slock_t		mutex_threshold;		/* protect threshold fields */
+	int64		threshold_timestamp;	/* earlier snapshot is old */
+	TransactionId threshold_xid;		/* earlier xid may be gone */
+
+	/*
+	 * Keep one xid per minute for old snapshot error handling.
+	 *
+	 * Use a circular buffer with a head offset, a count of entries currently
+	 * used, and a timestamp corresponding to the xid at the head offset.  A
+	 * count_used value of zero means that there are no times stored; a
+	 * count_used value of old_snapshot_threshold means that the buffer is
+	 * full and the head must be advanced to add new entries.  Use timestamps
+	 * aligned to minute boundaries, since that seems less surprising than
+	 * aligning based on the first usage timestamp.
+	 *
+	 * It is OK if the xid for a given time slot is from earlier than
+	 * calculated by adding the number of minutes corresponding to the
+	 * (possibly wrapped) distance from the head offset to the time of the
+	 * head entry, since that just results in the vacuuming of old tuples
+	 * being slightly less aggressive.  It would not be OK for it to be off in
+	 * the other direction, since it might result in vacuuming tuples that are
+	 * still expected to be there.
+	 *
+	 * Use of an SLRU was considered but not chosen because it is more
+	 * heavyweight than is needed for this, and would probably not be any less
+	 * code to implement.
+	 *
+	 * Persistence is not needed.
+	 */
+	int			head_offset;		/* subscript of oldest tracked time */
+	int64		head_timestamp;		/* time corresponding to head xid */
+	int			count_used;			/* how many slots are in use */
+	TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
+}	OldSnapshotControlData;
+
+typedef struct OldSnapshotControlData *OldSnapshotControl;
+
+static volatile OldSnapshotControl oldSnapshotControl;
+
+
+/*
  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
  * mode, and to the latest one taken in a read-committed transaction.
  * SecondarySnapshot is a snapshot that's always up-to-date as of the current
@@ -153,6 +213,7 @@ static Snapshot FirstXactSnapshot = NULL;
 static List *exportedSnapshots = NIL;
 
 /* Prototypes for local functions */
+static int64 AlignTimestampToMinuteBoundary(int64 ts);
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
@@ -174,6 +235,47 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 } SerializedSnapshotData;
 
+Size
+SnapMgrShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(OldSnapshotControlData, xid_by_minute);
+	if (old_snapshot_threshold > 0)
+		size = add_size(size, mul_size(sizeof(TransactionId),
+									   old_snapshot_threshold));
+
+	return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ */
+void
+SnapMgrInit(void)
+{
+	bool		found;
+
+	/*
+	 * Create or attach to the OldSnapshotControl structure.
+	 */
+	oldSnapshotControl = (OldSnapshotControl)
+		ShmemInitStruct("OldSnapshotControlData",
+						SnapMgrShmemSize(), &found);
+
+	if (!found)
+	{
+		SpinLockInit(&oldSnapshotControl->mutex_current);
+		oldSnapshotControl->current_timestamp = 0;
+		SpinLockInit(&oldSnapshotControl->mutex_threshold);
+		oldSnapshotControl->threshold_timestamp = 0;
+		oldSnapshotControl->threshold_xid = InvalidTransactionId;
+		oldSnapshotControl->head_offset = 0;
+		oldSnapshotControl->head_timestamp = 0;
+		oldSnapshotControl->count_used = 0;
+	}
+}
+
 /*
  * GetTransactionSnapshot
  *		Get the appropriate snapshot for a new query in a transaction.
@@ -1405,6 +1507,259 @@ ThereAreNoPriorRegisteredSnapshots(void)
 	return false;
 }
 
+
+/*
+ * Return an int64 timestamp which is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value, otherwise move to
+ * the next minute boundary following the given time.
+ */
+static int64
+AlignTimestampToMinuteBoundary(int64 ts)
+{
+	int64		retval = ts + (USECS_PER_MINUTE - 1);
+
+	return retval - (retval % USECS_PER_MINUTE);
+}
+
+/*
+ * Get current timestamp for snapshots as int64 that never moves backward.
+ */
+int64
+GetSnapshotCurrentTimestamp(void)
+{
+	int64		now = GetCurrentIntegerTimestamp();
+
+	/*
+	 * Don't let time move backward; if it hasn't advanced, use the old value.
+	 */
+	SpinLockAcquire(&oldSnapshotControl->mutex_current);
+	if (now <= oldSnapshotControl->current_timestamp)
+		now = oldSnapshotControl->current_timestamp;
+	else
+		oldSnapshotControl->current_timestamp = now;
+	SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+	return now;
+}
+
+/*
+ * Get timestamp through which vacuum may have processed based on last stored
+ * value for threshold_timestamp.
+ *
+ * XXX: If we can trust a read of an int64 value to be atomic, we can skip the
+ * spinlock here.
+ */
+int64
+GetOldSnapshotThresholdTimestamp(void)
+{
+	int64		threshold_timestamp;
+
+	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+	threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+	return threshold_timestamp;
+}
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit, if any.  This is intended to be called for page
+ * pruning and table vacuuming, to allow old_snapshot_threshold to override
+ * the normal global xmin value.  Actual testing for snapshot too old will be
+ * based on whether a snapshot timestamp is prior to the threshold timestamp
+ * set in this function.
+ */
+TransactionId
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+									Relation relation)
+{
+	if (TransactionIdIsNormal(recentXmin)
+		&& old_snapshot_threshold >= 0
+		&& RelationNeedsWAL(relation)
+		&& !IsCatalogRelation(relation)
+		&& !RelationIsAccessibleInLogicalDecoding(relation))
+	{
+		int64		ts;
+		TransactionId xlimit = recentXmin;
+		bool		same_ts_as_threshold = false;
+
+		ts = AlignTimestampToMinuteBoundary(GetSnapshotCurrentTimestamp())
+			 - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+		/* Check for fast exit without LW locking. */
+		SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+		if (ts == oldSnapshotControl->threshold_timestamp)
+		{
+			xlimit = oldSnapshotControl->threshold_xid;
+			same_ts_as_threshold = true;
+		}
+		SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+		if (!same_ts_as_threshold)
+		{
+			LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+			if (oldSnapshotControl->count_used > 0
+				&& ts >= oldSnapshotControl->head_timestamp)
+			{
+				int		offset;
+
+				offset = ((ts - oldSnapshotControl->head_timestamp)
+						  / USECS_PER_MINUTE);
+				if (offset > oldSnapshotControl->count_used - 1)
+					offset = oldSnapshotControl->count_used - 1;
+				offset = (oldSnapshotControl->head_offset + offset)
+						% old_snapshot_threshold;
+				xlimit = oldSnapshotControl->xid_by_minute[offset];
+
+				if (NormalTransactionIdFollows(xlimit, recentXmin))
+				{
+					SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+					oldSnapshotControl->threshold_timestamp = ts;
+					oldSnapshotControl->threshold_xid = xlimit;
+					SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+				}
+			}
+
+			LWLockRelease(OldSnapshotTimeMapLock);
+		}
+
+		if (NormalTransactionIdFollows(xlimit, recentXmin))
+			return xlimit;
+	}
+
+	return recentXmin;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ */
+void
+MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+{
+	int64		ts;
+
+	/*
+	 * Fast exit when old_snapshot_threshold is not used or when snapshots
+	 * should immediately be considered "old" (for testing purposes).
+	 */
+	if (old_snapshot_threshold <= 0)
+		return;
+
+	/*
+	 * We don't want to do something stupid with unusual values, but we don't
+	 * want to litter the log with warnings or break otherwise normal
+	 * processing for this feature; so if something seems unreasonable, just
+	 * log at DEBUG level and return without doing anything.
+	 */
+	if (whenTaken < 0)
+	{
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+			 (long) whenTaken);
+		return;
+	}
+	if (!TransactionIdIsNormal(xmin))
+	{
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+			 (unsigned long) xmin);
+		return;
+	}
+
+	ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+	LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+	Assert(oldSnapshotControl->head_offset >= 0);
+	Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
+	Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+	Assert(oldSnapshotControl->count_used >= 0);
+	Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
+
+	if (oldSnapshotControl->count_used == 0)
+	{
+		/* set up first entry for empty mapping */
+		oldSnapshotControl->head_offset = 0;
+		oldSnapshotControl->head_timestamp = ts;
+		oldSnapshotControl->count_used = 1;
+		oldSnapshotControl->xid_by_minute[0] = xmin;
+	}
+	else if (ts < oldSnapshotControl->head_timestamp)
+	{
+		/* old ts; log it at DEBUG */
+		LWLockRelease(OldSnapshotTimeMapLock);
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+			 (long) whenTaken);
+		return;
+	}
+	else if (ts <= (oldSnapshotControl->head_timestamp +
+					((oldSnapshotControl->count_used - 1)
+					 * USECS_PER_MINUTE)))
+	{
+		/* existing mapping; advance xid if possible */
+		int		bucket = (oldSnapshotControl->head_offset
+						  + ((ts - oldSnapshotControl->head_timestamp)
+							 / USECS_PER_MINUTE))
+						 % old_snapshot_threshold;
+
+		if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+			oldSnapshotControl->xid_by_minute[bucket] = xmin;
+	}
+	else
+	{
+		/* We need a new bucket, but it might not be the very next one. */
+		int		advance = ((ts - oldSnapshotControl->head_timestamp)
+						   / USECS_PER_MINUTE);
+
+		oldSnapshotControl->head_timestamp = ts;
+
+		if (advance >= old_snapshot_threshold)
+		{
+			/* Advance is so far that all old data is junk; start over. */
+			oldSnapshotControl->head_offset = 0;
+			oldSnapshotControl->count_used = 1;
+			oldSnapshotControl->xid_by_minute[0] = xmin;
+		}
+		else
+		{
+			/* Store the new value in one or more buckets. */
+			int i;
+
+			for (i = 0; i < advance; i++)
+			{
+				if (oldSnapshotControl->count_used == old_snapshot_threshold)
+				{
+					/* Map full and new value replaces old head. */
+					int		old_head = oldSnapshotControl->head_offset;
+
+					if (old_head == (old_snapshot_threshold - 1))
+						oldSnapshotControl->head_offset = 0;
+					else
+						oldSnapshotControl->head_offset = old_head + 1;
+					oldSnapshotControl->xid_by_minute[old_head] = xmin;
+				}
+				else
+				{
+					/* Extend map to unused entry. */
+					int		new_tail = (oldSnapshotControl->head_offset
+										+ oldSnapshotControl->count_used)
+									   % old_snapshot_threshold;
+
+					oldSnapshotControl->count_used++;
+					oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+				}
+			}
+		}
+	}
+
+	LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
 /*
  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
  * access to behave just like it did at a certain point in the past.
diff --git a/src/include/access/brin_revmap.h b/src/include/access/brin_revmap.h
index cca6ec5..300838c 100644
--- a/src/include/access/brin_revmap.h
+++ b/src/include/access/brin_revmap.h
@@ -18,12 +18,13 @@
 #include "storage/itemptr.h"
 #include "storage/off.h"
 #include "utils/relcache.h"
+#include "utils/snapshot.h"
 
 /* struct definition lives in brin_revmap.c */
 typedef struct BrinRevmap BrinRevmap;
 
 extern BrinRevmap *brinRevmapInitialize(Relation idxrel,
-					 BlockNumber *pagesPerRange);
+					 BlockNumber *pagesPerRange, Snapshot snapshot);
 extern void brinRevmapTerminate(BrinRevmap *revmap);
 
 extern void brinRevmapExtend(BrinRevmap *revmap,
@@ -34,6 +35,6 @@ extern void brinSetHeapBlockItemptr(Buffer rmbuf, BlockNumber pagesPerRange,
 						BlockNumber heapBlk, ItemPointerData tid);
 extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap,
 						 BlockNumber heapBlk, Buffer *buf, OffsetNumber *off,
-						 Size *size, int mode);
+						 Size *size, int mode, Snapshot snapshot);
 
 #endif   /* BRIN_REVMAP_H */
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index 5021887..173f8b4 100644
--- a/src/include/access/gin_private.h
+++ b/src/include/access/gin_private.h
@@ -699,7 +699,7 @@ typedef struct
  * PostingItem
  */
 
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot);
 extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
 extern void freeGinBtreeStack(GinBtreeStack *stack);
 extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
@@ -727,7 +727,7 @@ extern void GinPageDeletePostingItem(Page page, OffsetNumber offset);
 extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 					  ItemPointerData *items, uint32 nitem,
 					  GinStatsData *buildStats);
-extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno);
+extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, Snapshot snapshot);
 extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
 extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
 
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 9e48efd..00bbaf1 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -701,17 +701,18 @@ extern int	_bt_pagedel(Relation rel, Buffer buf);
  */
 extern BTStack _bt_search(Relation rel,
 		   int keysz, ScanKey scankey, bool nextkey,
-		   Buffer *bufP, int access);
+		   Buffer *bufP, int access, Snapshot snapshot);
 extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
 			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
-			  int access);
+			  int access, Snapshot snapshot);
 extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
 			ScanKey scankey, bool nextkey);
 extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
 			Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
-extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
+extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+							   Snapshot snapshot);
 
 /*
  * prototypes for functions in nbtutils.c
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 0f59201..fe80f3b 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,11 +14,14 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "catalog/catalog.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
 #include "storage/relfilenode.h"
 #include "utils/relcache.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
 
 typedef void *Block;
 
@@ -148,6 +151,37 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer))
 
 /*
+ * Check whether the given snapshot is too old to have safely read the given
+ * page from the given table.  If so, throw a "snapshot too old" error.
+ *
+ * This test generally needs to be performed after every BufferGetPage() call
+ * that is executed as part of a scan.  It is not needed for calls made for
+ * modifying the page (for example, to position to the right place to insert a
+ * new index tuple or for vacuuming).
+ *
+ * Note that a NULL snapshot argument is allowed and causes a fast return
+ * without error; this is to support call sites which can be called from
+ * either scans or index modification areas.
+ *
+ * This is a macro for speed; keep the tests that are fastest and/or most
+ * likely to exclude a page from old snapshot testing near the front.
+ */
+#define TestForOldSnapshot(snapshot, relation, page) \
+	do { \
+		if (old_snapshot_threshold >= 0 \
+		 && (snapshot) != NULL \
+		 && (snapshot)->satisfies == HeapTupleSatisfiesMVCC \
+		 && !XLogRecPtrIsInvalid((snapshot)->lsn) \
+		 && PageGetLSN(page) > (snapshot)->lsn \
+		 && !IsCatalogRelation(relation) \
+		 && !RelationIsAccessibleInLogicalDecoding(relation) \
+		 && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp()) \
+			ereport(ERROR, \
+					(errcode(ERRCODE_SNAPSHOT_TOO_OLD), \
+					 errmsg("snapshot too old"))); \
+	} while (0)
+
+/*
  * prototypes for functions in bufmgr.c
  */
 extern bool ComputeIoConcurrency(int io_concurrency, double *target);
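
As an illustrative aside (not part of the patch), the usage rule in the TestForOldSnapshot() comment translates into call sites that look roughly like the sketch below; the scan descriptor, blkno, and variable names are assumptions for the example, not code lifted from the patch:

	Buffer		buf;
	Page		page;

	/*
	 * Read and lock a page for a scan that returns user-visible results,
	 * then immediately check that the snapshot is still usable for it.
	 */
	buf = ReadBuffer(scan->indexRelation, blkno);
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buf);
	TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);

Call sites that only position within an index to insert or vacuum away entries would omit the TestForOldSnapshot() call, per the comment above.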
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8a55a09..77d43bd 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -15,6 +15,7 @@
 #define REL_H
 
 #include "access/tupdesc.h"
+#include "access/xlog.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index f8524eb..09a33c1 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -14,10 +14,20 @@
 #define SNAPMGR_H
 
 #include "fmgr.h"
+#include "utils/relcache.h"
 #include "utils/resowner.h"
 #include "utils/snapshot.h"
 
 
+/* GUC variables */
+extern int	old_snapshot_threshold;
+
+
+extern Size SnapMgrShmemSize(void);
+extern void SnapMgrInit(void);
+extern int64 GetSnapshotCurrentTimestamp(void);
+extern int64 GetOldSnapshotThresholdTimestamp(void);
+
 extern bool FirstSnapshotSet;
 
 extern TransactionId TransactionXmin;
@@ -54,6 +64,9 @@ extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
+extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+														 Relation relation);
+extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
 
 extern char *ExportSnapshot(Snapshot snapshot);
 
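
As an illustrative aside (not part of the patch), the intended consumer of TransactionIdLimitedForOldSnapshots() is a cleanup path that may advance its xmin horizon when the feature is enabled; this is a hedged sketch, with the caller's variables (relation, the use of RecentGlobalXmin) assumed rather than taken from the patch:

	TransactionId	limit_xmin = RecentGlobalXmin;

	/* Let the old-snapshot time map raise the horizon when enabled. */
	if (old_snapshot_threshold >= 0)
		limit_xmin = TransactionIdLimitedForOldSnapshots(limit_xmin,
														 relation);

	/* ... use limit_xmin as the cutoff when deciding what can be pruned ... */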
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index cbf1bbd..e4e091d 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -14,6 +14,7 @@
 #define SNAPSHOT_H
 
 #include "access/htup.h"
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 #include "storage/buf.h"
 
@@ -105,6 +106,9 @@ typedef struct SnapshotData
 	uint32		active_count;	/* refcount on ActiveSnapshot stack */
 	uint32		regd_count;		/* refcount on RegisteredSnapshots */
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
+
+	int64		whenTaken;		/* timestamp when snapshot was taken */
+	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
 } SnapshotData;
 
 /*
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6167ec1..299dc4f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -8,6 +8,7 @@ SUBDIRS = \
 		  brin \
 		  commit_ts \
 		  dummy_seclabel \
+		  sto \
 		  test_ddl_deparse \
 		  test_extensions \
 		  test_parser \
diff --git a/src/test/modules/sto/.gitignore b/src/test/modules/sto/.gitignore
new file mode 100644
index 0000000..e07b677
--- /dev/null
+++ b/src/test/modules/sto/.gitignore
@@ -0,0 +1,2 @@
+# Generated by regression tests
+/tmp_check/
diff --git a/src/test/modules/sto/Makefile b/src/test/modules/sto/Makefile
new file mode 100644
index 0000000..55d3822
--- /dev/null
+++ b/src/test/modules/sto/Makefile
@@ -0,0 +1,20 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/modules/sto
+#
+# Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/modules/sto/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/modules/sto
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check
+
+check:
+	$(prove_check)
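
(Aside: assuming the tree was configured with --enable-tap-tests, these tests should also be runnable on their own with "make -C src/test/modules/sto check".)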
diff --git a/src/test/modules/sto/t/001_snapshot_too_old.pl b/src/test/modules/sto/t/001_snapshot_too_old.pl
new file mode 100644
index 0000000..6b1a431
--- /dev/null
+++ b/src/test/modules/sto/t/001_snapshot_too_old.pl
@@ -0,0 +1,124 @@
+use strict;
+use warnings;
+
+use Config;
+use TestLib;
+use DBI;
+use DBD::Pg ':async';
+
+use Test::More tests => 9;
+
+my $tempdir       = TestLib::tempdir;
+
+my $driver   = "Pg";
+my $database = "postgres";
+
+sub log_sleep
+{
+	my ($t) = @_;
+	print "sleeping $t seconds...";
+	sleep $t;
+	print "done!\n";
+}
+
+sub log_sleep_to_new_minute
+{
+	my ($sec) = localtime;
+	log_sleep (61 - $sec);
+}
+
+sub connect_ok
+{
+	my ($db) = @_;
+	my $dbh = DBI->connect("dbi:$driver:database=$db", '', '',
+						   {AutoCommit => 1, RaiseError => 0, PrintError => 1})
+		or print $DBI::errstr;
+	ok((defined $dbh), "get DBI connection to $database");
+	return $dbh;
+}
+
+sub setting_ok
+{
+	my ($dbh, $setting, $expected_value) = @_;
+	my $actual_value = $dbh->selectrow_array("SHOW $setting");
+	is($actual_value, $expected_value, "check $setting");
+}
+
+sub select_scalar_ok
+{
+	my ($dbh, $statement, $expected_value) = @_;
+	my $actual_value = $dbh->selectrow_array($statement);
+	is($actual_value, $expected_value, "SELECT scalar value");
+}
+
+sub fetch_first_ok
+{
+	my ($dbh) = @_;
+	select_scalar_ok($dbh, 'FETCH FIRST FROM cursor1', '1');
+}
+
+sub snapshot_too_old_ok
+{
+	my ($dbh) = @_;
+	my $actual_value = $dbh->selectrow_array('FETCH FIRST FROM cursor1');
+	my $errstr = $dbh->errstr;
+	is($errstr, 'ERROR:  snapshot too old', 'expect "snapshot too old" error');
+}
+
+# Initialize cluster
+start_test_server($tempdir, qq(
+old_snapshot_threshold = '1min'
+autovacuum = off
+enable_indexonlyscan = off
+max_connections = 10
+log_statement = 'all'
+));
+
+# Confirm that we can connect to the database.
+my $conn1 = connect_ok($database);
+my $conn2 = connect_ok($database);
+
+# Confirm that the settings "took".
+setting_ok($conn1, 'old_snapshot_threshold', '1min');
+
+# Do some setup.
+$conn1->do(qq(CREATE TABLE t (c int NOT NULL)));
+$conn1->do(qq(INSERT INTO t SELECT generate_series(1, 1000)));
+$conn1->do(qq(VACUUM ANALYZE t));
+$conn1->do(qq(CREATE TABLE u (c int NOT NULL)));
+
+# Open long-lived cursor
+$conn1->begin_work;
+$conn1->do(qq(DECLARE cursor1 CURSOR FOR SELECT c FROM t));
+
+# Bump the last-used transaction number.
+$conn2->do(qq(INSERT INTO u VALUES (1)));
+
+# Confirm immediate fetch works.
+fetch_first_ok($conn1);
+
+# Confirm delayed fetch works with no modifications.
+log_sleep_to_new_minute;
+fetch_first_ok($conn1);
+
+# Try again to confirm no pruning effect.
+fetch_first_ok($conn1);
+
+# Confirm that a fetch immediately after the update is OK.
+# This assumes that they happen within one minute of each other.
+# Still using the same snapshot, so the value shouldn't change.
+$conn2->do(qq(UPDATE t SET c = 1001 WHERE c = 1));
+fetch_first_ok($conn1);
+
+# Try again to confirm no pruning effect.
+$conn2->do(qq(VACUUM ANALYZE t));
+fetch_first_ok($conn1);
+
+# The passage of time should now cause this to fail with the "snapshot too old" error.
+log_sleep_to_new_minute;
+snapshot_too_old_ok($conn1);
+$conn1->rollback;
+
+# Clean up a bit, just to minimize noise in the log file.
+$conn2->disconnect;
+$conn1->disconnect;
diff --git a/src/test/modules/sto/t/002_snapshot_too_old_select.pl b/src/test/modules/sto/t/002_snapshot_too_old_select.pl
new file mode 100644
index 0000000..d0985c3
--- /dev/null
+++ b/src/test/modules/sto/t/002_snapshot_too_old_select.pl
@@ -0,0 +1,126 @@
+use strict;
+use warnings;
+
+use Config;
+use TestLib;
+use DBI;
+use DBD::Pg ':async';
+
+use Test::More tests => 9;
+
+my $tempdir       = TestLib::tempdir;
+
+my $driver   = "Pg";
+my $database = "postgres";
+
+sub log_sleep
+{
+	my ($t) = @_;
+	print "sleeping $t seconds...";
+	sleep $t;
+	print "done!\n";
+}
+
+sub log_sleep_to_new_minute
+{
+	my ($sec) = localtime;
+	log_sleep (61 - $sec);
+}
+
+sub connect_ok
+{
+	my ($db) = @_;
+	my $dbh = DBI->connect("dbi:$driver:database=$db", '', '',
+						   {AutoCommit => 1, RaiseError => 0, PrintError => 1})
+		or print $DBI::errstr;
+	ok((defined $dbh), "get DBI connection to $database");
+	return $dbh;
+}
+
+sub setting_ok
+{
+	my ($dbh, $setting, $expected_value) = @_;
+	my $actual_value = $dbh->selectrow_array("SHOW $setting");
+	is($actual_value, $expected_value, "check $setting");
+}
+
+sub select_scalar_ok
+{
+	my ($dbh, $statement, $expected_value) = @_;
+	my $actual_value = $dbh->selectrow_array($statement);
+	is($actual_value, $expected_value, "SELECT scalar value");
+}
+
+sub select_ok
+{
+	my ($dbh) = @_;
+	select_scalar_ok($dbh, 'select c from t order by c limit 1', '1');
+}
+
+sub snapshot_too_old_ok
+{
+	my ($dbh) = @_;
+	my $actual_value = $dbh->selectrow_array('select c from t order by c limit 1');
+	my $errstr = $dbh->errstr;
+	is($errstr, 'ERROR:  snapshot too old', 'expect "snapshot too old" error');
+}
+
+# Initialize cluster
+start_test_server($tempdir, qq(
+old_snapshot_threshold = '1min'
+autovacuum = off
+enable_indexonlyscan = off
+max_connections = 10
+log_statement = 'all'
+));
+
+# Confirm that we can connect to the database.
+my $conn1 = connect_ok($database);
+my $conn2 = connect_ok($database);
+
+# Confirm that the settings "took".
+setting_ok($conn1, 'old_snapshot_threshold', '1min');
+
+# Do some setup.
+$conn1->do(qq(CREATE TABLE t (c int NOT NULL)));
+$conn1->do(qq(INSERT INTO t SELECT generate_series(1, 1000)));
+$conn1->do(qq(VACUUM ANALYZE t));
+$conn1->do(qq(CREATE TABLE u (c int NOT NULL)));
+
+# Open long-lived snapshot
+$conn1->begin_work;
+$conn1->do(qq(set transaction isolation level REPEATABLE READ));
+$conn1->do(qq(SELECT c FROM t));
+
+# Bump the last-used transaction number.
+$conn2->do(qq(INSERT INTO u VALUES (1)));
+
+# Confirm immediate select works.
+select_ok($conn1);
+
+# Confirm delayed select works with no modifications.
+log_sleep_to_new_minute;
+select_ok($conn1);
+
+# Try again to confirm no pruning effect.
+select_ok($conn1);
+
+# Confirm that a select immediately after the update is OK.
+# This assumes that they happen within one minute of each other.
+# Still using the same snapshot, so the value shouldn't change.
+$conn2->do(qq(UPDATE t SET c = 1001 WHERE c = 1));
+select_ok($conn1);
+
+# Try again to confirm no pruning effect.
+log_sleep_to_new_minute;
+$conn2->do(qq(VACUUM ANALYZE t));
+select_ok($conn1);
+
+# The passage of time should now cause this to fail with the "snapshot too old" error.
+log_sleep_to_new_minute;
+snapshot_too_old_ok($conn1);
+$conn1->rollback;
+
+# Clean up a bit, just to minimize noise in the log file.
+$conn2->disconnect;
+$conn1->disconnect;
diff --git a/src/test/perl/TestLib.pm b/src/test/perl/TestLib.pm
index 02533eb..5fffc78 100644
--- a/src/test/perl/TestLib.pm
+++ b/src/test/perl/TestLib.pm
@@ -135,7 +135,7 @@ sub tempdir_short
 # --config-auth).
 sub standard_initdb
 {
-	my $pgdata = shift;
+	my ($pgdata, $confappend) = @_;
 	system_or_bail('initdb', '-D', "$pgdata", '-A' , 'trust', '-N');
 	system_or_bail($ENV{PG_REGRESS}, '--config-auth', $pgdata);
 
@@ -153,6 +153,11 @@ sub standard_initdb
 		print CONF "unix_socket_directories = '$tempdir_short'\n";
 		print CONF "listen_addresses = ''\n";
 	}
+	if (defined $confappend and length $confappend)
+	{
+		print CONF "\n# Added by test\n";
+		print CONF "$confappend\n";
+	}
 	close CONF;
 
 	$ENV{PGHOST}         = $windows_os ? "127.0.0.1" : $tempdir_short;
@@ -183,11 +188,11 @@ my ($test_server_datadir, $test_server_logfile);
 # Initialize a new cluster for testing in given directory, and start it.
 sub start_test_server
 {
-	my ($tempdir) = @_;
+	my ($tempdir, $confappend) = @_;
 	my $ret;
 
 	print("### Starting test server in $tempdir\n");
-	standard_initdb "$tempdir/pgdata";
+	standard_initdb "$tempdir/pgdata", $confappend;
 
 	$ret = system_log('pg_ctl', '-D', "$tempdir/pgdata", '-w', '-l',
 	  "$log_path/postmaster.log", '-o', "--log-statement=all",