On 10/16/18, Amit Kapila <amit.kapil...@gmail.com> wrote:
> I think we can avoid using prevBlockNumber and try_every_block, if we
> maintain a small cache which tells whether the particular block is
> tried or not.  What I am envisioning is that while finding the block
> with free space, if we came to know that the relation in question is
> small enough that it doesn't have FSM, we can perform a local_search.
> In local_search, we can enquire the cache for any block that we can try
> and if we find any block, we can try inserting in that block,
> otherwise, we need to extend the relation.  One simple way to imagine
> such a cache would be an array of structure and structure has blkno
> and status fields.  After we get the usable block, we need to clear
> the cache, if it exists.

Here is the design I've implemented in the attached v6. There is more
code than v5, but there's a cleaner separation between freespace.c and
hio.c, as you preferred. I also think it's more robust. I've expended
some effort to avoid making unnecessary system calls to get the number
of blocks.
--

For the local, in-memory map, we maintain a static array of status
markers with a fixed length of HEAP_FSM_CREATION_THRESHOLD, indexed by
block number. It is populated every time we call GetPageWithFreeSpace()
on a small table with no FSM. The statuses are:

'zero' (beyond the relation)
'available to try'
'tried already'

Example for a 4-page heap:

01234567
AAAA0000

If we try block 3 and there is no space, we set it to 'tried' and next
time through the loop we'll try 2, etc:

01234567
AAAT0000

If we have tried all available blocks, we will extend the relation. As
in the master branch, we first call GetPageWithFreeSpace() again to see
whether another backend extended the relation to 5 blocks while we were
waiting for the relation extension lock. If we find a new block, we mark
it available and leave the rest alone:

01234567
TTTTA000

On the off chance that we still can't insert into the new block, we
skip checking any others and redo the logic to extend the relation.

If we're about to successfully return a buffer, whether from an
existing block or by extension, we clear the local map.

Once this is in shape, I'll do some performance testing.

-John Naylor
From 529fa1f57946d70736b2304c2883213e45f7c077 Mon Sep 17 00:00:00 2001
From: John Naylor <jcnay...@gmail.com>
Date: Mon, 22 Oct 2018 13:39:25 +0700
Subject: [PATCH v6] Avoid creation of the free space map for small tables.

The FSM isn't created if the heap has HEAP_FSM_CREATION_THRESHOLD (currently
8) or fewer blocks. If the last known good block has insufficient space, try
every block before extending the heap.

If a heap with an FSM is truncated back to below the threshold, the FSM
stays around and can be used as usual.
---
 contrib/pageinspect/expected/page.out     |  77 ++++----
 contrib/pageinspect/sql/page.sql          |  33 ++--
 src/backend/access/brin/brin.c            |   2 +-
 src/backend/access/brin/brin_pageops.c    |   8 +-
 src/backend/access/heap/hio.c             |  57 ++++--
 src/backend/commands/vacuumlazy.c         |  17 +-
 src/backend/storage/freespace/freespace.c | 224 +++++++++++++++++++++-
 src/backend/storage/freespace/indexfsm.c  |   4 +-
 src/include/storage/freespace.h           |   7 +-
 9 files changed, 344 insertions(+), 85 deletions(-)

diff --git a/contrib/pageinspect/expected/page.out b/contrib/pageinspect/expected/page.out
index 3fcd9fbe6d..83e5910453 100644
--- a/contrib/pageinspect/expected/page.out
+++ b/contrib/pageinspect/expected/page.out
@@ -1,48 +1,69 @@
 CREATE EXTENSION pageinspect;
-CREATE TABLE test1 (a int, b int);
-INSERT INTO test1 VALUES (16777217, 131584);
-VACUUM test1;  -- set up FSM
+CREATE TABLE test_rel_forks (a int);
+-- Make sure there are enough blocks in the heap for the FSM to be created.
+INSERT INTO test_rel_forks SELECT g FROM generate_series(1,10000) g;
+-- set up FSM and VM
+VACUUM test_rel_forks;
 -- The page contents can vary, so just test that it can be read
 -- successfully, but don't keep the output.
-SELECT octet_length(get_raw_page('test1', 'main', 0)) AS main_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 0)) AS main_0;
  main_0 
 --------
    8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'main', 1)) AS main_1;
-ERROR:  block number 1 is out of range for relation "test1"
-SELECT octet_length(get_raw_page('test1', 'fsm', 0)) AS fsm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 100)) AS main_100;
+ERROR:  block number 100 is out of range for relation "test_rel_forks"
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 0)) AS fsm_0;
  fsm_0 
 -------
   8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'fsm', 1)) AS fsm_1;
- fsm_1 
--------
-  8192
-(1 row)
-
-SELECT octet_length(get_raw_page('test1', 'vm', 0)) AS vm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 10)) AS fsm_10;
+ERROR:  block number 10 is out of range for relation "test_rel_forks"
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 0)) AS vm_0;
  vm_0 
 ------
  8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'vm', 1)) AS vm_1;
-ERROR:  block number 1 is out of range for relation "test1"
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 1)) AS vm_1;
+ERROR:  block number 1 is out of range for relation "test_rel_forks"
 SELECT octet_length(get_raw_page('xxx', 'main', 0));
 ERROR:  relation "xxx" does not exist
-SELECT octet_length(get_raw_page('test1', 'xxx', 0));
+SELECT octet_length(get_raw_page('test_rel_forks', 'xxx', 0));
 ERROR:  invalid fork name
 HINT:  Valid fork names are "main", "fsm", "vm", and "init".
-SELECT get_raw_page('test1', 0) = get_raw_page('test1', 'main', 0);
+SELECT * FROM fsm_page_contents(get_raw_page('test_rel_forks', 'fsm', 0));
+ fsm_page_contents 
+-------------------
+ 0: 192           +
+ 1: 192           +
+ 3: 192           +
+ 7: 192           +
+ 15: 192          +
+ 31: 192          +
+ 63: 192          +
+ 127: 192         +
+ 255: 192         +
+ 511: 192         +
+ 1023: 192        +
+ 2047: 192        +
+ 4095: 192        +
+ fp_next_slot: 0  +
+ 
+(1 row)
+
+SELECT get_raw_page('test_rel_forks', 0) = get_raw_page('test_rel_forks', 'main', 0);
  ?column? 
 ----------
  t
 (1 row)
 
+DROP TABLE test_rel_forks;
+CREATE TABLE test1 (a int, b int);
+INSERT INTO test1 VALUES (16777217, 131584);
 SELECT pagesize, version FROM page_header(get_raw_page('test1', 0));
  pagesize | version 
 ----------+---------
@@ -62,26 +83,6 @@ SELECT tuple_data_split('test1'::regclass, t_data, t_infomask, t_infomask2, t_bi
  {"\\x01000001","\\x00020200"}
 (1 row)
 
-SELECT * FROM fsm_page_contents(get_raw_page('test1', 'fsm', 0));
- fsm_page_contents 
--------------------
- 0: 254           +
- 1: 254           +
- 3: 254           +
- 7: 254           +
- 15: 254          +
- 31: 254          +
- 63: 254          +
- 127: 254         +
- 255: 254         +
- 511: 254         +
- 1023: 254        +
- 2047: 254        +
- 4095: 254        +
- fp_next_slot: 0  +
- 
-(1 row)
-
 DROP TABLE test1;
 -- check that using any of these functions with a partitioned table or index
 -- would fail
diff --git a/contrib/pageinspect/sql/page.sql b/contrib/pageinspect/sql/page.sql
index 8ac9991837..ee811759d5 100644
--- a/contrib/pageinspect/sql/page.sql
+++ b/contrib/pageinspect/sql/page.sql
@@ -1,26 +1,35 @@
 CREATE EXTENSION pageinspect;
 
-CREATE TABLE test1 (a int, b int);
-INSERT INTO test1 VALUES (16777217, 131584);
+CREATE TABLE test_rel_forks (a int);
+-- Make sure there are enough blocks in the heap for the FSM to be created.
+INSERT INTO test_rel_forks SELECT g FROM generate_series(1,10000) g;
 
-VACUUM test1;  -- set up FSM
+-- set up FSM and VM
+VACUUM test_rel_forks;
 
 -- The page contents can vary, so just test that it can be read
 -- successfully, but don't keep the output.
 
-SELECT octet_length(get_raw_page('test1', 'main', 0)) AS main_0;
-SELECT octet_length(get_raw_page('test1', 'main', 1)) AS main_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 0)) AS main_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 100)) AS main_100;
 
-SELECT octet_length(get_raw_page('test1', 'fsm', 0)) AS fsm_0;
-SELECT octet_length(get_raw_page('test1', 'fsm', 1)) AS fsm_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 0)) AS fsm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 10)) AS fsm_10;
 
-SELECT octet_length(get_raw_page('test1', 'vm', 0)) AS vm_0;
-SELECT octet_length(get_raw_page('test1', 'vm', 1)) AS vm_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 0)) AS vm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 1)) AS vm_1;
 
 SELECT octet_length(get_raw_page('xxx', 'main', 0));
-SELECT octet_length(get_raw_page('test1', 'xxx', 0));
+SELECT octet_length(get_raw_page('test_rel_forks', 'xxx', 0));
+
+SELECT * FROM fsm_page_contents(get_raw_page('test_rel_forks', 'fsm', 0));
+
+SELECT get_raw_page('test_rel_forks', 0) = get_raw_page('test_rel_forks', 'main', 0);
 
-SELECT get_raw_page('test1', 0) = get_raw_page('test1', 'main', 0);
+DROP TABLE test_rel_forks;
+
+CREATE TABLE test1 (a int, b int);
+INSERT INTO test1 VALUES (16777217, 131584);
 
 SELECT pagesize, version FROM page_header(get_raw_page('test1', 0));
 
@@ -29,8 +38,6 @@ SELECT page_checksum(get_raw_page('test1', 0), 0) IS NOT NULL AS silly_checksum_
 SELECT tuple_data_split('test1'::regclass, t_data, t_infomask, t_infomask2, t_bits)
     FROM heap_page_items(get_raw_page('test1', 0));
 
-SELECT * FROM fsm_page_contents(get_raw_page('test1', 'fsm', 0));
-
 DROP TABLE test1;
 
 -- check that using any of these functions with a partitioned table or index
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index e95fbbcea7..7c5b1af764 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -1148,7 +1148,7 @@ terminate_brin_buildstate(BrinBuildState *state)
 		freespace = PageGetFreeSpace(page);
 		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
 		ReleaseBuffer(state->bs_currentInsertBuf);
-		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
+		RecordPageWithFreeSpace(state->bs_irel, blk, freespace, InvalidBlockNumber);
 		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
 	}
 
diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c
index 040cb62e55..70d93878ba 100644
--- a/src/backend/access/brin/brin_pageops.c
+++ b/src/backend/access/brin/brin_pageops.c
@@ -310,7 +310,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 
 		if (extended)
 		{
-			RecordPageWithFreeSpace(idxrel, newblk, freespace);
+			RecordPageWithFreeSpace(idxrel, newblk, freespace, InvalidBlockNumber);
 			FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
 		}
 
@@ -461,7 +461,7 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
 
 	if (extended)
 	{
-		RecordPageWithFreeSpace(idxrel, blk, freespace);
+		RecordPageWithFreeSpace(idxrel, blk, freespace, InvalidBlockNumber);
 		FreeSpaceMapVacuumRange(idxrel, blk, blk + 1);
 	}
 
@@ -654,7 +654,7 @@ brin_page_cleanup(Relation idxrel, Buffer buf)
 
 	/* Measure free space and record it */
 	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buf),
-							br_page_get_freespace(page));
+							br_page_get_freespace(page), InvalidBlockNumber);
 }
 
 /*
@@ -895,7 +895,7 @@ brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer)
 	 * pages whose FSM records were forgotten in a crash.
 	 */
 	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buffer),
-							br_page_get_freespace(page));
+							br_page_get_freespace(page), InvalidBlockNumber);
 }
 
 
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index b8b5871559..092ca5b571 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -24,6 +24,8 @@
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 
+/*#define TRACE_TARGETBLOCK */
+
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -239,8 +241,12 @@ RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
 		 * Immediately update the bottom level of the FSM.  This has a good
 		 * chance of making this page visible to other concurrently inserting
 		 * backends, and we want that to happen without delay.
+		 *
+		 * Since we know the table will end up with more than extraBlocks
+		 * number of pages, we pass that number to avoid an unnecessary
+		 * system call and to make sure the FSM has been created.
 		 */
-		RecordPageWithFreeSpace(relation, blockNum, freespace);
+		RecordPageWithFreeSpace(relation, blockNum, freespace, extraBlocks);
 	}
 	while (--extraBlocks > 0);
 
@@ -378,24 +384,15 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		 * target.
 		 */
 		targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
-
-		/*
-		 * If the FSM knows nothing of the rel, try the last page before we
-		 * give up and extend.  This avoids one-tuple-per-page syndrome during
-		 * bootstrapping or in a recently-started system.
-		 */
-		if (targetBlock == InvalidBlockNumber)
-		{
-			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
-
-			if (nblocks > 0)
-				targetBlock = nblocks - 1;
-		}
 	}
 
 loop:
 	while (targetBlock != InvalidBlockNumber)
 	{
+
+#ifdef TRACE_TARGETBLOCK
+		elog(DEBUG1, "Attempting block %u", targetBlock);
+#endif
 		/*
 		 * Read and exclusive-lock the target block, as well as the other
 		 * block if one was given, taking suitable care with lock ordering and
@@ -484,6 +481,16 @@ loop:
 		{
 			/* use this page as future insert target, too */
 			RelationSetTargetBlock(relation, targetBlock);
+
+			/*
+			 * In case we used an in-memory map of available blocks, reset
+			 * it for next use.
+			 */
+			ClearLocalMap();
+
+#ifdef TRACE_TARGETBLOCK
+			elog(DEBUG1, "Returning buffer for block %u", targetBlock);
+#endif
 			return buffer;
 		}
 
@@ -557,11 +564,18 @@ loop:
 				goto loop;
 			}
 
+#ifdef TRACE_TARGETBLOCK
+			elog(DEBUG1, "Bulk-extending relation");
+#endif
 			/* Time to bulk-extend. */
 			RelationAddExtraBlocks(relation, bistate);
 		}
 	}
 
+#ifdef TRACE_TARGETBLOCK
+	elog(DEBUG1, "Extending relation");
+#endif
+
 	/*
 	 * In addition to whatever extension we performed above, we always add at
 	 * least one block to satisfy our own request.
@@ -600,10 +614,11 @@ loop:
 	 * risk wiping out valid data).
 	 */
 	page = BufferGetPage(buffer);
+	targetBlock = BufferGetBlockNumber(buffer);
 
 	if (!PageIsNew(page))
 		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
-			 BufferGetBlockNumber(buffer),
+			 targetBlock,
 			 RelationGetRelationName(relation));
 
 	PageInit(page, BufferGetPageSize(buffer), 0);
@@ -623,7 +638,17 @@ loop:
 	 * current backend to make more insertions or not, which is probably a
 	 * good bet most of the time.  So for now, don't add it to FSM yet.
 	 */
-	RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
+	RelationSetTargetBlock(relation, targetBlock);
+
+	/*
+	 * In case we used an in-memory map of available blocks, reset
+	 * it for next use.
+	 */
+	ClearLocalMap();
+
+#ifdef TRACE_TARGETBLOCK
+	elog(DEBUG1, "Returning buffer for block %u", targetBlock);
+#endif
 
 	return buffer;
 }
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 8996d366e9..9f4d8adcdb 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -154,7 +154,7 @@ static BufferAccessStrategy vac_strategy;
 static void lazy_scan_heap(Relation onerel, int options,
 			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
 			   bool aggressive);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
@@ -759,7 +759,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
 
 			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
+			lazy_vacuum_heap(onerel, vacrelstats, nblocks);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -897,7 +897,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			MarkBufferDirty(buf);
 			UnlockReleaseBuffer(buf);
 
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 			continue;
 		}
 
@@ -936,7 +936,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			}
 
 			UnlockReleaseBuffer(buf);
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 			continue;
 		}
 
@@ -1339,7 +1339,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		 * taken if there are no indexes.)
 		 */
 		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 	}
 
 	/* report that everything is scanned and vacuumed */
@@ -1401,7 +1401,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		/* Remove tuples from heap */
 		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 									 PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
-		lazy_vacuum_heap(onerel, vacrelstats);
+		lazy_vacuum_heap(onerel, vacrelstats, nblocks);
 		vacrelstats->num_index_scans++;
 	}
 
@@ -1472,9 +1472,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
  * Note: the reason for doing this as a second pass is we cannot remove
  * the tuples until we've removed their index entries, and we want to
  * process index entry removal in batches as large as possible.
+ * Note: nblocks is passed as an optimization for RecordPageWithFreeSpace().
  */
 static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
+lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks)
 {
 	int			tupindex;
 	int			npages;
@@ -1511,7 +1512,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 		freespace = PageGetHeapFreeSpace(page);
 
 		UnlockReleaseBuffer(buf);
-		RecordPageWithFreeSpace(onerel, tblk, freespace);
+		RecordPageWithFreeSpace(onerel, tblk, freespace, nblocks);
 		npages++;
 	}
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 7c4ad1c449..1005459f7c 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -76,6 +76,11 @@
 #define FSM_ROOT_LEVEL	(FSM_TREE_DEPTH - 1)
 #define FSM_BOTTOM_LEVEL 0
 
+/* Status codes for local map. */
+#define FSM_LOCAL_ZERO	0x00	/* Beyond the end of the relation */
+#define FSM_LOCAL_AVAIL	0x01	/* Available to try */
+#define FSM_LOCAL_TRIED	0x02	/* Already tried, not enough space */
+
 /*
  * The internal FSM routines work on a logical addressing scheme. Each
  * level of the tree can be thought of as a separately addressable file.
@@ -89,6 +94,9 @@ typedef struct
 /* Address of the root page. */
 static const FSMAddress FSM_ROOT_ADDRESS = {FSM_ROOT_LEVEL, 0};
 
+/* Local map of block numbers to try. */
+static char FSMLocalMap[HEAP_FSM_CREATION_THRESHOLD] = {FSM_LOCAL_ZERO};
+
 /* functions to navigate the tree */
 static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
 static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
@@ -111,6 +119,10 @@ static BlockNumber fsm_search(Relation rel, uint8 min_cat);
 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
 				BlockNumber start, BlockNumber end,
 				bool *eof);
+static bool fsm_allow_writes(Relation rel, BlockNumber heapblk,
+				BlockNumber nblocks, BlockNumber *get_nblocks);
+static void fsm_local_set(Relation rel, BlockNumber nblocks);
+static BlockNumber fsm_local_search(void);
 
 
 /******** Public API ********/
@@ -132,8 +144,35 @@ BlockNumber
 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
 {
 	uint8		min_cat = fsm_space_needed_to_cat(spaceNeeded);
+	BlockNumber target_block,
+				nblocks;
+
+	/* First try the FSM, if it exists. */
+	target_block = fsm_search(rel, min_cat);
 
-	return fsm_search(rel, min_cat);
+	if (target_block == InvalidBlockNumber &&
+		rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		nblocks = RelationGetNumberOfBlocks(rel);
+
+		if (nblocks > HEAP_FSM_CREATION_THRESHOLD)
+		{
+			/*
+			 * If the FSM knows nothing of the rel, try the last page before
+			 * we give up and extend.  This avoids one-tuple-per-page syndrome
+			 * during bootstrapping or in a recently-started system.
+			 */
+			target_block = nblocks - 1;
+		}
+		else if (nblocks > 0)
+		{
+			/* Create or update local map and get first candidate block. */
+			fsm_local_set(rel, nblocks);
+			target_block = fsm_local_search();
+		}
+	}
+
+	return target_block;
 }
 
 /*
@@ -154,6 +193,32 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
 	FSMAddress	addr;
 	uint16		slot;
 	int			search_slot;
+	BlockNumber nblocks = InvalidBlockNumber;
+
+	if (oldPage < HEAP_FSM_CREATION_THRESHOLD &&
+		rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		/* First try the local map, if it exists. */
+		if (FSMLocalMap[oldPage] == FSM_LOCAL_AVAIL)
+		{
+			FSMLocalMap[oldPage] = FSM_LOCAL_TRIED;
+			return fsm_local_search();
+		}
+
+		/*
+		 * If we have neither a local map nor an FSM, we probably just
+		 * tried the target block in the relcache and failed, so we'll
+		 * need to create the local map.
+		 */
+		if (!fsm_allow_writes(rel, oldPage, InvalidBlockNumber, &nblocks))
+		{
+			/* Create local map and get first candidate block. */
+			fsm_local_set(rel, nblocks);
+			return fsm_local_search();
+		}
+	}
+
+	/* Normal FSM logic follows */
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(oldPage, &slot);
@@ -178,11 +243,17 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
  * FreeSpaceMapVacuum call, which updates the upper level pages.
  */
 void
-RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
+RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
+						Size spaceAvail, BlockNumber nblocks)
 {
 	int			new_cat = fsm_space_avail_to_cat(spaceAvail);
 	FSMAddress	addr;
 	uint16		slot;
+	BlockNumber dummy;
+
+	/* Note: We do not have a local map to update. */
+	if (!fsm_allow_writes(rel, heapBlk, nblocks, &dummy))
+		return;
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(heapBlk, &slot);
@@ -190,6 +261,16 @@ RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
 	fsm_set_and_search(rel, addr, slot, new_cat, 0);
 }
 
+/*
+ * Clear the local map. We must call this when we have found a block with
+ * enough free space, or when we extend the relation.
+ */
+void
+ClearLocalMap(void)
+{
+	memset(FSMLocalMap, 0, sizeof(FSMLocalMap));
+}
+
 /*
  * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
  *		WAL replay
@@ -204,11 +285,35 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 	BlockNumber blkno;
 	Buffer		buf;
 	Page		page;
+	bool		write_to_fsm;
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(heapBlk, &slot);
 	blkno = fsm_logical_to_physical(addr);
 
+	/* This is meant to mirror the logic in fsm_allow_writes() */
+	if (heapBlk > HEAP_FSM_CREATION_THRESHOLD)
+		write_to_fsm = true;
+	else
+	{
+		/* Open the relation at smgr level */
+		SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
+
+		if (smgrexists(smgr, FSM_FORKNUM))
+			write_to_fsm = true;
+		else
+		{
+			BlockNumber heap_nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
+			if (heap_nblocks > HEAP_FSM_CREATION_THRESHOLD)
+				write_to_fsm = true;
+			else
+				write_to_fsm = false;
+		}
+	}
+
+	if (!write_to_fsm)
+		return;
+
 	/* If the page doesn't exist already, extend */
 	buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -904,3 +1009,118 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 
 	return max_avail;
 }
+
+/*
+ * For heaps we prevent extension of the FSM unless the number of pages
+ * exceeds HEAP_FSM_CREATION_THRESHOLD. For tables that don't already have
+ * an FSM, this will save an inode and a few kB of space.
+ *
+ * XXX The API is a little awkward -- if the caller has a valid nblocks,
+ * it can pass it as an optimization to avoid a system call. If the caller
+ * passes InvalidBlockNumber and receives a false return value, it can
+ * get an up-to-date relation size from get_nblocks.
+ */
+static bool
+fsm_allow_writes(Relation rel, BlockNumber heapblk,
+				 BlockNumber nblocks, BlockNumber *get_nblocks)
+{
+	bool		skip_get_nblocks;
+
+	if (heapblk > HEAP_FSM_CREATION_THRESHOLD)
+		return true;
+
+	/* Index rels can always create an FSM. */
+	if (rel->rd_rel->relkind != RELKIND_RELATION)
+		return true;
+
+	/*
+	 * If the caller knows nblocks, we can avoid a system call later.
+	 * If it doesn't, maybe we have relpages from a previous VACUUM.
+	 * Since the table may have extended since then, we still have to
+	 * count the pages later if we can't return now.
+	 */
+	if (nblocks != InvalidBlockNumber)
+	{
+		if (nblocks > HEAP_FSM_CREATION_THRESHOLD)
+			return true;
+		else
+			skip_get_nblocks = true;
+	}
+	else
+	{
+		if (rel->rd_rel->relpages != InvalidBlockNumber &&
+			rel->rd_rel->relpages > HEAP_FSM_CREATION_THRESHOLD)
+			return true;
+		else
+			skip_get_nblocks = false;
+	}
+
+	RelationOpenSmgr(rel);
+	if (smgrexists(rel->rd_smgr, FSM_FORKNUM))
+		return true;
+
+	if (skip_get_nblocks)
+		return false;
+
+	/* last resort */
+	*get_nblocks = RelationGetNumberOfBlocks(rel);
+	if (*get_nblocks > HEAP_FSM_CREATION_THRESHOLD)
+		return true;
+	else
+		return false;
+}
+
+/*
+ * Set or update the local map of blocks to try, for when there is no FSM.
+ * It's designed specifically to be idempotent, so the heap insert code
+ * should only clear the map when it has found a free block or when it
+ * has to extend the relation.  That way, if we have tried all blocks we
+ * know about, but the rel has been extended by another backend, this will
+ * set only newly added blocks to 'available'.
+ */
+static void
+fsm_local_set(Relation rel, BlockNumber nblocks)
+{
+	BlockNumber blkno,
+				cached_target_block;
+
+	/*
+	 * If the blkno is beyond the end of the relation, the status should
+	 * be zero already, but make sure it is.  If the blkno is within the
+	 * relation, mark it available unless it's already been tried.
+	 */
+	for (blkno = 0; blkno < HEAP_FSM_CREATION_THRESHOLD; blkno++)
+	{
+		if (blkno < nblocks)
+			FSMLocalMap[blkno] |= FSM_LOCAL_AVAIL;
+		else
+			FSMLocalMap[blkno] = FSM_LOCAL_ZERO;
+	}
+
+	/* Set the status of the cached target block to 'tried'. */
+	cached_target_block = RelationGetTargetBlock(rel);
+	if (cached_target_block != InvalidBlockNumber &&
+		cached_target_block < HEAP_FSM_CREATION_THRESHOLD)
+		FSMLocalMap[cached_target_block] = FSM_LOCAL_TRIED;
+}
+
+/*
+ * Search the local map for an available block to try, in descending order.
+ *
+ * For use when there is no FSM.
+ */
+static BlockNumber
+fsm_local_search(void)
+{
+	BlockNumber		target_block = HEAP_FSM_CREATION_THRESHOLD;
+
+	do
+	{
+		target_block--;
+		if (FSMLocalMap[target_block] == FSM_LOCAL_AVAIL)
+			return target_block;
+	}
+	while (target_block > 0);
+
+	return InvalidBlockNumber;
+}
diff --git a/src/backend/storage/freespace/indexfsm.c b/src/backend/storage/freespace/indexfsm.c
index e21047b96f..d8fd29a7eb 100644
--- a/src/backend/storage/freespace/indexfsm.c
+++ b/src/backend/storage/freespace/indexfsm.c
@@ -51,7 +51,7 @@ GetFreeIndexPage(Relation rel)
 void
 RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
 {
-	RecordPageWithFreeSpace(rel, freeBlock, BLCKSZ - 1);
+	RecordPageWithFreeSpace(rel, freeBlock, BLCKSZ - 1, InvalidBlockNumber);
 }
 
 
@@ -61,7 +61,7 @@ RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
 void
 RecordUsedIndexPage(Relation rel, BlockNumber usedBlock)
 {
-	RecordPageWithFreeSpace(rel, usedBlock, 0);
+	RecordPageWithFreeSpace(rel, usedBlock, 0, InvalidBlockNumber);
 }
 
 /*
diff --git a/src/include/storage/freespace.h b/src/include/storage/freespace.h
index 726eb30fb8..0d130ddfe0 100644
--- a/src/include/storage/freespace.h
+++ b/src/include/storage/freespace.h
@@ -18,6 +18,10 @@
 #include "storage/relfilenode.h"
 #include "utils/relcache.h"
 
+/* Only extend a heap's FSM if the heap has more than this many blocks */
+/* TODO: Performance-test different values. */
+#define HEAP_FSM_CREATION_THRESHOLD 8
+
 /* prototypes for public functions in freespace.c */
 extern Size GetRecordedFreeSpace(Relation rel, BlockNumber heapBlk);
 extern BlockNumber GetPageWithFreeSpace(Relation rel, Size spaceNeeded);
@@ -26,7 +30,8 @@ extern BlockNumber RecordAndGetPageWithFreeSpace(Relation rel,
 							  Size oldSpaceAvail,
 							  Size spaceNeeded);
 extern void RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
-						Size spaceAvail);
+						Size spaceAvail, BlockNumber nblocks);
+extern void ClearLocalMap(void);
 extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 							Size spaceAvail);
 
-- 
2.17.1
