From 020ee66b792e2a4398fed8ae37c0adf827e8dd19 Mon Sep 17 00:00:00 2001
From: "dgrowley@gmail.com" <dgrowley@gmail.com>
Date: Fri, 11 Sep 2020 23:06:31 +1200
Subject: [PATCH v8 1/2] Optimize compactify_tuples function

This function could often be seen in profiles of the recovery process.  A
qsort was performed to sort the item pointers in reverse offset order so
that we could safely move tuples up to the end of the page without
overwriting the memory of yet-to-be-moved tuples; i.e. we used to compact
the page starting at the back and move towards the front.  The qsort that
this required could be expensive for pages with a large number of tuples.

In this commit, we take another approach to tuple defragmentation.

Now, instead of sorting the remaining item pointers, we first check
whether the remaining line pointers are presorted and, when they are,
just memmove the tuples that need to be moved.  This presorted check can
be done very cheaply in the calling functions while they're building the
array of remaining line pointers, and the presorted case itself is very
fast.
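
For illustration, here is a minimal standalone sketch of that check (the
struct matches the patch, but the function name and simplified types are
illustrative; the real callers track this incrementally while filling the
array rather than in a separate pass):

    #include <stdbool.h>
    #include <stdint.h>

    typedef struct itemIdCompactData
    {
        uint16      offsetindex;    /* linp array index */
        int16       itemoff;        /* page offset of item data */
        uint16      alignedlen;     /* MAXALIGN(item data len) */
    } itemIdCompactData;

    /*
     * Return true when 'itemidbase' is sorted in descending itemoff
     * order, i.e. when compactify_tuples() may take the cheap
     * memmove-only path.  'pd_special' is the page's special-space
     * offset, an upper bound for any tuple offset.
     */
    static bool
    items_presorted(const itemIdCompactData *itemidbase, int nitems,
                    int pd_special)
    {
        int         last_offset = pd_special;

        for (int i = 0; i < nitems; i++)
        {
            if (last_offset > itemidbase[i].itemoff)
                last_offset = itemidbase[i].itemoff;
            else
                return false;
        }
        return true;
    }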

When the item pointer array is not presorted, we must copy the tuples
that need to be moved into a temp buffer before copying them back into
the page.  This differs from what we used to do here, as we now copy the
tuples back into the page in reverse line pointer order; previously we
left the existing order alone.  Reordering the tuples increases the
likelihood of hitting the presorted case the next time around.  Any newly
added tuple which consumes a new line pointer will also maintain the
correct sort order of tuples in the page, which means the presorted case
will be hit the next time.  Only consuming an unused line pointer can
cause the tuple order to become unsorted again, but that will be
corrected the next time the function is called for the page.
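
As a rough standalone sketch of that scheme (the function name and fixed
page size are illustrative, line pointers are not updated here, and the
real code additionally batches copies of adjacent tuples into a single
memcpy):

    #include <stdint.h>
    #include <string.h>

    #define PAGE_SIZE 8192

    typedef struct itemIdCompactData
    {
        uint16      offsetindex;    /* linp array index */
        int16       itemoff;        /* page offset of item data */
        uint16      alignedlen;     /* MAXALIGN(item data len) */
    } itemIdCompactData;

    /*
     * Pack the tuples referenced by 'itemidbase' (which may be in any
     * order) up against 'pd_special', laying them out in reverse line
     * pointer order.  Each tuple is first copied into a scratch buffer
     * at its old offset, so copying it back can never overwrite a tuple
     * that has not been moved yet.  Returns the new pd_upper.
     */
    static int
    compact_via_scratch(char *page, int pd_special,
                        itemIdCompactData *itemidbase, int nitems)
    {
        char        scratch[PAGE_SIZE];
        int         upper = pd_special;

        for (int i = 0; i < nitems; i++)
            memcpy(scratch + itemidbase[i].itemoff,
                   page + itemidbase[i].itemoff,
                   itemidbase[i].alignedlen);

        for (int i = 0; i < nitems; i++)
        {
            upper -= itemidbase[i].alignedlen;
            memcpy(page + upper,
                   scratch + itemidbase[i].itemoff,
                   itemidbase[i].alignedlen);
            itemidbase[i].itemoff = upper;  /* the real code sets lp_off */
        }
        return upper;
    }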

Benchmarks have shown that the non-presorted case is at least as fast as
the original qsort method when the page has just a few tuples.  As the
number of tuples grows, the new method maintains its performance, whereas
the original qsort method became much slower as the number of tuples on
the page became large.

Author: David Rowley
Reviewed-by: Thomas Munro
Discussion: https://postgr.es/m/CA+hUKGKMQFVpjr106gRhwk6R-nXv0qOcTreZuQzxgpHESAL6dw@mail.gmail.com
---
 src/backend/storage/page/bufpage.c | 279 +++++++++++++++++++++++++----
 1 file changed, 246 insertions(+), 33 deletions(-)

diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index d708117a40..6dab1c264d 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -411,51 +411,243 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
 	uint16		offsetindex;	/* linp array index */
 	int16		itemoff;		/* page offset of item data */
 	uint16		alignedlen;		/* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-	/* Sort in decreasing itemoff order */
-	return ((itemIdSort) itemidp2)->itemoff -
-		((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
- * After removing or marking some line pointers unused, move the tuples to
- * remove the gaps caused by the removed items.
+ * After removing or marking some line pointers unused, rearrange the tuples
+ * so that they're located in the page in reverse line pointer order and
+ * remove the gaps caused by the newly unused line pointers.
+ *
+ * This function can be fairly hot, especially during recovery, so it pays to
+ * take some measures to make it as efficient as possible.
+ *
+ * Callers may pass 'presorted' as true iff the 'itemidbase' array is sorted
+ * in descending order of itemoff.  When this is true we can just memmove
+ * tuples towards the end of the page.  This is quite a common case as it's
+ * the order that tuples are initially inserted into pages.  It's also the
+ * order that this function will reorder the tuples into, so hitting this case
+ * is still fairly common for tables that are heavily updated.
+ *
+ * When the 'itemidbase' array is not presorted then we're unable to just
+ * memmove tuples around freely.  Doing so could cause us to overwrite the
+ * memory belonging to a tuple we've not moved yet.  In this case, we copy all
+ * the tuples that need to be moved into a temporary buffer.  We can then
+ * simply memcpy() out of that temporary buffer back into the page at the
+ * correct offset.
+ *
+ * Callers must ensure that nitems is > 0.
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
 	PageHeader	phdr = (PageHeader) page;
 	Offset		upper;
+	Offset		copy_tail;
+	Offset		copy_head;
+	itemIdCompact itemidptr;
 	int			i;
 
-	/* sort itemIdSortData array into decreasing itemoff order */
-	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-		  itemoffcompare);
+	/* Code within will not work correctly if nitems == 0 */
+	Assert(nitems > 0);
 
-	upper = phdr->pd_special;
-	for (i = 0; i < nitems; i++)
+	if (presorted)
 	{
-		itemIdSort	itemidptr = &itemidbase[i];
-		ItemId		lp;
 
-		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-		upper -= itemidptr->alignedlen;
+#ifdef USE_ASSERT_CHECKING
+		{
+			/*
+			 * Verify we've not gotten any new callers that are incorrectly
+			 * passing a true presorted value.
+			 */
+			Offset		lastoff = phdr->pd_special;
+
+			for (i = 0; i < nitems; i++)
+			{
+				itemidptr = &itemidbase[i];
+
+				Assert(lastoff > itemidptr->itemoff);
+
+				lastoff = itemidptr->itemoff;
+			}
+		}
+#endif							/* USE_ASSERT_CHECKING */
+
+		/*
+		 * 'itemidbase' is already in the optimal order, i.e., lower item
+		 * pointers have a higher offset.  This allows us to memmove the
+		 * tuples up to the end of the page without having to worry about
+		 * overwriting other tuples that have not been moved yet.
+		 *
+		 * There's a good chance that there are tuples already right at the
+		 * end of the page that we can simply skip over because they're
+		 * already in the correct location within the page.  We'll do that
+		 * first...
+		 */
+		upper = phdr->pd_special;
+		i = 0;
+		do
+		{
+			itemidptr = &itemidbase[i];
+			if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+				break;
+			upper -= itemidptr->alignedlen;
+
+			i++;
+		} while (i < nitems);
+
+		/*
+		 * Now that we've found the first tuple that needs to be moved, we can
+		 * do the tuple compactification.  We try to make the minimum number of
+		 * memmove calls and only call memmove when there's a gap.  When we
+		 * see a gap we just move all tuples after the gap up until the point
+		 * of the last move operation.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				memmove((char *) page + upper,
+						page + copy_head,
+						copy_tail - copy_head);
+
+				/*
+				 * We've now moved all tuples already seen, but not the
+				 * current tuple, so we set the copy_tail to the end of this
+				 * tuple so it can be moved in another iteration of the loop.
+				 */
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			/* shift the target offset down by the length of this tuple */
+			upper -= itemidptr->alignedlen;
+			/* point the copy_head to the start of this tuple */
+			copy_head = itemidptr->itemoff;
+
+			/* update the line pointer to reference the new offset */
+			lp->lp_off = upper;
+
+		}
+
+		/* move the remaining tuples. */
 		memmove((char *) page + upper,
-				(char *) page + itemidptr->itemoff,
-				itemidptr->alignedlen);
-		lp->lp_off = upper;
+				page + copy_head,
+				copy_tail - copy_head);
+	}
+	else
+	{
+		PGAlignedBlock scratch;
+		char	   *scratchptr = scratch.data;
+
+		/*
+		 * Non-presorted case:  The tuples in the itemidbase array may be in
+		 * any order. So, in order to move these to the end of the page, we
+		 * must make a temp copy of each tuple that needs to be moved before
+		 * we copy them back into the page at the new offset.
+		 *
+		 * If a large percentage of tuples have been pruned (>75%), then we'll
+		 * copy these into the temp buffer tuple-by-tuple; otherwise, we'll
+		 * just do a single memcpy for all tuples that need to be moved.
+		 */
+		if (nitems < PageGetMaxOffsetNumber(page) / 4)
+		{
+			i = 0;
+			do
+			{
+				itemidptr = &itemidbase[i];
+				memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+					   itemidptr->alignedlen);
+				i++;
+			} while (i < nitems);
+
+			/* Set things up for the compactification code below */
+			i = 0;
+			itemidptr = &itemidbase[0];
+			upper = phdr->pd_special;
+		}
+		else
+		{
+			upper = phdr->pd_special;
+
+			/*
+			 * Many tuples are likely to already be in the correct location.
+			 * There's no need to copy these into the temp buffer.  Instead
+			 * we'll just skip forward in the itemidbase array to the position
+			 * that we do need to move tuples from so that the code below just
+			 * leaves these ones alone.
+			 */
+			i = 0;
+			do
+			{
+				itemidptr = &itemidbase[i];
+				if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+					break;
+				upper -= itemidptr->alignedlen;
+
+				i++;
+			} while (i < nitems);
+
+			/* Copy all tuples that need to be moved into the temp buffer */
+			memcpy(scratchptr + phdr->pd_upper,
+				   page + phdr->pd_upper,
+				   upper - phdr->pd_upper);
+		}
+
+		/*
+		 * Do the tuple compactification.  itemidptr is already pointing to
+		 * the first tuple that we're going to move.  Here we collapse the
+		 * memcpy calls for adjacent tuples into a single call.  This is done
+		 * by delaying the memcpy call until we find a gap that needs to be
+		 * closed.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			/* copy pending tuples when we detect a gap */
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				memcpy((char *) page + upper,
+					   scratchptr + copy_head,
+					   copy_tail - copy_head);
+
+				/*
+				 * We've now copied all tuples already seen, but not the
+				 * current tuple, so we set the copy_tail to the end of this
+				 * tuple.
+				 */
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			/* shift the target offset down by the length of this tuple */
+			upper -= itemidptr->alignedlen;
+			/* point the copy_head to the start of this tuple */
+			copy_head = itemidptr->itemoff;
+
+			/* update the line pointer to reference the new offset */
+			lp->lp_off = upper;
+
+		}
+
+		/* Copy the remaining chunk */
+		memcpy((char *) page + upper,
+			   scratchptr + copy_head,
+			   copy_tail - copy_head);
 	}
 
 	phdr->pd_upper = upper;
@@ -477,14 +669,16 @@ PageRepairFragmentation(Page page)
 	Offset		pd_lower = ((PageHeader) page)->pd_lower;
 	Offset		pd_upper = ((PageHeader) page)->pd_upper;
 	Offset		pd_special = ((PageHeader) page)->pd_special;
-	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-	itemIdSort	itemidptr;
+	Offset		last_offset;
+	itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nstorage,
 				nunused;
 	int			i;
 	Size		totallen;
+	bool		presorted = true;	/* For now */
 
 	/*
 	 * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +703,7 @@ PageRepairFragmentation(Page page)
 	nline = PageGetMaxOffsetNumber(page);
 	itemidptr = itemidbase;
 	nunused = totallen = 0;
+	last_offset = pd_special;
 	for (i = FirstOffsetNumber; i <= nline; i++)
 	{
 		lp = PageGetItemId(page, i);
@@ -518,6 +713,12 @@ PageRepairFragmentation(Page page)
 			{
 				itemidptr->offsetindex = i - 1;
 				itemidptr->itemoff = ItemIdGetOffset(lp);
+
+				if (last_offset > itemidptr->itemoff)
+					last_offset = itemidptr->itemoff;
+				else
+					presorted = false;
+
 				if (unlikely(itemidptr->itemoff < (int) pd_upper ||
 							 itemidptr->itemoff >= (int) pd_special))
 					ereport(ERROR,
@@ -552,7 +753,7 @@ PageRepairFragmentation(Page page)
 					 errmsg("corrupted item lengths: total %u, available space %u",
 							(unsigned int) totallen, pd_special - pd_lower)));
 
-		compactify_tuples(itemidbase, nstorage, page);
+		compactify_tuples(itemidbase, nstorage, page, presorted);
 	}
 
 	/* Set hint bit for PageAddItem */
@@ -831,9 +1032,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	Offset		pd_lower = phdr->pd_lower;
 	Offset		pd_upper = phdr->pd_upper;
 	Offset		pd_special = phdr->pd_special;
-	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+	Offset		last_offset;
+	itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
 	ItemIdData	newitemids[MaxIndexTuplesPerPage];
-	itemIdSort	itemidptr;
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nused;
@@ -842,6 +1044,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	unsigned	offset;
 	int			nextitm;
 	OffsetNumber offnum;
+	bool		presorted = true;	/* For now */
 
 	Assert(nitems <= MaxIndexTuplesPerPage);
 
@@ -883,6 +1086,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	totallen = 0;
 	nused = 0;
 	nextitm = 0;
+	last_offset = pd_special;
 	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
 	{
 		lp = PageGetItemId(page, offnum);
@@ -906,6 +1110,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 		{
 			itemidptr->offsetindex = nused; /* where it will go */
 			itemidptr->itemoff = offset;
+
+			if (last_offset > itemidptr->itemoff)
+				last_offset = itemidptr->itemoff;
+			else
+				presorted = false;
+
 			itemidptr->alignedlen = MAXALIGN(size);
 			totallen += itemidptr->alignedlen;
 			newitemids[nused] = *lp;
@@ -932,7 +1142,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
 	/* and compactify the tuple data */
-	compactify_tuples(itemidbase, nused, page);
+	if (nused > 0)
+		compactify_tuples(itemidbase, nused, page, presorted);
+	else
+		phdr->pd_upper = pd_special;
 }
 
 
-- 
2.25.1

