On Mon, Apr 06, 2020 at 09:18:47PM -0700, Noah Misch wrote:
> On Mon, Apr 06, 2020 at 02:46:09PM -0400, Tom Lane wrote:
> > Noah Misch <n...@leadboat.com> writes:
> > > On Wed, Mar 25, 2020 at 04:42:31PM -0400, Tom Lane wrote:
> > >> So I think what we're actually trying to accomplish here is to
> > >> ensure that instead of deleting up to half of the SLRU space
> > >> before the cutoff, we delete up to half-less-one-segment.
> > >> Maybe it should be half-less-two-segments, just to provide some
> > >> cushion against edge cases.  Reading the first comment in
> > >> SetTransactionIdLimit makes one not want to trust too much in
> > >> arguments based on the exact value of xidWrapLimit, while for
> > >> the other SLRUs it was already unclear whether the edge cases
> > >> were exactly right.
> > 
> > > That could be interesting insurance.  While it would be sad for us to
> > > miss an edge case and print "must be vacuumed within 2 transactions"
> > > when wrap has already happened, reaching that message implies the DBA
> > > burned ~1M XIDs, all in single-user mode.  More plausible is
> > > FreezeMultiXactId() overrunning the limit by tens of segments.  Hence,
> > > if we do buy this insurance, let's skip far more segments.  For
> > > example, instead of unlinking segments representing up to 2^31 past
> > > XIDs, we could divide that into an upper half that we unlink and a
> > > lower half.  The lower half will stay in place; eventually, XID
> > > consumption will overwrite it.  Truncation behavior won't change until
> > > the region of CLOG for pre-oldestXact XIDs exceeds 256 MiB.  Beyond
> > > that threshold, vac_truncate_clog() will unlink the upper 256 MiB and
> > > leave the rest.  CLOG maximum would rise from 512 MiB to 768 MiB.
> > > Would that be worthwhile?

> > Temporarily wasting some disk
> > space is a lot more palatable than corrupting data, and these code
> > paths are necessarily not terribly well tested.  So +1 for more
> > insurance.
> 
> Okay, I'll give that a try.  I expect this will replace the PagePrecedes
> callback with a PageDiff callback such that PageDiff(a, b) < 0 iff
> PagePrecedes(a, b).  PageDiff callbacks shall distribute return values
> uniformly in [INT_MIN,INT_MAX].  SimpleLruTruncate() will unlink segments
> where INT_MIN/2 < PageDiff(candidate, cutoff) < 0.

While doing so, I found that slru-truncate-modulo-v2.patch did get edge cases
wrong, as you feared.  In particular, if the newest XID reached xidStopLimit
and was in the first page of a segment, TruncateCLOG() would delete its
segment.  Attached slru-truncate-modulo-v3.patch fixes that; as restitution, I
added unit tests covering that and other scenarios.  Reaching the bug via XIDs
was hard, requiring one to burn 1000k-CLOG_XACTS_PER_PAGE=967k XIDs in
single-user mode.  I expect the bug was easier to reach via pg_multixact.
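
To make the scenario concrete, here is a standalone sketch (not part of the
attached patch) that models the v3 CLOGPagePrecedes() and
SlruMayDeleteSegment() logic outside the server.  It assumes BLCKSZ = 8192,
hence 32768 XIDs per CLOG page, and 32 pages per segment.  The names
xid_precedes(), page_precedes(), may_delete_segment() and oldest_page_for()
are stand-ins for illustration, and the special case for permanent XIDs is
omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define FIRST_NORMAL_XID   3u       /* FirstNormalTransactionId */
#define XACTS_PER_PAGE     32768u   /* assumption: CLOG at BLCKSZ = 8192 */
#define PAGES_PER_SEGMENT  32       /* SLRU_PAGES_PER_SEGMENT */

/* Modulo-2^32 comparison, as TransactionIdPrecedes() does for normal XIDs. */
static bool
xid_precedes(uint32_t id1, uint32_t id2)
{
    return (int32_t) (id1 - id2) < 0;
}

/*
 * Model of the patched callback: page1 is "older" only if its first entry
 * precedes both the first and the last entry of page2.
 */
static bool
page_precedes(int page1, int page2)
{
    uint32_t    xid1 = (uint32_t) page1 * XACTS_PER_PAGE + FIRST_NORMAL_XID + 1;
    uint32_t    xid2 = (uint32_t) page2 * XACTS_PER_PAGE + FIRST_NORMAL_XID + 1;

    return xid_precedes(xid1, xid2) &&
        xid_precedes(xid1, xid2 + XACTS_PER_PAGE - 1);
}

/* Model of SlruMayDeleteSegment(): both end pages must precede the cutoff. */
static bool
may_delete_segment(int segpage, int cutoff_page)
{
    int         seg_last_page = segpage + PAGES_PER_SEGMENT - 1;

    return page_precedes(segpage, cutoff_page) &&
        page_precedes(seg_last_page, cutoff_page);
}

/* Page of oldestXact when newestXact is the last XID safely assignable. */
static int
oldest_page_for(int newest_page, uint32_t offset)
{
    uint32_t    newest_xact = (uint32_t) newest_page * XACTS_PER_PAGE + offset;
    uint32_t    oldest_xact = newest_xact + 1 - (1u << 31);

    return (int) (oldest_xact / XACTS_PER_PAGE);
}
```

Under these assumptions, the segment starting at page 32 is never deletable
while the newest XID sits in its first page (32) or its last page (63),
whereas a segment lying wholly before the cutoff page still is.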

The insurance patch stacks on top of the bug fix patch.  It does have a
negative effect on TruncateMultiXact(), which uses SlruScanDirCbFindEarliest
to skip truncation in corrupted clusters.  SlruScanDirCbFindEarliest() gives
nonsense answers if "future" segments exist.  That can happen today, but the
patch creates new ways to make it happen.  The symptom is wasting yet more
space in pg_multixact.  I am okay with this, since it arises only after one
fills pg_multixact 50% full.  There are alternatives.  We could weaken the
corruption defense in TruncateMultiXact() or look for another implementation
of equivalent defense.  We could unlink, say, 75% or 95% of the "past" instead
of 50% (this patch) or >99.99% (today's behavior).
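
To illustrate the unlink-fraction knob, here is a rough sketch of the
PageDiff idea quoted above, with the fraction as a parameter.  It is not the
code in the insurance patch: page_diff(), may_unlink() and the pct argument
are hypothetical, the page size assumes CLOG at BLCKSZ = 8192, and the
within-page refinement from the bug fix patch is left out for brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define XACTS_PER_PAGE 32768u   /* assumption: CLOG at BLCKSZ = 8192 */

/* Hypothetical PageDiff callback: result < 0 iff page a precedes page b. */
static int32_t
page_diff(int a, int b)
{
    uint32_t    xa = (uint32_t) a * XACTS_PER_PAGE;
    uint32_t    xb = (uint32_t) b * XACTS_PER_PAGE;

    return (int32_t) (xa - xb);     /* difference modulo 2^32 */
}

/*
 * Unlink a candidate page only if it falls in the newest pct percent of the
 * "past".  pct = 50 gives the quoted rule INT_MIN/2 < PageDiff < 0.
 */
static bool
may_unlink(int candidate, int cutoff, int pct)
{
    int32_t     limit = (int32_t) ((int64_t) INT32_MIN * pct / 100);
    int32_t     diff = page_diff(candidate, cutoff);

    return limit < diff && diff < 0;
}
```

With cutoff page 65568, for example, page 65567 is unlinked at pct = 50,
while page 32800 (exactly 2^30 XIDs behind the cutoff) is kept at pct = 50
and unlinked at pct = 75.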

Thanks,
nm
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Prevent excess SimpleLruTruncate() deletion.
    
    Every core SLRU wraps around.  With the exception of pg_notify, the wrap
    point can fall in the middle of a page.  Account for this in the
    PagePrecedes callback specification and in SimpleLruTruncate()'s use of
    said callback.  Update each callback implementation to fit the new
    specification.  This changes SerialPagePrecedesLogically() from the
    style of asyncQueuePagePrecedes() to the style of CLOGPagePrecedes().
    (Whereas pg_clog and pg_serial share a key space, pg_serial is nothing
    like pg_notify.)
    
    This closes a rare opportunity for data loss, which manifested as
    "apparent wraparound" or "could not access status of transaction"
    errors.  This is more likely to affect pg_multixact, due to the thin
    space between multiStopLimit and multiWrapLimit.  If a user's physical
    replication primary logged ":  apparent wraparound" messages, the user
    should rebuild standbys of that primary regardless of symptoms.  At
    less risk is a cluster having emitted "not accepting commands" errors or
    "must be vacuumed" warnings at some point.  One can test a cluster for
    this data loss by running VACUUM FREEZE in every database.  Back-patch
    to 9.5 (all supported versions).
    
    Reviewed by Tom Lane.
    
    Discussion: https://postgr.es/m/20190202083822.gc32...@gust.leadboat.com

diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index f3da40a..d606042 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -693,6 +693,7 @@ CLOGShmemInit(void)
        XactCtl->PagePrecedes = CLOGPagePrecedes;
        SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
                                  XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER);
+       SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
 }
 
 /*
@@ -933,13 +934,22 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
 
 
 /*
- * Decide which of two CLOG page numbers is "older" for truncation purposes.
+ * Decide whether a CLOG page number is "older" for truncation purposes.
  *
  * We need to use comparison of TransactionIds here in order to do the right
- * thing with wraparound XID arithmetic.  However, if we are asked about
- * page number zero, we don't want to hand InvalidTransactionId to
- * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
- * offset both xids by FirstNormalTransactionId to avoid that.
+ * thing with wraparound XID arithmetic.  However, TransactionIdPrecedes()
+ * would get weird about permanent xact IDs.  So, offset both such that xid1,
+ * xid2, and xid2 + CLOG_XACTS_PER_PAGE - 1 are all normal XIDs; this offset is
+ * relevant to page 0 and to the page preceding page 0.
+ *
+ * The page containing oldestXact-2^31 is the important edge case.  The
+ * portion of that page equaling or following oldestXact-2^31 is expendable,
+ * but the portion preceding oldestXact-2^31 is not.  When oldestXact-2^31 is
+ * the first XID of a page and segment, the entire page and segment is
+ * expendable, and we could truncate the segment.  Recognizing that case would
+ * require making oldestXact, not just the page containing oldestXact,
+ * available to this callback.  The benefit would be rare and small, so we
+ * don't optimize that edge case.
  */
 static bool
 CLOGPagePrecedes(int page1, int page2)
@@ -948,11 +958,12 @@ CLOGPagePrecedes(int page1, int page2)
        TransactionId xid2;
 
        xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId;
+       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId;
+       xid2 += FirstNormalTransactionId + 1;
 
-       return TransactionIdPrecedes(xid1, xid2);
+       return (TransactionIdPrecedes(xid1, xid2) &&
+                       TransactionIdPrecedes(xid1, xid2 + CLOG_XACTS_PER_PAGE - 1));
 }
 
 
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 9cdb136..eaeb8c2 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -495,6 +495,7 @@ CommitTsShmemInit(void)
        SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
                                  CommitTsSLRULock, "pg_commit_ts",
                                  LWTRANCHE_COMMITTS_BUFFER);
+       SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
 
        commitTsShared = ShmemInitStruct("CommitTs shared",
                                                                         sizeof(CommitTimestampShared),
@@ -883,14 +884,27 @@ AdvanceOldestCommitTsXid(TransactionId oldestXact)
 
 
 /*
- * Decide which of two commitTS page numbers is "older" for truncation
- * purposes.
+ * Decide whether a commitTS page number is "older" for truncation purposes.
+ * Analogous to CLOGPagePrecedes().
  *
- * We need to use comparison of TransactionIds here in order to do the right
- * thing with wraparound XID arithmetic.  However, if we are asked about
- * page number zero, we don't want to hand InvalidTransactionId to
- * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
- * offset both xids by FirstNormalTransactionId to avoid that.
+ * At every supported BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.
+ * This introduces differences compared to CLOG and the other SLRUs having (1
+ * << 31) % per_page == 0.  This function never tests exactly
+ * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
+ * there are two possible counts of page boundaries between oldestXact and the
+ * latest XID assigned, depending on whether oldestXact is within the first
+ * 128 entries of its page.  Since this function doesn't know the location of
+ * oldestXact within page2, it returns false for one page that actually is
+ * expendable.  This is a wider (yet still negligible) version of the
+ * truncation opportunity that CLOGPagePrecedes() cannot recognize.
+ *
+ * For the sake of a worked example, number entries with decimal values such
+ * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
+ * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
+ * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
+ * because entry=2.85 is the border that toggles whether entries precede the
+ * last entry of the oldestXact page.  While page 2 is expendable at
+ * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
  */
 static bool
 CommitTsPagePrecedes(int page1, int page2)
@@ -899,11 +913,12 @@ CommitTsPagePrecedes(int page1, int page2)
        TransactionId xid2;
 
        xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId;
+       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId;
+       xid2 += FirstNormalTransactionId + 1;
 
-       return TransactionIdPrecedes(xid1, xid2);
+       return (TransactionIdPrecedes(xid1, xid2) &&
+                       TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
 }
 
 
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index ce84dac..ff96083 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -1832,10 +1832,12 @@ MultiXactShmemInit(void)
                                  "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
                                  MultiXactOffsetSLRULock, "pg_multixact/offsets",
                                  LWTRANCHE_MULTIXACTOFFSET_BUFFER);
+       SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
        SimpleLruInit(MultiXactMemberCtl,
                                  "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0,
                                  MultiXactMemberSLRULock, "pg_multixact/members",
                                  LWTRANCHE_MULTIXACTMEMBER_BUFFER);
                                  LWTRANCHE_MULTIXACTMEMBER_BUFFER);
+       /* doesn't call SimpleLruTruncate() or meet criteria for unit tests */
 
        /* Initialize our shared state struct */
        MultiXactState = ShmemInitStruct("Shared MultiXact State",
@@ -2978,6 +2980,14 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
         * truncate the members SLRU.  So we first scan the directory to determine
         * the earliest offsets page number that we can read without error.
         *
+        * When nextMXact is less than one segment away from multiWrapLimit,
+        * SlruScanDirCbFindEarliest can find some early segment other than the
+        * actual earliest.  (MultiXactOffsetPagePrecedes(EARLIEST, LATEST)
+        * returns false, because not all pairs of entries have the same answer.)
+        * That can also arise when an earlier truncation attempt failed unlink()
+        * or returned early from this function.  The only consequence is
+        * returning early, which wastes space that we could have liberated.
+        *
         * NB: It's also possible that the page that oldestMulti is on has already
         * been truncated away, and we crashed before updating oldestMulti.
         */
@@ -3092,15 +3102,11 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
 }
 
 /*
- * Decide which of two MultiXactOffset page numbers is "older" for truncation
- * purposes.
- *
- * We need to use comparison of MultiXactId here in order to do the right
- * thing with wraparound.  However, if we are asked about page number zero, we
- * don't want to hand InvalidMultiXactId to MultiXactIdPrecedes: it'll get
- * weird.  So, offset both multis by FirstMultiXactId to avoid that.
- * (Actually, the current implementation doesn't do anything weird with
- * InvalidMultiXactId, but there's no harm in leaving this code like this.)
+ * Decide whether a MultiXactOffset page number is "older" for truncation
+ * purposes.  Analogous to CLOGPagePrecedes().
+ *
+ * Offsetting the values is optional, because MultiXactIdPrecedes() has
+ * translational symmetry.
  */
 static bool
 MultiXactOffsetPagePrecedes(int page1, int page2)
@@ -3109,15 +3115,17 @@ MultiXactOffsetPagePrecedes(int page1, int page2)
        MultiXactId multi2;
 
        multi1 = ((MultiXactId) page1) * MULTIXACT_OFFSETS_PER_PAGE;
-       multi1 += FirstMultiXactId;
+       multi1 += FirstMultiXactId + 1;
        multi2 = ((MultiXactId) page2) * MULTIXACT_OFFSETS_PER_PAGE;
-       multi2 += FirstMultiXactId;
+       multi2 += FirstMultiXactId + 1;
 
-       return MultiXactIdPrecedes(multi1, multi2);
+       return (MultiXactIdPrecedes(multi1, multi2) &&
+                       MultiXactIdPrecedes(multi1,
+                                                               multi2 + MULTIXACT_OFFSETS_PER_PAGE - 1));
 }
 
 /*
- * Decide which of two MultiXactMember page numbers is "older" for truncation
+ * Decide whether a MultiXactMember page number is "older" for truncation
  * purposes.  There is no "invalid offset number" so use the numbers verbatim.
  */
 static bool
@@ -3129,7 +3137,9 @@ MultiXactMemberPagePrecedes(int page1, int page2)
        offset1 = ((MultiXactOffset) page1) * MULTIXACT_MEMBERS_PER_PAGE;
        offset2 = ((MultiXactOffset) page2) * MULTIXACT_MEMBERS_PER_PAGE;
 
-       return MultiXactOffsetPrecedes(offset1, offset2);
+       return (MultiXactOffsetPrecedes(offset1, offset2) &&
+                       MultiXactOffsetPrecedes(offset1,
+                                                                       offset2 + MULTIXACT_MEMBERS_PER_PAGE - 1));
 }
 
 /*
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 61249f4..33e1e93 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -1219,11 +1219,6 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
        pgstat_count_slru_truncate(shared->slru_stats_idx);
 
        /*
-        * The cutoff point is the start of the segment containing cutoffPage.
-        */
-       cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-       /*
         * Scan shared memory and remove any pages preceding the cutoff page, to
         * ensure we won't rewrite them later.  (Since this is normally called in
         * or just after a checkpoint, any dirty pages should have been flushed
@@ -1235,9 +1230,7 @@ restart:;
 
        /*
         * While we are holding the lock, make an important safety check: the
-        * planned cutoff point must be <= the current endpoint page. Otherwise we
-        * have already wrapped around, and proceeding with the truncation would
-        * risk removing the current segment.
+        * current endpoint page must not be eligible for removal.
         */
        if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
        {
@@ -1269,8 +1262,11 @@ restart:;
                 * Hmm, we have (or may have) I/O operations acting on the page, so
                 * we've got to wait for them to finish and then start again. This is
                 * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
-                * wouldn't it be OK to just discard it without writing it?  For now,
-                * keep the logic the same as it was.)
+                * wouldn't it be OK to just discard it without writing it?
+                * SlruMayDeleteSegment() uses a stricter qualification, so we might
+                * not delete this page in the end; even if we don't delete it, we
+                * won't have cause to read its data again.  For now, keep the logic
+                * the same as it was.)
                 */
                if (shared->page_status[slotno] == SLRU_PAGE_VALID)
                        SlruInternalWritePage(ctl, slotno, NULL);
@@ -1361,18 +1357,133 @@ restart:
 }
 
 /*
+ * Determine whether a segment is okay to delete.
+ *
+ * segpage is the first page of the segment, and cutoffPage is the oldest (in
+ * PagePrecedes order) page in the SLRU containing still-useful data.  Since
+ * every core PagePrecedes callback implements "wrap around", check the
+ * segment's first and last pages:
+ *
+ * first<cutoff  && last<cutoff:  yes
+ * first<cutoff  && last>=cutoff: no; cutoff falls inside this segment
+ * first>=cutoff && last<cutoff:  no; wrap point falls inside this segment
+ * first>=cutoff && last>=cutoff: no; every page of this segment is too young
+ */
+static bool
+SlruMayDeleteSegment(SlruCtl ctl, int segpage, int cutoffPage)
+{
+       int                     seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
+
+       Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
+
+       return (ctl->PagePrecedes(segpage, cutoffPage) &&
+                       ctl->PagePrecedes(seg_last_page, cutoffPage));
+}
+
+#ifdef USE_ASSERT_CHECKING
+static void
+SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
+{
+       TransactionId lhs,
+                               rhs;
+       int                     newestPage,
+                               oldestPage;
+       TransactionId newestXact,
+                               oldestXact;
+
+       /*
+        * Compare an XID pair having undefined order (see RFC 1982), a pair at
+        * "opposite ends" of the XID space.  TransactionIdPrecedes() treats each
+        * as preceding the other.  If RHS is oldestXact, LHS is the first XID we
+        * must not assign.
+        */
+       lhs = per_page + offset;        /* skip first page to avoid non-normal XIDs */
+       rhs = lhs + (1U << 31);
+       Assert(TransactionIdPrecedes(lhs, rhs));
+       Assert(TransactionIdPrecedes(rhs, lhs));
+       Assert(!TransactionIdPrecedes(lhs - 1, rhs));
+       Assert(TransactionIdPrecedes(rhs, lhs - 1));
+       Assert(TransactionIdPrecedes(lhs + 1, rhs));
+       Assert(!TransactionIdPrecedes(rhs, lhs + 1));
+       Assert(!TransactionIdFollowsOrEquals(lhs, rhs));
+       Assert(!TransactionIdFollowsOrEquals(rhs, lhs));
+       Assert(!ctl->PagePrecedes(lhs / per_page, lhs / per_page));
+       Assert(!ctl->PagePrecedes(lhs / per_page, rhs / per_page));
+       Assert(!ctl->PagePrecedes(rhs / per_page, lhs / per_page));
+       Assert(!ctl->PagePrecedes((lhs - per_page) / per_page, rhs / per_page));
+       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 3 * per_page) / per_page));
+       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 2 * per_page) / per_page));
+       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 1 * per_page) / per_page)
+                  || (1U << 31) % per_page != 0);      /* See CommitTsPagePrecedes() */
+       Assert(ctl->PagePrecedes((lhs + 1 * per_page) / per_page, rhs / per_page)
+                  || (1U << 31) % per_page != 0);
+       Assert(ctl->PagePrecedes((lhs + 2 * per_page) / per_page, rhs / per_page));
+       Assert(ctl->PagePrecedes((lhs + 3 * per_page) / per_page, rhs / per_page));
+       Assert(!ctl->PagePrecedes(rhs / per_page, (lhs + per_page) / per_page));
+
+       /*
+        * GetNewTransactionId() has assigned the last XID it can safely use, and
+        * that XID is in the *LAST* page of the second segment.  We must not
+        * delete that segment.
+        */
+       newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;
+       newestXact = newestPage * per_page + offset;
+       Assert(newestXact / per_page == newestPage);
+       oldestXact = newestXact + 1;
+       oldestXact -= 1U << 31;
+       oldestPage = oldestXact / per_page;
+       Assert(!SlruMayDeleteSegment(ctl,
+                                                                (newestPage -
+                                                                 newestPage % SLRU_PAGES_PER_SEGMENT),
+                                                                oldestPage));
+
+       /*
+        * GetNewTransactionId() has assigned the last XID it can safely use, and
+        * that XID is in the *FIRST* page of the second segment.  We must not
+        * delete that segment.
+        */
+       newestPage = SLRU_PAGES_PER_SEGMENT;
+       newestXact = newestPage * per_page + offset;
+       Assert(newestXact / per_page == newestPage);
+       oldestXact = newestXact + 1;
+       oldestXact -= 1U << 31;
+       oldestPage = oldestXact / per_page;
+       Assert(!SlruMayDeleteSegment(ctl,
+                                                                (newestPage -
+                                                                 newestPage % SLRU_PAGES_PER_SEGMENT),
+                                                                oldestPage));
+}
+
+/*
+ * Unit-test a PagePrecedes function.
+ *
+ * This assumes every uint32 >= FirstNormalTransactionId is a valid key.  It
+ * assumes each value occupies a contiguous, fixed-size region of SLRU bytes.
+ * (MultiXactMemberCtl separates flags from XIDs.  AsyncCtl has
+ * variable-length entries, no keys, and no random access.  These unit tests
+ * do not apply to them.)
+ */
+void
+SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
+{
+       /* Test first, middle and last entries of a page. */
+       SlruPagePrecedesTestOffset(ctl, per_page, 0);
+       SlruPagePrecedesTestOffset(ctl, per_page, per_page / 2);
+       SlruPagePrecedesTestOffset(ctl, per_page, per_page - 1);
+}
+#endif
+
+/*
  * SlruScanDirectory callback
- *             This callback reports true if there's any segment prior to the one
- *             containing the page passed as "data".
+ *             This callback reports true if there's any segment wholly prior to the
+ *             one containing the page passed as "data".
  */
 bool
 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
 {
        int                     cutoffPage = *(int *) data;
 
-       cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-       if (ctl->PagePrecedes(segpage, cutoffPage))
+       if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
                return true;                    /* found one; don't iterate any more */
 
        return false;                           /* keep going */
@@ -1387,7 +1498,7 @@ SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
 {
        int                     cutoffPage = *(int *) data;
 
-       if (ctl->PagePrecedes(segpage, cutoffPage))
+       if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
                SlruInternalDeleteSegment(ctl, filename);
 
        return false;                           /* keep going */
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
index f33ae40..09cfa38 100644
--- a/src/backend/access/transam/subtrans.c
+++ b/src/backend/access/transam/subtrans.c
@@ -196,6 +196,7 @@ SUBTRANSShmemInit(void)
                                  LWTRANCHE_SUBTRANS_BUFFER);
        /* Override default assumption that writes should be fsync'd */
        SubTransCtl->do_fsync = false;
+       SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
 }
 
 /*
@@ -373,13 +374,8 @@ TruncateSUBTRANS(TransactionId oldestXact)
 
 
 /*
- * Decide which of two SUBTRANS page numbers is "older" for truncation purposes.
- *
- * We need to use comparison of TransactionIds here in order to do the right
- * thing with wraparound XID arithmetic.  However, if we are asked about
- * page number zero, we don't want to hand InvalidTransactionId to
- * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
- * offset both xids by FirstNormalTransactionId to avoid that.
+ * Decide whether a SUBTRANS page number is "older" for truncation purposes.
+ * Analogous to CLOGPagePrecedes().
  */
 static bool
 SubTransPagePrecedes(int page1, int page2)
@@ -388,9 +384,10 @@ SubTransPagePrecedes(int page1, int page2)
        TransactionId xid2;
 
        xid1 = ((TransactionId) page1) * SUBTRANS_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId;
+       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * SUBTRANS_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId;
+       xid2 += FirstNormalTransactionId + 1;
 
-       return TransactionIdPrecedes(xid1, xid2);
+       return (TransactionIdPrecedes(xid1, xid2) &&
+                       TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1));
 }
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index a3ba88d..7e5cd66 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -487,7 +487,12 @@ asyncQueuePageDiff(int p, int q)
        return diff;
 }
 
-/* Is p < q, accounting for wraparound? */
+/*
+ * Is p < q, accounting for wraparound?
+ *
+ * Since asyncQueueIsFull() blocks creation of a page that could precede any
+ * extant page, we need not assess entries within a page.
+ */
 static bool
 asyncQueuePagePrecedes(int p, int q)
 {
@@ -1349,8 +1354,8 @@ asyncQueueIsFull(void)
         * logically precedes the current global tail pointer, ie, the head
         * pointer would wrap around compared to the tail.  We cannot create such
         * a head page for fear of confusing slru.c.  For safety we round the tail
-        * pointer back to a segment boundary (compare the truncation logic in
-        * asyncQueueAdvanceTail).
+        * pointer back to a segment boundary (truncation logic in
+        * asyncQueueAdvanceTail does not do this, so doing it here is optional).
         *
         * Note that this test is *not* dependent on how much space there is on
         * the current head page.  This is necessary because asyncQueueAddEntries
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index ba93fb1..fde1b5c 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -438,7 +438,7 @@ static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT
 static void ReleaseRWConflict(RWConflict conflict);
 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
 
-static bool SerialPagePrecedesLogically(int p, int q);
+static bool SerialPagePrecedesLogically(int page1, int page2);
 static void SerialInit(void);
 static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
 static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
@@ -784,27 +784,77 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
 /*------------------------------------------------------------------------*/
 
 /*
- * We will work on the page range of 0..SERIAL_MAX_PAGE.
- * Compares using wraparound logic, as is required by slru.c.
+ * Decide whether a Serial page number is "older" for truncation purposes.
+ * Analogous to CLOGPagePrecedes().
  */
 static bool
-SerialPagePrecedesLogically(int p, int q)
+SerialPagePrecedesLogically(int page1, int page2)
 {
-       int                     diff;
+       TransactionId xid1;
+       TransactionId xid2;
+
+       xid1 = ((TransactionId) page1) * SERIAL_ENTRIESPERPAGE;
+       xid1 += FirstNormalTransactionId + 1;
+       xid2 = ((TransactionId) page2) * SERIAL_ENTRIESPERPAGE;
+       xid2 += FirstNormalTransactionId + 1;
+
+       return (TransactionIdPrecedes(xid1, xid2) &&
+                       TransactionIdPrecedes(xid1, xid2 + SERIAL_ENTRIESPERPAGE - 1));
+}
+
+static void
+SerialPagePrecedesLogicallyUnitTests(void)
+{
+       int                     per_page = SERIAL_ENTRIESPERPAGE,
+                               offset = per_page / 2;
+       int                     newestPage,
+                               oldestPage,
+                               headPage,
+                               targetPage;
+       TransactionId newestXact,
+                               oldestXact;
+
+       /* GetNewTransactionId() has assigned the last XID it can safely use. */
+       newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;    /* nothing special */
+       newestXact = newestPage * per_page + offset;
+       Assert(newestXact / per_page == newestPage);
+       oldestXact = newestXact + 1;
+       oldestXact -= 1U << 31;
+       oldestPage = oldestXact / per_page;
 
        /*
-        * We have to compare modulo (SERIAL_MAX_PAGE+1)/2.  Both inputs should be
-        * in the range 0..SERIAL_MAX_PAGE.
+        * In this scenario, the SLRU headPage pertains to the last ~1000 XIDs
+        * assigned.  oldestXact finishes, ~2B XIDs having elapsed since it
+        * started.  Further transactions cause us to summarize oldestXact to
+        * tailPage.  Function must return false so SerialAdd() doesn't zero
+        * tailPage (which may contain entries for other old, recently-finished
+        * XIDs) and half the SLRU.  Reaching this requires burning ~2B XIDs in
+        * single-user mode, a negligible possibility.
         */
-       Assert(p >= 0 && p <= SERIAL_MAX_PAGE);
-       Assert(q >= 0 && q <= SERIAL_MAX_PAGE);
-
-       diff = p - q;
-       if (diff >= ((SERIAL_MAX_PAGE + 1) / 2))
-               diff -= SERIAL_MAX_PAGE + 1;
-       else if (diff < -((int) (SERIAL_MAX_PAGE + 1) / 2))
-               diff += SERIAL_MAX_PAGE + 1;
-       return diff < 0;
+       headPage = newestPage;
+       targetPage = oldestPage;
+       Assert(!SerialPagePrecedesLogically(headPage, targetPage));
+
+       /*
+        * In this scenario, the SLRU headPage pertains to oldestXact.  We're
+        * summarizing an XID near newestXact.  (Assume few other XIDs used
+        * SERIALIZABLE, hence the minimal headPage advancement.  Assume
+        * oldestXact was long-running and only recently reached the SLRU.)
+        * Function must return true to make SerialAdd() create targetPage.
+        *
+        * Today's implementation mishandles this case, but it doesn't matter
+        * enough to fix.  Verify that the defect affects just one page by
+        * asserting correct treatment of its prior page.  Reaching this case
+        * requires burning ~2B XIDs in single-user mode, a negligible
+        * possibility.  Moreover, if it does happen, the consequence would be
+        * mild, namely a new transaction failing in SimpleLruReadPage().
+        */
+       headPage = oldestPage;
+       targetPage = newestPage;
+       Assert(SerialPagePrecedesLogically(headPage, targetPage - 1));
+#if 0
+       Assert(SerialPagePrecedesLogically(headPage, targetPage));
+#endif
 }
 
 /*
@@ -824,6 +874,8 @@ SerialInit(void)
                                  LWTRANCHE_SERIAL_BUFFER);
        /* Override default assumption that writes should be fsync'd */
        SerialSlruCtl->do_fsync = false;
+       SerialPagePrecedesLogicallyUnitTests();
+       SlruPagePrecedesUnitTests(SerialSlruCtl, SERIAL_ENTRIESPERPAGE);
 
        /*
         * Create or attach to the SerialControl structure.
@@ -1032,7 +1084,7 @@ CheckPointPredicate(void)
        }
        else
        {
-               /*
+               /*----------
                 * The SLRU is no longer needed. Truncate to head before we set head
                 * invalid.
                 *
@@ -1041,6 +1093,25 @@ CheckPointPredicate(void)
                 * that we leave behind will appear to be new again. In that case it
                 * won't be removed until XID horizon advances enough to make it
                 * current again.
+                *
+                * XXX: This should happen in vac_truncate_clog(), not in checkpoints.
+                * Consider this scenario, starting from a system with no in-progress
+                * transactions and VACUUM FREEZE having maximized oldestXact:
+                * - Start a SERIALIZABLE transaction.
+                * - Start, finish, and summarize a SERIALIZABLE transaction, creating
+                *   one SLRU page.
+                * - Consume XIDs to reach xidStopLimit.
+                * - Finish all transactions.  Due to the long-running SERIALIZABLE
+                *   transaction, earlier checkpoints did not touch headPage.  The
+                *   next checkpoint will change it, but that checkpoint happens after
+                *   the end of the scenario.
+                * - VACUUM to advance XID limits.
+                * - Consume ~2M XIDs, crossing the former xidWrapLimit.
+                * - Start, finish, and summarize a SERIALIZABLE transaction.
+                *   SerialAdd() declines to create the targetPage, because headPage
+                *   is not regarded as in the past relative to that targetPage.  The
+                *   transaction instigating the summarize fails in
+                *   SimpleLruReadPage().
                 */
                tailPage = serialControl->headPage;
                serialControl->headPage = -1;
diff --git a/src/include/access/slru.h b/src/include/access/slru.h
index 61fbc80..19982f6 100644
--- a/src/include/access/slru.h
+++ b/src/include/access/slru.h
@@ -117,9 +117,14 @@ typedef struct SlruCtlData
        bool            do_fsync;
 
        /*
-        * Decide which of two page numbers is "older" for truncation purposes. We
-        * need to use comparison of TransactionIds here in order to do the right
-        * thing with wraparound XID arithmetic.
+        * Decide whether a page is "older" for truncation and as a hint for
+        * evicting pages in LRU order.  Return true if every entry of the first
+        * argument is older than every entry of the second argument.  Note that
+        * !PagePrecedes(a,b) && !PagePrecedes(b,a) need not imply a==b; it also
+        * arises when some entries are older and some are not.  For SLRUs using
+        * SimpleLruTruncate(), this must use modular arithmetic.  (For others,
+        * the behavior of this callback has no functional implications.)  Use
+        * SlruPagePrecedesUnitTests() in SLRUs meeting its criteria.
         */
        bool            (*PagePrecedes) (int, int);
 
@@ -143,6 +148,11 @@ extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno,
                                                                           TransactionId xid);
 extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
 extern void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied);
+#ifdef USE_ASSERT_CHECKING
+extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page);
+#else
+#define SlruPagePrecedesUnitTests(ctl, per_page) do {} while (0)
+#endif
 extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);
 extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno);
 
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Unlink less in SimpleLruTruncate(), as insurance against bugs.
    
    SimpleLruTruncate() has been unlinking every expendable file.  In edge
    cases, it also deleted important files.  The most recent commit fixed
    that.  Given the history of this class of bugs evading detection, let's
    not trust that patch exclusively.  Instead of unlinking segments
    representing up to 2^31 past XIDs, delete no more than half that much.
    The balance will stay in place; eventually, XID consumption will
    overwrite it.  This could mitigate unknown SimpleLruTruncate() bugs and
    simplify manual remediation after one has overtaken wrap limits in
    single-user mode.
    
    Truncation behavior won't change at all until an SLRU is half full.
    Once it does change, a drawback is conflict with the following defense.
    TruncateMultiXact() skips truncation when unexpected files exist on
    disk, which this change deliberately makes more common.  Hence,
    pg_multixact becomes more likely to persist in consuming its maximum
    storage.  Also, this change may uncover bugs in SLRU page recycling by
    making that more common.  For SLRUs outside of pg_multixact, maximum
    storage rises by 50%; for example, the CLOG maximum rises from 512 MiB
    to 768 MiB.  Usage in pg_multixact may double.  Back-patch to 9.5 (all
    supported versions).
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20200330052809.gb2324...@rfd.leadboat.com
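
    For illustration, the half-deletion rule described above can be sketched
    in standalone C.  This is not code from the patch: the sketch_* names and
    the two constants are assumptions made for the example (32768 XIDs per
    page matches CLOG at the default 8 kB block size), and the sketch relies
    on the usual two's-complement result when converting uint32_t to int32_t.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed for this sketch: CLOG-like geometry at the default block size. */
#define SKETCH_XACTS_PER_PAGE		32768
#define SKETCH_PAGES_PER_SEGMENT	32

/*
 * Modular page diff in XID space, modeled on the CLOGPageDiff() shown in
 * this patch.  Taking the larger of the head and tail diffs yields a
 * positive result when the wrap point may fall inside page2.
 */
static int32_t
sketch_page_diff(int page1, int page2)
{
	uint32_t	xid1 = (uint32_t) page1 * SKETCH_XACTS_PER_PAGE;
	uint32_t	xid2 = (uint32_t) page2 * SKETCH_XACTS_PER_PAGE;
	int32_t		diff_head = (int32_t) (xid1 - xid2);
	int32_t		diff_tail = (int32_t) (xid1 - (xid2 + SKETCH_XACTS_PER_PAGE - 1));

	return diff_head > diff_tail ? diff_head : diff_tail;
}

/* A page is deletable only if it is in the past, but not by more than 2^30. */
static bool
sketch_in_deletable_half(int32_t diff)
{
	return diff < 0 && diff > INT_MIN / 2;
}

/* Delete a segment only when both its first and last pages qualify. */
static bool
sketch_would_truncate_segment(int segpage, int cutoff_page)
{
	int			last_page = segpage + SKETCH_PAGES_PER_SEGMENT - 1;

	return sketch_in_deletable_half(sketch_page_diff(segpage, cutoff_page)) &&
		sketch_in_deletable_half(sketch_page_diff(last_page, cutoff_page));
}

/* Usage example: a few segments compared against cutoff page 100000. */
static void
sketch_self_check(void)
{
	/* Segment just behind the cutoff: unlinked. */
	assert(sketch_would_truncate_segment(99968, 100000));
	/* Segment more than 2^30 XIDs behind the cutoff: left in place. */
	assert(!sketch_would_truncate_segment(60000, 100000));
	/* Segment ahead of the cutoff: too young. */
	assert(!sketch_would_truncate_segment(100032, 100000));
	/* Cutoff falls inside the segment: kept. */
	assert(!sketch_would_truncate_segment(99968, 99970));
}
```

    The second assertion is the behavior change: a rule that unlinks
    everything with a negative diff would have removed that segment, whereas
    here it stays on disk until XID consumption overwrites it.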

diff --git a/doc/src/sgml/maintenance.sgml b/doc/src/sgml/maintenance.sgml
index 54bc1ab..85e009b 100644
--- a/doc/src/sgml/maintenance.sgml
+++ b/doc/src/sgml/maintenance.sgml
@@ -405,6 +405,11 @@
     in every database at least once every two billion transactions.
    </para>
 
+   <!-- This oversimplifies; there are (2^31)-1 XIDs in the past, the same
+   number in the future, and one incomparable.  (For each pair of incomparable
+   XIDs, TransactionIdPrecedes(a, b) and TransactionIdPrecedes(b, a) both
+   return true.)  None of that is important to the DBA, since xidStopLimit
+   intervenes long before. -->
    <para>
     The reason that periodic vacuuming solves the problem is that
     <command>VACUUM</command> will mark rows as <emphasis>frozen</emphasis>, indicating that
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index d606042..4d0db8f 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -52,7 +52,7 @@
  * and CLOG segment numbering at
  * 0xFFFFFFFF/CLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
  * explicit notice of that fact in this module, except when comparing segment
- * and page numbers in TruncateCLOG (see CLOGPagePrecedes).
+ * and page numbers in TruncateCLOG (see CLOGPageDiff).
  */
 
 /* We need two bits per xact, so four xacts fit in a byte */
@@ -89,7 +89,7 @@ static SlruCtlData XactCtlData;
 
 
 static int     ZeroCLOGPage(int pageno, bool writeXlog);
-static bool CLOGPagePrecedes(int page1, int page2);
+static int32 CLOGPageDiff(int page1, int page2);
 static void WriteZeroPageXlogRec(int pageno);
 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact,
                                                                 Oid oldestXactDb);
@@ -690,10 +690,10 @@ CLOGShmemSize(void)
 void
 CLOGShmemInit(void)
 {
-       XactCtl->PagePrecedes = CLOGPagePrecedes;
+       XactCtl->PageDiff = CLOGPageDiff;
        SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
                                   XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER);
-       SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
+       SlruPageDiffUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
 }
 
 /*
@@ -908,7 +908,7 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
        cutoffPage = TransactionIdToPage(oldestXact);
 
        /* Check to see if there's any files that could be removed */
-       if (!SlruScanDirectory(XactCtl, SlruScanDirCbReportPresence, &cutoffPage))
+       if (!SlruScanDirectory(XactCtl, SlruScanDirCbWouldTruncate, &cutoffPage))
                return;                                 /* nothing to remove */
 
        /*
@@ -934,13 +934,14 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
 
 
 /*
- * Decide whether a CLOG page number is "older" for truncation purposes.
+ * Diff CLOG page numbers for truncation purposes.
  *
- * We need to use comparison of TransactionIds here in order to do the right
- * thing with wraparound XID arithmetic.  However, TransactionIdPrecedes()
- * would get weird about permanent xact IDs.  So, offset both such that xid1,
- * xid2, and xid + CLOG_XACTS_PER_PAGE - 1 are all normal XIDs; this offset is
- * relevant to page 0 and to the page preceding page 0.
+ * To do the right thing with wraparound XID arithmetic, this mirrors
+ * TransactionIdPrecedes().  The Max() operation ensures we return a positive
+ * value when the wrap point may fall inside these pages.  (When it does, some
+ * pairs of entries have a positive diff, and other pairs have a negative
+ * diff.)  Only the predicate.c SLRU needs the Max() operation; to avoid
+ * having even more corner cases to understand, all XID-indexed SLRUs do it.
  *
  * The page containing oldestXact-2^31 is the important edge case.  The
  * portion of that page equaling or following oldestXact-2^31 is expendable,
@@ -948,22 +949,22 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
  * the first XID of a page and segment, the entire page and segment is
  * expendable, and we could truncate the segment.  Recognizing that case would
  * require making oldestXact, not just the page containing oldestXact,
- * available to this callback.  The benefit would be rare and small, so we
- * don't optimize that edge case.
+ * available to this callback.  slru.c wouldn't delete the page, anyway.
  */
-static bool
-CLOGPagePrecedes(int page1, int page2)
+static int32
+CLOGPageDiff(int page1, int page2)
 {
        TransactionId xid1;
        TransactionId xid2;
+       int32           diff_head;
+       int32           diff_tail;
 
        xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId + 1;
 
-       return (TransactionIdPrecedes(xid1, xid2) &&
-                       TransactionIdPrecedes(xid1, xid2 + CLOG_XACTS_PER_PAGE - 1));
+       diff_head = xid1 - xid2;
+       diff_tail = xid1 - (xid2 + CLOG_XACTS_PER_PAGE - 1);
+       return Max(diff_head, diff_tail);
 }
 
 
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index eaeb8c2..41fa9c9 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -46,7 +46,7 @@
  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
  * explicit notice of that fact in this module, except when comparing segment
- * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+ * and page numbers in TruncateCommitTs (see CommitTsPageDiff).
  */
 
 /*
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
                                                                         RepOriginId nodeid, int slotno);
 static void error_commit_ts_disabled(void);
 static int     ZeroCommitTsPage(int pageno, bool writeXlog);
-static bool CommitTsPagePrecedes(int page1, int page2);
+static int32 CommitTsPageDiff(int page1, int page2);
 static void ActivateCommitTs(void);
 static void DeactivateCommitTs(void);
 static void WriteZeroPageXlogRec(int pageno);
@@ -491,11 +491,11 @@ CommitTsShmemInit(void)
 {
        bool            found;
 
-       CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+       CommitTsCtl->PageDiff = CommitTsPageDiff;
        SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
                                  CommitTsSLRULock, "pg_commit_ts",
                                  LWTRANCHE_COMMITTS_BUFFER);
-       SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
+       SlruPageDiffUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
 
        commitTsShared = ShmemInitStruct("CommitTs shared",
                                                                         sizeof(CommitTimestampShared),
@@ -831,7 +831,7 @@ TruncateCommitTs(TransactionId oldestXact)
        cutoffPage = TransactionIdToCTsPage(oldestXact);
 
        /* Check to see if there's any files that could be removed */
-       if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
+       if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbWouldTruncate,
                                                   &cutoffPage))
                return;                                 /* nothing to remove */
 
@@ -884,8 +884,8 @@ AdvanceOldestCommitTsXid(TransactionId oldestXact)
 
 
 /*
- * Decide whether a commitTS page number is "older" for truncation purposes.
- * Analogous to CLOGPagePrecedes().
+ * Diff commitTS page numbers for truncation purposes.  Analogous to
+ * CLOGPageDiff().
  *
  * At every supported BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.
  * This introduces differences compared to CLOG and the other SLRUs having (1
@@ -896,7 +896,7 @@ AdvanceOldestCommitTsXid(TransactionId oldestXact)
  * 128 entries of its page.  Since this function doesn't know the location of
  * oldestXact within page2, it returns false for one page that actually is
  * expendable.  This is a wider (yet still negligible) version of the
- * truncation opportunity that CLOGPagePrecedes() cannot recognize.
+ * truncation opportunity that CLOGPageDiff() cannot recognize.
  *
  * For the sake of a worked example, number entries with decimal values such
  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
@@ -906,19 +906,20 @@ AdvanceOldestCommitTsXid(TransactionId oldestXact)
  * last entry of the oldestXact page.  While page 2 is expendable at
  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
  */
-static bool
-CommitTsPagePrecedes(int page1, int page2)
+static int32
+CommitTsPageDiff(int page1, int page2)
 {
        TransactionId xid1;
        TransactionId xid2;
+       int32           diff_head;
+       int32           diff_tail;
 
        xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId + 1;
 
-       return (TransactionIdPrecedes(xid1, xid2) &&
-                       TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
+       diff_head = xid1 - xid2;
+       diff_tail = xid1 - (xid2 + COMMIT_TS_XACTS_PER_PAGE - 1);
+       return Max(diff_head, diff_tail);
 }
 
 
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index ff96083..1a2e6f9 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -102,7 +102,7 @@
  * 0xFFFFFFFF/MULTIXACT_OFFSETS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
  * take no explicit notice of that fact in this module, except when comparing
  * segment and page numbers in TruncateMultiXact (see
- * MultiXactOffsetPagePrecedes).
+ * MultiXactOffsetPageDiff).
  */
 
 /* We need four bytes per offset */
@@ -355,8 +355,8 @@ static char *mxstatus_to_string(MultiXactStatus status);
 /* management of SLRU infrastructure */
 static int     ZeroMultiXactOffsetPage(int pageno, bool writeXlog);
 static int     ZeroMultiXactMemberPage(int pageno, bool writeXlog);
-static bool MultiXactOffsetPagePrecedes(int page1, int page2);
-static bool MultiXactMemberPagePrecedes(int page1, int page2);
+static int32 MultiXactOffsetPageDiff(int page1, int page2);
+static int32 MultiXactMemberPageDiff(int page1, int page2);
 static bool MultiXactOffsetPrecedes(MultiXactOffset offset1,
                                                                        MultiXactOffset offset2);
 static void ExtendMultiXactOffset(MultiXactId multi);
@@ -1825,14 +1825,14 @@ MultiXactShmemInit(void)
 
        debug_elog2(DEBUG2, "Shared Memory Init for MultiXact");
 
-       MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
-       MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
+       MultiXactOffsetCtl->PageDiff = MultiXactOffsetPageDiff;
+       MultiXactMemberCtl->PageDiff = MultiXactMemberPageDiff;
 
        SimpleLruInit(MultiXactOffsetCtl,
                                  "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
                                  MultiXactOffsetSLRULock, "pg_multixact/offsets",
                                  LWTRANCHE_MULTIXACTOFFSET_BUFFER);
-       SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
+       SlruPageDiffUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
        SimpleLruInit(MultiXactMemberCtl,
                                  "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0,
                                  MultiXactMemberSLRULock, "pg_multixact/members",
@@ -2863,7 +2863,7 @@ SlruScanDirCbFindEarliest(SlruCtl ctl, char *filename, int segpage, void *data)
        mxtruncinfo *trunc = (mxtruncinfo *) data;
 
        if (trunc->earliestExistingPage == -1 ||
-               ctl->PagePrecedes(segpage, trunc->earliestExistingPage))
+               ctl->PageDiff(segpage, trunc->earliestExistingPage) < 0)
        {
                trunc->earliestExistingPage = segpage;
        }
@@ -2982,11 +2982,12 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
         *
         * When nextMXact is less than one segment away from multiWrapLimit,
         * SlruScanDirCbFindEarliest can find some early segment other than the
-        * actual earliest.  (MultiXactOffsetPagePrecedes(EARLIEST, LATEST)
-        * returns false, because not all pairs of entries have the same answer.)
-        * That can also arise when an earlier truncation attempt failed unlink()
-        * or returned early from this function.  The only consequence is
-        * returning early, which wastes space that we could have liberated.
+        * actual earliest.  (MultiXactOffsetPageDiff(EARLIEST, LATEST) >= 0,
+        * because not all pairs of entries have the same answer.)  That can also
+        * arise when an earlier truncation attempt failed unlink(), returned
+        * early from this function, or saw SlruWouldTruncateSegment() decline to
+        * delete the older half of the SLRU.  The only consequence is returning
+        * early, which wastes space that we could have liberated.
         *
         * NB: It's also possible that the page that oldestMulti is on has already
         * been truncated away, and we crashed before updating oldestMulti.
@@ -3102,44 +3103,42 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
 }
 
 /*
- * Decide whether a MultiXactOffset page number is "older" for truncation
- * purposes.  Analogous to CLOGPagePrecedes().
- *
- * Offsetting the values is optional, because MultiXactIdPrecedes() has
- * translational symmetry.
+ * Diff MultiXactOffset page numbers for truncation purposes.  Analogous to
+ * CLOGPageDiff().
  */
-static bool
-MultiXactOffsetPagePrecedes(int page1, int page2)
+static int32
+MultiXactOffsetPageDiff(int page1, int page2)
 {
        MultiXactId multi1;
        MultiXactId multi2;
+       int32           diff_head;
+       int32           diff_tail;
 
        multi1 = ((MultiXactId) page1) * MULTIXACT_OFFSETS_PER_PAGE;
-       multi1 += FirstMultiXactId + 1;
        multi2 = ((MultiXactId) page2) * MULTIXACT_OFFSETS_PER_PAGE;
-       multi2 += FirstMultiXactId + 1;
 
-       return (MultiXactIdPrecedes(multi1, multi2) &&
-                       MultiXactIdPrecedes(multi1,
-                                                               multi2 + MULTIXACT_OFFSETS_PER_PAGE - 1));
+       diff_head = multi1 - multi2;
+       diff_tail = multi1 - (multi2 + MULTIXACT_OFFSETS_PER_PAGE - 1);
+       return Max(diff_head, diff_tail);
 }
 
 /*
- * Decide whether a MultiXactMember page number is "older" for truncation
- * purposes.  There is no "invalid offset number" so use the numbers verbatim.
+ * Diff MultiXactMember page numbers for truncation purposes.
  */
-static bool
-MultiXactMemberPagePrecedes(int page1, int page2)
+static int32
+MultiXactMemberPageDiff(int page1, int page2)
 {
        MultiXactOffset offset1;
        MultiXactOffset offset2;
+       int32           diff_head;
+       int32           diff_tail;
 
        offset1 = ((MultiXactOffset) page1) * MULTIXACT_MEMBERS_PER_PAGE;
        offset2 = ((MultiXactOffset) page2) * MULTIXACT_MEMBERS_PER_PAGE;
 
-       return (MultiXactOffsetPrecedes(offset1, offset2) &&
-                       MultiXactOffsetPrecedes(offset1,
-                                                                       offset2 + MULTIXACT_MEMBERS_PER_PAGE - 1));
+       diff_head = offset1 - offset2;
+       diff_tail = offset1 - (offset2 + MULTIXACT_MEMBERS_PER_PAGE - 1);
+       return Max(diff_head, diff_tail);
 }
 
 /*
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 33e1e93..b665fe6 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -248,7 +248,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 
        /*
         * Initialize the unshared control struct, including directory path. We
-        * assume caller set PagePrecedes.
+        * assume caller set PageDiff.
         */
        ctl->shared = shared;
        ctl->do_fsync = true;           /* default behavior */
@@ -1084,8 +1084,8 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
                        {
                                if (this_delta > best_valid_delta ||
                                        (this_delta == best_valid_delta &&
-                                        ctl->PagePrecedes(this_page_number,
-                                                                          best_valid_page_number)))
+                                        ctl->PageDiff(this_page_number,
+                                                                  best_valid_page_number) < 0))
                                {
                                        bestvalidslot = slotno;
                                        best_valid_delta = this_delta;
@@ -1096,8 +1096,8 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
                        {
                                if (this_delta > best_invalid_delta ||
                                        (this_delta == best_invalid_delta &&
-                                        ctl->PagePrecedes(this_page_number,
-                                                                          best_invalid_page_number)))
+                                        ctl->PageDiff(this_page_number,
+                                                                  best_invalid_page_number) < 0))
                                {
                                        bestinvalidslot = slotno;
                                        best_invalid_delta = this_delta;
@@ -1207,7 +1207,8 @@ SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
 }
 
 /*
- * Remove all segments before the one holding the passed page number
+ * Remove some obsolete segments.  As defense in depth, this deletes less than
+ * PageDiff() authorizes; see SlruWouldTruncateSegment().
  */
 void
 SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
@@ -1232,7 +1233,7 @@ restart:;
         * While we are holding the lock, make an important safety check: the
         * current endpoint page must not be eligible for removal.
         */
-       if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
+       if (ctl->PageDiff(shared->latest_page_number, cutoffPage) < 0)
        {
                LWLockRelease(shared->ControlLock);
                ereport(LOG,
@@ -1245,7 +1246,7 @@ restart:;
        {
                if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
                        continue;
-               if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
+               if (ctl->PageDiff(shared->page_number[slotno], cutoffPage) >= 0)
                        continue;
 
                /*
@@ -1357,33 +1358,46 @@ restart:
 }
 
 /*
- * Determine whether a segment is okay to delete.
+ * Determine whether to delete a segment.
  *
  * segpage is the first page of the segment, and cutoffPage is the oldest (in
- * PagePrecedes order) page in the SLRU containing still-useful data.  Since
- * every core PagePrecedes callback implements "wrap around", check the
+ * PageDiff order) page in the SLRU containing still-useful data.  Check the
  * segment's first and last pages:
  *
  * first<cutoff  && last<cutoff:  yes
  * first<cutoff  && last>=cutoff: no; cutoff falls inside this segment
  * first>=cutoff && last<cutoff:  no; wrap point falls inside this segment
  * first>=cutoff && last>=cutoff: no; every page of this segment is too young
+ *
+ * The PageDiff specification requires us not to remove pages where the
+ * callback reports negative values close to INT_MIN.  Our interpretation is
+ * to decline to delete segments containing a page P such that PageDiff(P,
+ * cutoffPage) is in [INT_MIN, INT_MIN/2].
  */
 static bool
-SlruMayDeleteSegment(SlruCtl ctl, int segpage, int cutoffPage)
+SlruWouldTruncateSegment(SlruCtl ctl, int segpage, int cutoffPage)
 {
-       int                     seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
+       int                     first_page_diff;
 
        Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
 
-       return (ctl->PagePrecedes(segpage, cutoffPage) &&
-                       ctl->PagePrecedes(seg_last_page, cutoffPage));
+       first_page_diff = ctl->PageDiff(segpage, cutoffPage);
+       if (first_page_diff < 0 && first_page_diff > INT_MIN / 2)
+       {
+               int                     seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
+               int                     last_page_diff = ctl->PageDiff(seg_last_page, cutoffPage);
+
+               return last_page_diff < 0 && last_page_diff > INT_MIN / 2;
+       }
+       return false;
 }
 
 #ifdef USE_ASSERT_CHECKING
 static void
-SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
+SlruPageDiffTestOffset(SlruCtl ctl, int per_page, uint32 offset)
 {
+       int32           large_negative = INT_MIN / 1000 * 999,
+                               large_positive = INT_MAX / 1000 * 999;
        TransactionId lhs,
                                rhs;
        int                     newestPage,
@@ -1407,19 +1421,27 @@ SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
        Assert(!TransactionIdPrecedes(rhs, lhs + 1));
        Assert(!TransactionIdFollowsOrEquals(lhs, rhs));
        Assert(!TransactionIdFollowsOrEquals(rhs, lhs));
-       Assert(!ctl->PagePrecedes(lhs / per_page, lhs / per_page));
-       Assert(!ctl->PagePrecedes(lhs / per_page, rhs / per_page));
-       Assert(!ctl->PagePrecedes(rhs / per_page, lhs / per_page));
-       Assert(!ctl->PagePrecedes((lhs - per_page) / per_page, rhs / per_page));
-       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 3 * per_page) / per_page));
-       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 2 * per_page) / per_page));
-       Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 1 * per_page) / per_page)
-                  || (1U << 31) % per_page != 0);      /* See CommitTsPagePrecedes() */
-       Assert(ctl->PagePrecedes((lhs + 1 * per_page) / per_page, rhs / per_page)
+       Assert(ctl->PageDiff(lhs / per_page, lhs / per_page) == 0);
+       Assert(ctl->PageDiff(lhs / per_page, rhs / per_page) > large_positive);
+       Assert(ctl->PageDiff(rhs / per_page, lhs / per_page) > large_positive);
+       Assert(ctl->PageDiff((lhs - per_page) / per_page, rhs / per_page) >
+                  large_positive);
+       Assert(ctl->PageDiff(rhs / per_page, (lhs - 3 * per_page) / per_page) <
+                  large_negative);
+       Assert(ctl->PageDiff(rhs / per_page, (lhs - 2 * per_page) / per_page) <
+                  large_negative);
+       Assert(ctl->PageDiff(rhs / per_page, (lhs - 1 * per_page) / per_page) <
+                  large_negative
+                  || (1U << 31) % per_page != 0);      /* See CommitTsPageDiff() */
+       Assert(ctl->PageDiff((lhs + 1 * per_page) / per_page, rhs / per_page) <
+                  large_negative
                   || (1U << 31) % per_page != 0);
-       Assert(ctl->PagePrecedes((lhs + 2 * per_page) / per_page, rhs / per_page));
-       Assert(ctl->PagePrecedes((lhs + 3 * per_page) / per_page, rhs / per_page));
-       Assert(!ctl->PagePrecedes(rhs / per_page, (lhs + per_page) / per_page));
+       Assert(ctl->PageDiff((lhs + 2 * per_page) / per_page, rhs / per_page) <
+                  large_negative);
+       Assert(ctl->PageDiff((lhs + 3 * per_page) / per_page, rhs / per_page) <
+                  large_negative);
+       Assert(ctl->PageDiff(rhs / per_page, (lhs + per_page) / per_page) >
+                  large_positive);
 
        /*
         * GetNewTransactionId() has assigned the last XID it can safely use, and
@@ -1432,10 +1454,10 @@ SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
        oldestXact = newestXact + 1;
        oldestXact -= 1U << 31;
        oldestPage = oldestXact / per_page;
-       Assert(!SlruMayDeleteSegment(ctl,
-                                                                (newestPage -
-                                                                 newestPage % SLRU_PAGES_PER_SEGMENT),
-                                                                oldestPage));
+       Assert(!SlruWouldTruncateSegment(ctl,
+                                                                        (newestPage -
+                                                                         newestPage % SLRU_PAGES_PER_SEGMENT),
+                                                                        oldestPage));
 
        /*
         * GetNewTransactionId() has assigned the last XID it can safely use, and
@@ -1448,42 +1470,44 @@ SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
        oldestXact = newestXact + 1;
        oldestXact -= 1U << 31;
        oldestPage = oldestXact / per_page;
-       Assert(!SlruMayDeleteSegment(ctl,
-                                                                (newestPage -
-                                                                 newestPage % SLRU_PAGES_PER_SEGMENT),
-                                                                oldestPage));
+       Assert(!SlruWouldTruncateSegment(ctl,
+                                                                        (newestPage -
+                                                                         newestPage % SLRU_PAGES_PER_SEGMENT),
+                                                                        oldestPage));
 }
 
 /*
- * Unit-test a PagePrecedes function.
+ * Unit-test a PageDiff function.
  *
  * This assumes every uint32 >= FirstNormalTransactionId is a valid key.  It
  * assumes each value occupies a contiguous, fixed-size region of SLRU bytes.
  * (MultiXactMemberCtl separates flags from XIDs.  AsyncCtl has
  * variable-length entries, no keys, and no random access.  These unit tests
  * do not apply to them.)
+ *
+ * This is stricter than the PageDiff API requires.
  */
 void
-SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
+SlruPageDiffUnitTests(SlruCtl ctl, int per_page)
 {
        /* Test first, middle and last entries of a page. */
-       SlruPagePrecedesTestOffset(ctl, per_page, 0);
-       SlruPagePrecedesTestOffset(ctl, per_page, per_page / 2);
-       SlruPagePrecedesTestOffset(ctl, per_page, per_page - 1);
+       SlruPageDiffTestOffset(ctl, per_page, 0);
+       SlruPageDiffTestOffset(ctl, per_page, per_page / 2);
+       SlruPageDiffTestOffset(ctl, per_page, per_page - 1);
 }
 #endif
 
 /*
  * SlruScanDirectory callback
- *             This callback reports true if there's any segment wholly prior to the
- *             one containing the page passed as "data".
+ *             This callback reports true if SimpleLruTruncate(ctl, *data) would
+ *             unlink any segment.
  */
 bool
-SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
+SlruScanDirCbWouldTruncate(SlruCtl ctl, char *filename, int segpage, void *data)
 {
        int                     cutoffPage = *(int *) data;
 
-       if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
+       if (SlruWouldTruncateSegment(ctl, segpage, cutoffPage))
                return true;                    /* found one; don't iterate any more */
 
        return false;                           /* keep going */
@@ -1498,7 +1522,7 @@ SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
 {
        int                     cutoffPage = *(int *) data;
 
-       if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
+       if (SlruWouldTruncateSegment(ctl, segpage, cutoffPage))
                SlruInternalDeleteSegment(ctl, filename);
 
        return false;                           /* keep going */
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
index 09cfa38..76ce2d4 100644
--- a/src/backend/access/transam/subtrans.c
+++ b/src/backend/access/transam/subtrans.c
@@ -44,7 +44,7 @@
  * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE, and segment numbering at
  * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
  * explicit notice of that fact in this module, except when comparing segment
- * and page numbers in TruncateSUBTRANS (see SubTransPagePrecedes) and zeroing
+ * and page numbers in TruncateSUBTRANS (see SubTransPageDiff) and zeroing
  * them in StartupSUBTRANS.
  */
 
@@ -64,7 +64,7 @@ static SlruCtlData SubTransCtlData;
 
 
 static int     ZeroSUBTRANSPage(int pageno);
-static bool SubTransPagePrecedes(int page1, int page2);
+static int32 SubTransPageDiff(int page1, int page2);
 
 
 /*
@@ -190,13 +190,13 @@ SUBTRANSShmemSize(void)
 void
 SUBTRANSShmemInit(void)
 {
-       SubTransCtl->PagePrecedes = SubTransPagePrecedes;
+       SubTransCtl->PageDiff = SubTransPageDiff;
        SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0,
                                  SubtransSLRULock, "pg_subtrans",
                                  LWTRANCHE_SUBTRANS_BUFFER);
        /* Override default assumption that writes should be fsync'd */
        SubTransCtl->do_fsync = false;
-       SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
+       SlruPageDiffUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
 }
 
 /*
@@ -374,20 +374,21 @@ TruncateSUBTRANS(TransactionId oldestXact)
 
 
 /*
- * Decide whether a SUBTRANS page number is "older" for truncation purposes.
- * Analogous to CLOGPagePrecedes().
+ * Diff SUBTRANS page numbers for truncation purposes.  Analogous to
+ * CLOGPageDiff().
  */
-static bool
-SubTransPagePrecedes(int page1, int page2)
+static int32
+SubTransPageDiff(int page1, int page2)
 {
        TransactionId xid1;
        TransactionId xid2;
+       int32           diff_head;
+       int32           diff_tail;
 
        xid1 = ((TransactionId) page1) * SUBTRANS_XACTS_PER_PAGE;
-       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * SUBTRANS_XACTS_PER_PAGE;
-       xid2 += FirstNormalTransactionId + 1;
 
-       return (TransactionIdPrecedes(xid1, xid2) &&
-                       TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1));
+       diff_head = xid1 - xid2;
+       diff_tail = xid1 - (xid2 + SUBTRANS_XACTS_PER_PAGE - 1);
+       return Max(diff_head, diff_tail);
 }
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 7e5cd66..f3a676f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -207,13 +207,13 @@ typedef struct QueuePosition
 
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
-       (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
+       (asyncQueuePageDiff((x).page, (y).page) < 0 ? (x) : \
         (x).page != (y).page ? (y) : \
         (x).offset < (y).offset ? (x) : (y))
 
 /* choose logically larger QueuePosition */
 #define QUEUE_POS_MAX(x,y) \
-       (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+       (asyncQueuePageDiff((x).page, (y).page) < 0 ? (y) : \
         (x).page != (y).page ? (x) : \
         (x).offset > (y).offset ? (x) : (y))
 
@@ -433,8 +433,7 @@ static bool backendTryAdvanceTail = false;
 bool           Trace_notify = false;
 
 /* local function prototypes */
-static int     asyncQueuePageDiff(int p, int q);
-static bool asyncQueuePagePrecedes(int p, int q);
+static int32 asyncQueuePageDiff(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
@@ -465,12 +464,16 @@ static void ClearPendingActionsAndNotifies(void);
 
 /*
  * Compute the difference between two queue page numbers (i.e., p - q),
- * accounting for wraparound.
+ * accounting for wraparound.  Since asyncQueueIsFull() blocks creation of a
+ * page that could precede any extant page, we need not assess entries within
+ * a page.
  */
-static int
+static int32
 asyncQueuePageDiff(int p, int q)
 {
-       int                     diff;
+       int                     diff_max = ((QUEUE_MAX_PAGE + 1) / 2) - 1,
+                               diff;
+       int32           scale = INT_MAX / diff_max;
 
        /*
         * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
@@ -484,19 +487,24 @@ asyncQueuePageDiff(int p, int q)
                diff -= QUEUE_MAX_PAGE + 1;
        else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
                diff += QUEUE_MAX_PAGE + 1;
-       return diff;
+       return diff * scale;
 }
 
-/*
- * Is p < q, accounting for wraparound?
- *
- * Since asyncQueueIsFull() blocks creation of a page that could precede any
- * extant page, we need not assess entries within a page.
- */
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static void
+asyncQueuePageDiffUnitTests(void)
 {
-       return asyncQueuePageDiff(p, q) < 0;
+       int32           large_negative = INT_MIN / 1000 * 999,
+                               large_positive = INT_MAX / 1000 * 999;
+       int                     diff_min = -((QUEUE_MAX_PAGE + 1) / 2),
+                               diff_max = ((QUEUE_MAX_PAGE + 1) / 2) - 1;
+
+       Assert(asyncQueuePageDiff(diff_max, diff_max) == 0);
+       Assert(asyncQueuePageDiff(diff_max, 0) > large_positive);
+       Assert(asyncQueuePageDiff(diff_max + 1, 0) < large_negative);
+       Assert(asyncQueuePageDiff(0, QUEUE_MAX_PAGE + diff_min + 1) <
+                  large_negative);
+       Assert(asyncQueuePageDiff(0, QUEUE_MAX_PAGE + diff_min + 2) >
+                  large_positive);
 }
 
 /*
@@ -557,11 +565,12 @@ AsyncShmemInit(void)
        /*
         * Set up SLRU management of the pg_notify data.
         */
-       NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
+       NotifyCtl->PageDiff = asyncQueuePageDiff;
        SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
                                  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
        /* Override default assumption that writes should be fsync'd */
        NotifyCtl->do_fsync = false;
+       asyncQueuePageDiffUnitTests();
 
        if (!found)
        {
@@ -1366,7 +1375,7 @@ asyncQueueIsFull(void)
                nexthead = 0;                   /* wrap around */
        boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
        boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
-       return asyncQueuePagePrecedes(nexthead, boundary);
+       return asyncQueuePageDiff(nexthead, boundary) < 0;
 }
 
 /*
@@ -2202,7 +2211,7 @@ asyncQueueAdvanceTail(void)
         */
        newtailpage = QUEUE_POS_PAGE(min);
        boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
-       if (asyncQueuePagePrecedes(oldtailpage, boundary))
+       if (asyncQueuePageDiff(oldtailpage, boundary) < 0)
        {
                /*
                 * SimpleLruTruncate() will ask for NotifySLRULock but will also
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index fde1b5c..6dbd87e 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -438,7 +438,7 @@ static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT
 static void ReleaseRWConflict(RWConflict conflict);
 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
 
-static bool SerialPagePrecedesLogically(int page1, int page2);
+static int32 SerialPageDiffLogically(int page1, int page2);
 static void SerialInit(void);
 static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
 static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
@@ -784,26 +784,30 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
 /*------------------------------------------------------------------------*/
 
 /*
- * Decide whether a Serial page number is "older" for truncation purposes.
- * Analogous to CLOGPagePrecedes().
+ * Diff Serial page numbers for truncation purposes.  Analogous to
+ * CLOGPageDiff().
+ *
+ * This must follow stricter rules than PageDiff demands, for the benefit of
+ * the call local to this file.
  */
-static bool
-SerialPagePrecedesLogically(int page1, int page2)
+static int32
+SerialPageDiffLogically(int page1, int page2)
 {
        TransactionId xid1;
        TransactionId xid2;
+       int32           diff_head;
+       int32           diff_tail;
 
        xid1 = ((TransactionId) page1) * SERIAL_ENTRIESPERPAGE;
-       xid1 += FirstNormalTransactionId + 1;
        xid2 = ((TransactionId) page2) * SERIAL_ENTRIESPERPAGE;
-       xid2 += FirstNormalTransactionId + 1;
 
-       return (TransactionIdPrecedes(xid1, xid2) &&
-                       TransactionIdPrecedes(xid1, xid2 + SERIAL_ENTRIESPERPAGE - 1));
+       diff_head = xid1 - xid2;
+       diff_tail = xid1 - (xid2 + SERIAL_ENTRIESPERPAGE - 1);
+       return Max(diff_head, diff_tail);
 }
 
 static void
-SerialPagePrecedesLogicallyUnitTests(void)
+SerialPageDiffLogicallyUnitTests(void)
 {
        int                     per_page = SERIAL_ENTRIESPERPAGE,
                                offset = per_page / 2;
@@ -826,21 +830,21 @@ SerialPagePrecedesLogicallyUnitTests(void)
         * In this scenario, the SLRU headPage pertains to the last ~1000 XIDs
         * assigned.  oldestXact finishes, ~2B XIDs having elapsed since it
         * started.  Further transactions cause us to summarize oldestXact to
-        * tailPage.  Function must return false so SerialAdd() doesn't zero
-        * tailPage (which may contain entries for other old, recently-finished
-        * XIDs) and half the SLRU.  Reaching this requires burning ~2B XIDs in
-        * single-user mode, a negligible possibility.
+        * tailPage.  Function must return non-negative so SerialAdd() doesn't
+        * zero tailPage (which may contain entries for other old,
+        * recently-finished XIDs) and half the SLRU.  Reaching this requires
+        * burning ~2B XIDs in single-user mode, a negligible possibility.
         */
        headPage = newestPage;
        targetPage = oldestPage;
-       Assert(!SerialPagePrecedesLogically(headPage, targetPage));
+       Assert(SerialPageDiffLogically(headPage, targetPage) >= 0);
 
        /*
         * In this scenario, the SLRU headPage pertains to oldestXact.  We're
         * summarizing an XID near newestXact.  (Assume few other XIDs used
         * SERIALIZABLE, hence the minimal headPage advancement.  Assume
         * oldestXact was long-running and only recently reached the SLRU.)
-        * Function must return true to make SerialAdd() create targetPage.
+        * Function must return negative to make SerialAdd() create targetPage.
         *
         * Today's implementation mishandles this case, but it doesn't matter
         * enough to fix.  Verify that the defect affects just one page by
@@ -851,9 +855,9 @@ SerialPagePrecedesLogicallyUnitTests(void)
         */
        headPage = oldestPage;
        targetPage = newestPage;
-       Assert(SerialPagePrecedesLogically(headPage, targetPage - 1));
+       Assert(SerialPageDiffLogically(headPage, targetPage - 1) < 0);
 #if 0
-       Assert(SerialPagePrecedesLogically(headPage, targetPage));
+       Assert(SerialPageDiffLogically(headPage, targetPage) < 0);
 #endif
 }
 
@@ -868,14 +872,14 @@ SerialInit(void)
        /*
         * Set up SLRU management of the pg_serial data.
         */
-       SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
+       SerialSlruCtl->PageDiff = SerialPageDiffLogically;
        SimpleLruInit(SerialSlruCtl, "Serial",
                                  NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial",
                                  LWTRANCHE_SERIAL_BUFFER);
        /* Override default assumption that writes should be fsync'd */
        SerialSlruCtl->do_fsync = false;
-       SerialPagePrecedesLogicallyUnitTests();
-       SlruPagePrecedesUnitTests(SerialSlruCtl, SERIAL_ENTRIESPERPAGE);
+       SerialPageDiffLogicallyUnitTests();
+       SlruPageDiffUnitTests(SerialSlruCtl, SERIAL_ENTRIESPERPAGE);
 
        /*
         * Create or attach to the SerialControl structure.
@@ -937,8 +941,8 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
        else
        {
                firstZeroPage = SerialNextPage(serialControl->headPage);
-               isNewPage = SerialPagePrecedesLogically(serialControl->headPage,
-                                                                                       targetPage);
+               isNewPage = SerialPageDiffLogically(serialControl->headPage,
+                                                                               targetPage) < 0;
        }
 
        if (!TransactionIdIsValid(serialControl->headXid)
diff --git a/src/include/access/slru.h b/src/include/access/slru.h
index 19982f6..a8144a5 100644
--- a/src/include/access/slru.h
+++ b/src/include/access/slru.h
@@ -28,7 +28,7 @@
  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
  * take no explicit notice of that fact in slru.c, except when comparing
- * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+ * segment and page numbers in SimpleLruTruncate (see PageDiff()).
  */
 #define SLRU_PAGES_PER_SEGMENT 32
 
@@ -117,16 +117,18 @@ typedef struct SlruCtlData
        bool            do_fsync;
 
        /*
-        * Decide whether a page is "older" for truncation and as a hint for
-        * evicting pages in LRU order.  Return true if every entry of the first
-        * argument is older than every entry of the second argument.  Note that
-        * !PagePrecedes(a,b) && !PagePrecedes(b,a) need not imply a==b; it also
-        * arises when some entries are older and some are not.  For SLRUs using
-        * SimpleLruTruncate(), this must use modular arithmetic.  (For others,
-        * the behavior of this callback has no functional implications.)  Use
-        * SlruPagePrecedesUnitTests() in SLRUs meeting its criteria.
+        * Compute distance between two page numbers, for truncation and as a hint
+        * for evicting pages in LRU order.  Callbacks shall distribute return
+        * values uniformly in [INT_MIN,INT_MAX].  If PageDiff(P, oldest_needed)
+        * is negative but not close to INT_MIN, that implies data in page P is
+        * obsolete.  The exception for values close to INT_MIN permits
+        * implementations to return such values for edge cases where the answer
+        * changes mid-page from INT_MIN to INT_MAX.  Use SlruPageDiffUnitTests()
+        * in SLRUs meeting its criteria.  For SLRUs using SimpleLruTruncate(),
+        * this must use modular arithmetic.  (For others, the behavior of this
+        * callback has no functional implications.)
         */
-       bool            (*PagePrecedes) (int, int);
+       int32           (*PageDiff) (int, int);
 
        /*
         * Dir is set during SimpleLruInit and does not change thereafter. Since
@@ -149,9 +151,9 @@ extern int  SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno,
 extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
 extern void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied);
 #ifdef USE_ASSERT_CHECKING
-extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page);
+extern void SlruPageDiffUnitTests(SlruCtl ctl, int per_page);
 #else
-#define SlruPagePrecedesUnitTests(ctl, per_page) do {} while (0)
+#define SlruPageDiffUnitTests(ctl, per_page) do {} while (0)
 #endif
 extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);
 extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno);
@@ -162,8 +164,8 @@ extern bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data
 extern void SlruDeleteSegment(SlruCtl ctl, int segno);
 
 /* SlruScanDirectory public callbacks */
-extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
-                                                                               int segpage, void *data);
+extern bool SlruScanDirCbWouldTruncate(SlruCtl ctl, char *filename,
+                                                                          int segpage, void *data);
 extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage,
                                                                   void *data);
 
