---
 src/backend/access/rmgrdesc/umbradesc.c       |  11 +
 src/backend/access/transam/umbra_xlog.c       |  43 +
 src/backend/postmaster/Makefile               |   3 +-
 src/backend/postmaster/bgworker.c             |   4 +
 src/backend/postmaster/mapcompactor.c         | 151 ++++
 src/backend/postmaster/mapwriter.c            |  14 +
 src/backend/postmaster/meson.build            |   1 +
 src/backend/storage/map/map.c                 |  12 +
 src/backend/storage/map/mapbgproc.c           | 748 +++++++++++++++++-
 src/backend/storage/map/mapinit.c             |  81 ++
 src/backend/storage/map/mapsuper.c            | 163 ++++
 src/backend/storage/smgr/smgr.c               |  44 ++
 src/backend/storage/smgr/umbra.c              |  24 +
 src/backend/storage/sync/sync.c               | 101 ++-
 .../utils/activity/wait_event_names.txt       |   2 +
 src/backend/utils/adt/pgstatfuncs.c           |  25 +
 src/backend/utils/misc/guc_parameters.dat     |  75 ++
 src/include/access/umbra_xlog.h               |  12 +
 src/include/catalog/pg_proc.dat               |  20 +
 src/include/postmaster/mapwriter.h            |   4 +
 src/include/storage/map.h                     |  20 +
 src/include/storage/map_internal.h            |   1 +
 src/include/storage/mapsuper_internal.h       |  13 +
 src/include/storage/smgr.h                    |   4 +
 src/include/storage/sync.h                    |   3 +-
 src/include/storage/umbra.h                   |   4 +
 src/test/recovery/meson.build                 |   4 +
 .../t/059_umbra_compactor_relocation.pl       |  91 +++
 .../060_umbra_reclaim_checkpoint_counters.pl  |  82 ++
 ...64_umbra_mainfork_internal_reclaim_seg0.pl | 283 +++++++
 ...umbra_mainfork_middle_reclaim_keep_seg0.pl | 356 +++++++++
 31 files changed, 2381 insertions(+), 18 deletions(-)
 create mode 100644 src/backend/postmaster/mapcompactor.c
 create mode 100644 src/test/recovery/t/059_umbra_compactor_relocation.pl
 create mode 100644 src/test/recovery/t/060_umbra_reclaim_checkpoint_counters.pl
 create mode 100644 
src/test/recovery/t/064_umbra_mainfork_internal_reclaim_seg0.pl
 create mode 100644 
src/test/recovery/t/065_umbra_mainfork_middle_reclaim_keep_seg0.pl

diff --git a/src/backend/access/rmgrdesc/umbradesc.c 
b/src/backend/access/rmgrdesc/umbradesc.c
index a6b3e6e55e..07c5112974 100644
--- a/src/backend/access/rmgrdesc/umbradesc.c
+++ b/src/backend/access/rmgrdesc/umbradesc.c
@@ -78,6 +78,14 @@ umbra_desc(StringInfo buf, XLogReaderState *record)
                                                         
xlrec->entries[i].forknum,
                                                         
xlrec->entries[i].nblocks);
        }
+       else if (info == XLOG_UMBRA_RECLAIM_UNLINK)
+       {
+               xl_umbra_reclaim_unlink *xlrec = (xl_umbra_reclaim_unlink *) 
rec;
+               RelPathStr      path = umbra_fork_relpath(xlrec->rlocator, 
xlrec->forknum);
+
+               appendStringInfo(buf, "%s seg %u reclaim_unlink",
+                                                path.str, xlrec->segno);
+       }
 }
 
 const char *
@@ -99,6 +107,9 @@ umbra_identify(uint8 info)
                case XLOG_UMBRA_SKIP_WAL_DENSE_MAP:
                        id = "SKIP_WAL_DENSE_MAP";
                        break;
+               case XLOG_UMBRA_RECLAIM_UNLINK:
+                       id = "RECLAIM_UNLINK";
+                       break;
        }
 
        return id;
diff --git a/src/backend/access/transam/umbra_xlog.c 
b/src/backend/access/transam/umbra_xlog.c
index 186eca102e..dd384a3f04 100644
--- a/src/backend/access/transam/umbra_xlog.c
+++ b/src/backend/access/transam/umbra_xlog.c
@@ -13,6 +13,7 @@
 #include "access/xloginsert.h"
 #include "storage/map.h"
 #include "storage/smgr.h"
+#include "storage/sync.h"
 #include "storage/umbra.h"
 #include "storage/umfile.h"
 
@@ -113,6 +114,23 @@ log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
                                          XLOG_UMBRA_SKIP_WAL_DENSE_MAP | 
XLR_SPECIAL_REL_UPDATE);
 }
 
+XLogRecPtr
+log_umbra_reclaim_unlink(RelFileLocator rlocator, ForkNumber forknum,
+                                                BlockNumber segno)
+{
+       xl_umbra_reclaim_unlink xlrec;
+
+       xlrec.rlocator = rlocator;
+       xlrec.forknum = forknum;
+       xlrec.segno = segno;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+       return XLogInsert(RM_UMBRA_ID,
+                                         XLOG_UMBRA_RECLAIM_UNLINK | 
XLR_SPECIAL_REL_UPDATE);
+}
+
 void
 umbra_redo(XLogReaderState *record)
 {
@@ -317,6 +335,31 @@ umbra_redo(XLogReaderState *record)
                        }
                        break;
 
+               case XLOG_UMBRA_RECLAIM_UNLINK:
+                       {
+                               xl_umbra_reclaim_unlink *xlrec;
+                               FileTag         tag;
+                               char            path[MAXPGPATH];
+                               int                     ret;
+
+                               xlrec = (xl_umbra_reclaim_unlink *) 
XLogRecGetData(record);
+                               tag.handler = SYNC_HANDLER_UMBRA;
+                               tag.forknum = xlrec->forknum;
+                               tag.rlocator = xlrec->rlocator;
+                               tag.segno = (uint64) xlrec->segno;
+
+                               /*
+                                * Recovery consumes reclaim targets eagerly. 
ENOENT is fine
+                                * because replay can race with prior deletion 
on the same path.
+                                */
+                               ret = umunlinkfiletag(&tag, path);
+                               if (ret < 0 && errno != ENOENT)
+                                       ereport(WARNING,
+                                                       
(errcode_for_file_access(),
+                                                        errmsg("could not 
remove file \"%s\": %m", path)));
+                       }
+                       break;
+
                default:
                        elog(PANIC, "umbra_redo: unknown op code %u", info);
        }
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 05cb330024..f54d704ae3 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -32,7 +32,8 @@ OBJS = \
 
 ifeq ($(with_umbra), yes)
 OBJS += \
-       mapwriter.o
+       mapwriter.o \
+       mapcompactor.o
 endif
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/bgworker.c 
b/src/backend/postmaster/bgworker.c
index 45f0abf94a..915de09219 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -175,6 +175,10 @@ static const struct
        {
                .fn_name = "MapWriterMain",
                .fn_addr = MapWriterMain
+       },
+       {
+               .fn_name = "MapCompactorMain",
+               .fn_addr = MapCompactorMain
        }
 #endif
 };
diff --git a/src/backend/postmaster/mapcompactor.c 
b/src/backend/postmaster/mapcompactor.c
new file mode 100644
index 0000000000..f657435849
--- /dev/null
+++ b/src/backend/postmaster/mapcompactor.c
@@ -0,0 +1,151 @@
+/*-------------------------------------------------------------------------
+ *
+ * mapcompactor.c
+ *       Umbra map compactor background worker.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/postmaster/mapcompactor.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/mapwriter.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/map.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/procsignal.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/wait_event.h"
+
+#define MAPCOMPACTOR_HIBERNATE_FACTOR 50
+
+int                    MapCompactorDelay = 200;
+int                    MapCompactorMaxRelations = 8;
+int                    MapCompactorBusyAllocThreshold = 128;
+
+static void
+MapCompactorExitCallback(int code, Datum arg)
+{
+       (void) code;
+       (void) arg;
+       MapStrategyNotifyCompactor(INVALID_PROC_NUMBER);
+}
+
+void
+MapCompactorMain(Datum arg)
+{
+       sigjmp_buf      local_sigjmp_buf;
+       MemoryContext mapcompactor_context;
+       bool            prev_hibernate = false;
+
+       (void) arg;
+       before_shmem_exit(MapCompactorExitCallback, 0);
+
+       pqsignal(SIGHUP, SignalHandlerForConfigReload);
+       pqsignal(SIGINT, SIG_IGN);
+       pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+       pqsignal(SIGQUIT, SignalHandlerForCrashExit);
+       pqsignal(SIGALRM, SIG_IGN);
+       pqsignal(SIGPIPE, SIG_IGN);
+       pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+       pqsignal(SIGUSR2, SIG_IGN);
+       pqsignal(SIGCHLD, SIG_DFL);
+
+       BackgroundWorkerUnblockSignals();
+       BackgroundWorkerInitializeConnectionByOid(InvalidOid, InvalidOid, 0);
+
+       mapcompactor_context = AllocSetContextCreate(TopMemoryContext,
+                                                                               
                         "Map Compactor",
+                                                                               
                 ALLOCSET_DEFAULT_SIZES);
+       MemoryContextSwitchTo(mapcompactor_context);
+
+       if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+       {
+               error_context_stack = NULL;
+               HOLD_INTERRUPTS();
+               EmitErrorReport();
+
+               LWLockReleaseAll();
+               ConditionVariableCancelSleep();
+               pgstat_report_wait_end();
+               MapBackendExitCleanup();
+               AtEOXact_Buffers(false);
+               AtEOXact_SMgr();
+               AtEOXact_Files(false);
+               AtEOXact_HashTables(false);
+
+               MemoryContextSwitchTo(mapcompactor_context);
+               FlushErrorState();
+               MemoryContextReset(mapcompactor_context);
+               RESUME_INTERRUPTS();
+
+               pg_usleep(1000000L);
+               smgrreleaseall();
+       }
+
+       PG_exception_stack = &local_sigjmp_buf;
+
+       for (;;)
+       {
+               int                     compact_moves = 0;
+               uint32          alloc_pressure = 0;
+               bool            busy_round = false;
+
+               ResetLatch(MyLatch);
+               ProcessMainLoopInterrupts();
+
+               alloc_pressure = MapAllocPressurePeek();
+               busy_round = (MapCompactorBusyAllocThreshold > 0 &&
+                                         alloc_pressure >= (uint32) 
MapCompactorBusyAllocThreshold);
+
+               if (!busy_round && MapCompactorMaxRelations > 0)
+                       compact_moves = 
MapCompactorStep(MapCompactorMaxRelations);
+
+               if (FirstCallSinceLastCheckpoint())
+                       smgrreleaseall();
+
+               MapStrategyNotifyCompactor(MyProcNumber);
+               if (busy_round)
+               {
+                       (void) WaitLatch(MyLatch,
+                                                        WL_LATCH_SET | 
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+                                                        MapCompactorDelay * 
MAPCOMPACTOR_HIBERNATE_FACTOR,
+                                                        
WAIT_EVENT_MAPCOMPACTOR_HIBERNATE);
+               }
+               else if (WaitLatch(MyLatch,
+                                                  WL_LATCH_SET | WL_TIMEOUT | 
WL_EXIT_ON_PM_DEATH,
+                                                  MapCompactorDelay,
+                                                  
WAIT_EVENT_MAPCOMPACTOR_MAIN) == WL_TIMEOUT &&
+                                compact_moves == 0 &&
+                                prev_hibernate)
+               {
+                       (void) WaitLatch(MyLatch,
+                                                        WL_LATCH_SET | 
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+                                                        MapCompactorDelay * 
MAPCOMPACTOR_HIBERNATE_FACTOR,
+                                                        
WAIT_EVENT_MAPCOMPACTOR_HIBERNATE);
+               }
+               MapStrategyNotifyCompactor(INVALID_PROC_NUMBER);
+               prev_hibernate = (compact_moves == 0 || busy_round);
+       }
+}
diff --git a/src/backend/postmaster/mapwriter.c 
b/src/backend/postmaster/mapwriter.c
index e659b6be94..3cd81815cb 100644
--- a/src/backend/postmaster/mapwriter.c
+++ b/src/backend/postmaster/mapwriter.c
@@ -32,6 +32,7 @@
 #include "storage/procnumber.h"
 #include "storage/procsignal.h"
 #include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/wait_event.h"
 
@@ -67,6 +68,19 @@ MapBackgroundWorkersRegister(void)
        bgw.bgw_notify_pid = 0;
        bgw.bgw_main_arg = (Datum) 0;
        RegisterBackgroundWorker(&bgw);
+
+       memset(&bgw, 0, sizeof(bgw));
+       bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+               BGWORKER_BACKEND_DATABASE_CONNECTION;
+       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+       snprintf(bgw.bgw_function_name, BGW_MAXLEN, "MapCompactorMain");
+       snprintf(bgw.bgw_name, BGW_MAXLEN, "Umbra mapcompactor");
+       snprintf(bgw.bgw_type, BGW_MAXLEN, "map compactor");
+       bgw.bgw_restart_time = 5;
+       bgw.bgw_notify_pid = 0;
+       bgw.bgw_main_arg = (Datum) 0;
+       RegisterBackgroundWorker(&bgw);
 }
 
 void
diff --git a/src/backend/postmaster/meson.build 
b/src/backend/postmaster/meson.build
index 0a30057703..780078e366 100644
--- a/src/backend/postmaster/meson.build
+++ b/src/backend/postmaster/meson.build
@@ -22,5 +22,6 @@ backend_sources += files(
 if get_option('umbra').enabled()
   backend_sources += files(
     'mapwriter.c',
+    'mapcompactor.c',
   )
 endif
diff --git a/src/backend/storage/map/map.c b/src/backend/storage/map/map.c
index 6793db8671..62d3db5cdf 100644
--- a/src/backend/storage/map/map.c
+++ b/src/backend/storage/map/map.c
@@ -669,6 +669,8 @@ MapTryReserveFreshPblknoInternal(UmbraFileContext *map_ctx, 
RelFileLocator rnode
                                                        rnode.spcOid, 
rnode.dbOid, rnode.relNumber, forknum)));
                }
 
+               Assert(MapNormalizeForkBlockCount(forknum,
+                                                                               
  MapSuperGetReclaimBoundary(entry, forknum)) <= next);
                MapSuperSetReservedNextFree(entry, forknum, next + 1);
                Assert(MapNormalizeForkBlockCount(forknum,
                                                                                
  MapSuperblockGetNextFreePhysBlock(&entry->super,
@@ -1197,6 +1199,13 @@ MapInvalidateRelation(RelFileLocator rnode)
 
        /* Remove dedicated superblock cache entry for this relation. */
        MapSuperDeleteEntry(rnode);
+
+       /*
+        * Relation lifecycle ended (drop/unlink path). Purge queued reclaim 
tasks
+        * so post-checkpoint workers cannot act on a future relation that 
reuses
+        * the same relfilenode.
+        */
+       MapReclaimForgetRelation(rnode);
 }
 
 static bool
@@ -1293,7 +1302,10 @@ MapInvalidateDatabaseTablespaces(Oid dbid, int 
ntablespaces,
                }
 
                for (i = 0; i < target_count; i++)
+               {
                        MapSuperDeleteEntry(targets[i]);
+                       MapReclaimForgetRelation(targets[i]);
+               }
 
                pfree(targets);
        }
diff --git a/src/backend/storage/map/mapbgproc.c 
b/src/backend/storage/map/mapbgproc.c
index 3bb167bae9..226abddb1d 100644
--- a/src/backend/storage/map/mapbgproc.c
+++ b/src/backend/storage/map/mapbgproc.c
@@ -7,17 +7,67 @@
  */
 #include "postgres.h"
 
+#include "access/umbra_xlog.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
-#include "storage/latch.h"
+#include "storage/buf_internals.h"
 #include "storage/map.h"
 #include "storage/map_internal.h"
 #include "storage/mapsuper_internal.h"
 #include "storage/proc.h"
+#include "storage/sync.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
 
 static bool MapForkNeedsPrealloc(const MapSuperEntry *entry, ForkNumber 
forknum,
                                                                 bool 
background_mode);
+static bool MapSegmentHasLiveReferences(UmbraFileContext *map_ctx,
+                                                                               
RelFileLocator rnode,
+                                                                               
ForkNumber forknum,
+                                                                               
BlockNumber segno);
+static void MapReclaimInitFileTag(FileTag *tag, RelFileLocator rnode,
+                                                                 ForkNumber 
forknum, BlockNumber segno);
+static bool MapReclaimRegisterUnlinkRequest(RelFileLocator rnode,
+                                                                               
        ForkNumber forknum,
+                                                                               
        BlockNumber segno);
+static bool MapReclaimEnqueueSegment(UmbraFileContext *map_ctx,
+                                                                        
RelFileLocator rnode, ForkNumber forknum,
+                                                                        
BlockNumber segno);
+static void MapReclaimRegisterRelationFilterRequest(RelFileLocator rnode);
+static bool MapReclaimEnqueue(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                                         ForkNumber forknum,
+                                                         BlockNumber extent_no,
+                                                         BlockNumber 
extent_blocks);
+static bool MapSegmentFullyBelowReclaimBoundary(BlockNumber boundary_pblk,
+                                                                               
                BlockNumber segno);
+static bool MapExtentFullyBelowReclaimBoundary(BlockNumber boundary_pblk,
+                                                                               
           BlockNumber extent_no,
+                                                                               
           BlockNumber extent_blocks);
+static BlockNumber MapCompactorAdvanceBoundaryFromSet(BlockNumber 
boundary_pblk,
+                                                                               
                          BlockNumber next_free_pblk,
+                                                                               
                          HTAB *committed_pblks);
+static bool MapCompactorRelocateEntry(UmbraFileContext *map_ctx,
+                                                                         
RelFileLocator rnode,
+                                                                         
ForkNumber forknum,
+                                                                         
BlockNumber lblkno,
+                                                                         
BlockNumber old_pblkno);
+static int     MapCompactorAnalyzeFork(UmbraFileContext *map_ctx,
+                                                                       
RelFileLocator rnode,
+                                                                       
ForkNumber forknum,
+                                                                       int 
max_moves,
+                                                                       int 
*moves_done);
+
+typedef struct MapExtentLiveEntry
+{
+       BlockNumber     extent_no;
+       uint32          live_blocks;
+} MapExtentLiveEntry;
+
+typedef struct MapBoundaryPblkEntry
+{
+       BlockNumber     pblkno;
+} MapBoundaryPblkEntry;
 
 uint32
 MapAllocPressurePeek(void)
@@ -48,6 +98,29 @@ MapWakeWriter(void)
                SetLatch(&ProcGlobal->allProcs[mapwriter_procno].procLatch);
 }
 
+void
+MapStrategyNotifyCompactor(int mapcompactor_procno)
+{
+       SpinLockAcquire(&MapShared->clock_lock);
+       MapShared->mapcompactor_procno = mapcompactor_procno;
+       SpinLockRelease(&MapShared->clock_lock);
+}
+
+void
+MapWakeCompactor(void)
+{
+       int                     mapcompactor_procno = -1;
+
+       SpinLockAcquire(&MapShared->clock_lock);
+       mapcompactor_procno = MapShared->mapcompactor_procno;
+       if (mapcompactor_procno != -1)
+               MapShared->mapcompactor_procno = -1;
+       SpinLockRelease(&MapShared->clock_lock);
+
+       if (mapcompactor_procno != -1)
+               SetLatch(&ProcGlobal->allProcs[mapcompactor_procno].procLatch);
+}
+
 bool
 MapMaybePreallocateFork(UmbraFileContext *map_ctx, RelFileLocator rnode,
                                                ForkNumber forknum, bool 
background_mode)
@@ -180,8 +253,7 @@ MapMaybePreallocateFork(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                        entry->in_use &&
                        (entry->flags & MAPSUPER_FLAG_VALID) != 0 &&
                        MapNormalizeForkBlockCount(forknum,
-                                                                          
MapSuperblockGetPhysCapacity(&entry->super,
-                                                                               
                                                   forknum)) < target_nblocks)
+                                                                          
MapSuperblockGetPhysCapacity(&entry->super, forknum)) < target_nblocks)
                {
                        XLogRecPtr      map_lsn = GetXLogWriteRecPtr();
 
@@ -200,6 +272,50 @@ MapMaybePreallocateFork(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
        return prealloc_ok;
 }
 
+static bool
+MapSegmentFullyBelowReclaimBoundary(BlockNumber boundary_pblk, BlockNumber 
segno)
+{
+       uint64          seg_end;
+
+       seg_end = ((uint64) segno + 1) * (uint64) RELSEG_SIZE;
+       return seg_end <= (uint64) boundary_pblk;
+}
+
+static bool
+MapExtentFullyBelowReclaimBoundary(BlockNumber boundary_pblk,
+                                                                  BlockNumber 
extent_no,
+                                                                  BlockNumber 
extent_blocks)
+{
+       uint64          extent_end;
+
+       if (extent_blocks == 0)
+               return false;
+
+       extent_end = ((uint64) extent_no + 1) * (uint64) extent_blocks;
+       return extent_end <= (uint64) boundary_pblk;
+}
+
+static BlockNumber
+MapCompactorAdvanceBoundaryFromSet(BlockNumber boundary_pblk,
+                                                                  BlockNumber 
next_free_pblk,
+                                                                  HTAB 
*committed_pblks)
+{
+       while (boundary_pblk < next_free_pblk)
+       {
+               MapBoundaryPblkEntry *entry;
+
+               entry = (MapBoundaryPblkEntry *) hash_search(committed_pblks,
+                                                                               
                         &boundary_pblk,
+                                                                               
                         HASH_FIND,
+                                                                               
                         NULL);
+               if (entry == NULL)
+                       break;
+               boundary_pblk++;
+       }
+
+       return boundary_pblk;
+}
+
 static bool
 MapForkNeedsPrealloc(const MapSuperEntry *entry, ForkNumber forknum,
                                         bool background_mode)
@@ -231,7 +347,7 @@ MapForkNeedsPrealloc(const MapSuperEntry *entry, ForkNumber 
forknum,
        next = MapSuperGetReservedNextFree(entry, forknum);
        capacity = MapNormalizeForkBlockCount(forknum,
                                                                                
  MapSuperblockGetPhysCapacity(&entry->super,
-                                                                               
                                                   forknum));
+                                                                               
                                                           forknum));
 
        if (next < soft_low)
                return false;
@@ -249,6 +365,158 @@ MapForkNeedsPrealloc(const MapSuperEntry *entry, 
ForkNumber forknum,
        return true;
 }
 
+static bool
+MapSegmentHasLiveReferences(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                                       ForkNumber forknum, 
BlockNumber segno)
+{
+       BlockNumber     n_lblknos = 0;
+       BlockNumber     n_map_pages;
+       BlockNumber     current_page = InvalidBlockNumber;
+       BlockNumber     page_idx;
+       BlockNumber     page_count;
+       int                     current_slot = -1;
+
+       if (!MapSBlockTryGetLogicalNblocks(map_ctx, rnode, forknum, &n_lblknos) 
||
+               n_lblknos == 0)
+               return false;
+
+       if (!umfile_ctx_fork_exists(map_ctx, UMBRA_METADATA_FORKNUM,
+                                                               
UMFILE_EXISTS_DENSE))
+               return false;
+       n_map_pages = umfile_ctx_get_nblocks(map_ctx, UMBRA_METADATA_FORKNUM,
+                                                                               
 UMFILE_NBLOCKS_DENSE);
+       if (n_map_pages == 0)
+               return false;
+
+       page_count = (n_lblknos + MAP_ENTRIES_PER_PAGE - 1) / 
MAP_ENTRIES_PER_PAGE;
+       for (page_idx = 0; page_idx < page_count; page_idx++)
+       {
+               BlockNumber     page_no = MapForkPageIndexToMapBlkno(forknum, 
page_idx);
+               int                     entry_idx;
+               int                     limit_idx;
+               MapPage    *page;
+               MapBufferDesc *buf;
+
+               if (page_no >= n_map_pages)
+                       break;
+
+               if (page_no != current_page)
+               {
+                       if (current_slot >= 0)
+                               MapUnpinBuffer(current_slot);
+                       current_slot = MapReadBuffer(map_ctx, rnode, forknum, 
page_no);
+                       current_page = page_no;
+               }
+
+               buf = &MapBuffers[current_slot];
+               page = MapGetPage(current_slot);
+               limit_idx = MAP_ENTRIES_PER_PAGE;
+               if (page_idx == page_count - 1 && (n_lblknos % 
MAP_ENTRIES_PER_PAGE) != 0)
+                       limit_idx = n_lblknos % MAP_ENTRIES_PER_PAGE;
+
+               LWLockAcquire(&buf->buffer_lock, LW_SHARED);
+               for (entry_idx = 0; entry_idx < limit_idx; entry_idx++)
+               {
+                       BlockNumber pblkno = page->pblknos[entry_idx];
+
+                       if (pblkno != InvalidBlockNumber &&
+                               pblkno / ((BlockNumber) RELSEG_SIZE) == segno)
+                       {
+                               LWLockRelease(&buf->buffer_lock);
+                               if (current_slot >= 0)
+                                       MapUnpinBuffer(current_slot);
+                               return true;
+                       }
+               }
+               LWLockRelease(&buf->buffer_lock);
+       }
+
+       if (current_slot >= 0)
+               MapUnpinBuffer(current_slot);
+
+       return false;
+}
+
+static void
+MapReclaimInitFileTag(FileTag *tag, RelFileLocator rnode,
+                                         ForkNumber forknum, BlockNumber segno)
+{
+       tag->handler = SYNC_HANDLER_UMBRA;
+       tag->forknum = forknum;
+       tag->rlocator = rnode;
+       tag->segno = (uint64) segno;
+}
+
+static bool
+MapReclaimRegisterUnlinkRequest(RelFileLocator rnode, ForkNumber forknum,
+                                                               BlockNumber 
segno)
+{
+       FileTag         tag;
+
+       MapReclaimInitFileTag(&tag, rnode, forknum, segno);
+       return RegisterSyncRequest(&tag, SYNC_RECLAIM_REQUEST,
+                                                          true /* retryOnError 
*/ );
+}
+
+static bool
+MapReclaimEnqueueSegment(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                                ForkNumber forknum, 
BlockNumber segno)
+{
+       BlockNumber     reclaim_boundary_pblk;
+
+       if (!MapSBlockTryGetReclaimBoundary(map_ctx, rnode, forknum,
+                                                                               
&reclaim_boundary_pblk))
+               return false;
+       if (!MapSegmentFullyBelowReclaimBoundary(reclaim_boundary_pblk, segno))
+               return false;
+
+       if (!MapReclaimRegisterUnlinkRequest(rnode, forknum, segno))
+               return false;
+
+       elog(DEBUG1,
+                "map reclaim enqueue rel %u/%u/%u fork %u seg %u",
+                rnode.spcOid, rnode.dbOid, rnode.relNumber,
+                forknum, segno);
+
+       MapStatsAddReclaimEnqueued(1);
+       return true;
+}
+
+static void
+MapReclaimRegisterRelationFilterRequest(RelFileLocator rnode)
+{
+       FileTag         tag;
+
+       MapReclaimInitFileTag(&tag, rnode, InvalidForkNumber, 
InvalidBlockNumber);
+       (void) RegisterSyncRequest(&tag, SYNC_FILTER_REQUEST,
+                                                          true /* retryOnError 
*/ );
+}
+
+static bool
+MapReclaimEnqueue(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                 ForkNumber forknum,
+                                 BlockNumber extent_no, BlockNumber 
extent_blocks)
+{
+       uint64          start64;
+       BlockNumber     segno;
+
+       if (extent_blocks == 0)
+               return false;
+
+       start64 = (uint64) extent_no * (uint64) extent_blocks;
+       if (start64 > (uint64) MaxBlockNumber)
+               return false;
+       segno = ((BlockNumber) start64) / ((BlockNumber) RELSEG_SIZE);
+
+       return MapReclaimEnqueueSegment(map_ctx, rnode, forknum, segno);
+}
+
+void
+MapReclaimForgetRelation(RelFileLocator rnode)
+{
+       MapReclaimRegisterRelationFilterRequest(rnode);
+}
+
 int
 MapPreallocStep(int max_relations)
 {
@@ -321,3 +589,475 @@ MapPreallocStep(int max_relations)
 
        return prealloc_ops;
 }
+
+static bool
+MapCompactorRelocateEntry(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                                 ForkNumber forknum, 
BlockNumber lblkno,
+                                                 BlockNumber old_pblkno)
+{
+       BlockNumber     map_blkno;
+       int                     entry_idx;
+       int                     slot_id;
+       MapPage    *page;
+       MapBufferDesc *buf;
+       BlockNumber     cur_pblkno;
+       BlockNumber     new_pblkno;
+       XLogRecPtr      map_lsn;
+       char            pagebuf[BLCKSZ];
+
+       if (!MapTryReserveFreshPblkno(map_ctx, rnode, forknum, lblkno,
+                                                                 &new_pblkno, 
true))
+       {
+               return false;
+       }
+
+       if (!umfile_ctx_block_exists(map_ctx, forknum, old_pblkno))
+       {
+               MapInflightRelease(rnode, forknum, lblkno);
+               return false;
+       }
+
+       /*
+        * Compactor relocation is a raw physical copy, not a shared-buffer page
+        * copy. If a newer image is still resident/dirty in buffer pool, this 
path
+        * will not refresh that buffer's PageLSN or cached contents.
+        */
+       umfile_ctx_prefetch(map_ctx, forknum, old_pblkno);
+       umfile_ctx_read(map_ctx, forknum, old_pblkno, pagebuf, BLCKSZ);
+       umfile_ctx_extend(map_ctx, forknum, new_pblkno, pagebuf);
+
+       umfile_ctx_register_dirty(map_ctx, forknum, new_pblkno, false, false);
+
+       map_blkno = MapLblknoToMapBlkno(forknum, lblkno);
+       entry_idx = map_blkno % MAP_ENTRIES_PER_PAGE;
+       map_blkno = map_blkno / MAP_ENTRIES_PER_PAGE;
+
+       slot_id = MapReadBuffer(map_ctx, rnode, forknum, map_blkno);
+       buf = &MapBuffers[slot_id];
+       page = MapGetPage(slot_id);
+       if (!LWLockConditionalAcquire(&buf->buffer_lock, LW_EXCLUSIVE))
+       {
+               MapUnpinBuffer(slot_id);
+               MapInflightRelease(rnode, forknum, lblkno);
+               return false;
+       }
+
+       cur_pblkno = page->pblknos[entry_idx];
+       if (cur_pblkno != old_pblkno)
+       {
+               LWLockRelease(&buf->buffer_lock);
+               MapUnpinBuffer(slot_id);
+               MapInflightRelease(rnode, forknum, lblkno);
+               return false;
+       }
+
+       map_lsn = log_umbra_map_set(rnode, forknum, lblkno, old_pblkno, 
new_pblkno);
+       page->pblknos[entry_idx] = new_pblkno;
+       MapMarkBufferDirty(map_ctx, buf, map_lsn);
+
+       LWLockRelease(&buf->buffer_lock);
+       MapUnpinBuffer(slot_id);
+       MapSBlockBumpPhysicalState(map_ctx, rnode, forknum, new_pblkno + 1,
+                                                          true, true, map_lsn);
+       MapInflightRelease(rnode, forknum, lblkno);
+       MapStatsAddCompactorRelocations(1);
+
+       return true;
+}
+
+static int
+MapCompactorAnalyzeFork(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                               ForkNumber forknum,
+                                               int max_moves, int *moves_done)
+{
+       HASHCTL         ctl;
+       HASHCTL         boundary_ctl;
+       HTAB       *extent_live;
+       HTAB       *boundary_committed = NULL;
+       bool       *segment_live = NULL;
+       MemoryContext extent_ctx;
+       HASH_SEQ_STATUS seq;
+       MapExtentLiveEntry *live_entry;
+       BlockNumber     n_lblknos = 0;
+       BlockNumber     n_map_pages;
+       BlockNumber     current_page = InvalidBlockNumber;
+       BlockNumber     page_idx;
+       BlockNumber     page_count;
+       BlockNumber     max_extent_no = InvalidBlockNumber;
+       BlockNumber     extent_blocks;
+       BlockNumber     next_free_pblk = 0;
+       BlockNumber     reclaim_boundary_pblk = 0;
+       BlockNumber     advanced_boundary_pblk = 0;
+       BlockNumber     reclaim_seg_limit = 0;
+       int                     current_slot = -1;
+       int                     sparse_count = 0;
+       int                     live_threshold;
+       int                     moves_limit;
+       int                     moved_here = 0;
+
+       if (!map_compactor_enable)
+               return 0;
+       if (!MapForkHasMappedState(forknum))
+               return 0;
+       if (map_compactor_extent_blocks <= 0)
+               return 0;
+       extent_blocks = (BlockNumber) map_compactor_extent_blocks;
+
+       if (!MapSBlockTryGetLogicalNblocks(map_ctx, rnode, forknum, &n_lblknos) 
||
+               n_lblknos == 0)
+               return 0;
+       if (!MapSBlockTryGetNextFreePhysBlock(map_ctx, rnode, forknum,
+                                                                               
  &next_free_pblk) ||
+               next_free_pblk == 0)
+               return 0;
+       if (!MapSBlockTryGetReclaimBoundary(map_ctx, rnode, forknum,
+                                                                               
&reclaim_boundary_pblk))
+               return 0;
+       if (reclaim_boundary_pblk > next_free_pblk)
+               reclaim_boundary_pblk = next_free_pblk;
+       reclaim_seg_limit = reclaim_boundary_pblk / ((BlockNumber) RELSEG_SIZE);
+
+       if (!umfile_ctx_fork_exists(map_ctx, UMBRA_METADATA_FORKNUM,
+                                                               
UMFILE_EXISTS_DENSE))
+               return 0;
+       n_map_pages = umfile_ctx_get_nblocks(map_ctx, UMBRA_METADATA_FORKNUM,
+                                                                               
 UMFILE_NBLOCKS_DENSE);
+       if (n_map_pages == 0)
+               return 0;
+       page_count = (n_lblknos + MAP_ENTRIES_PER_PAGE - 1) / 
MAP_ENTRIES_PER_PAGE;
+       if (page_count == 0)
+               return 0;
+
+       extent_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                                          
"MapCompactorExtentLiveContext",
+                                                                          
ALLOCSET_DEFAULT_SIZES);
+
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(BlockNumber);
+       ctl.entrysize = sizeof(MapExtentLiveEntry);
+       ctl.hcxt = extent_ctx;
+       extent_live = hash_create("Map Compactor Extent Live",
+                                                         1024,
+                                                         &ctl,
+                                                         HASH_ELEM | 
HASH_BLOBS | HASH_CONTEXT);
+       if (reclaim_seg_limit > 0)
+               segment_live = MemoryContextAllocZero(extent_ctx,
+                                                                               
          sizeof(bool) * reclaim_seg_limit);
+       if (reclaim_boundary_pblk < next_free_pblk)
+       {
+               MemSet(&boundary_ctl, 0, sizeof(boundary_ctl));
+               boundary_ctl.keysize = sizeof(BlockNumber);
+               boundary_ctl.entrysize = sizeof(MapBoundaryPblkEntry);
+               boundary_ctl.hcxt = extent_ctx;
+               boundary_committed = hash_create("Map Compactor Boundary 
Committed",
+                                                                               
 1024,
+                                                                               
 &boundary_ctl,
+                                                                               
 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+       }
+
+       for (page_idx = 0; page_idx < page_count; page_idx++)
+       {
+               BlockNumber     page_no = MapForkPageIndexToMapBlkno(forknum, 
page_idx);
+               int                     entry_idx;
+               int                     limit_idx;
+               MapPage    *page;
+               MapBufferDesc *buf;
+               BlockNumber     extent_no;
+               BlockNumber     segno;
+               bool            found;
+
+               if (page_no >= n_map_pages)
+                       break;
+
+               if (page_no != current_page)
+               {
+                       if (current_slot >= 0)
+                               MapUnpinBuffer(current_slot);
+                       current_slot = MapReadBuffer(map_ctx, rnode, forknum, 
page_no);
+                       current_page = page_no;
+               }
+
+               buf = &MapBuffers[current_slot];
+               page = MapGetPage(current_slot);
+               if (!LWLockConditionalAcquire(&buf->buffer_lock, LW_SHARED))
+                       continue;
+               limit_idx = MAP_ENTRIES_PER_PAGE;
+               if (page_idx == page_count - 1 && (n_lblknos % 
MAP_ENTRIES_PER_PAGE) != 0)
+                       limit_idx = n_lblknos % MAP_ENTRIES_PER_PAGE;
+               for (entry_idx = 0; entry_idx < limit_idx; entry_idx++)
+               {
+                       BlockNumber pblkno = page->pblknos[entry_idx];
+
+                       if (pblkno == InvalidBlockNumber)
+                               continue;
+
+                       if (boundary_committed != NULL &&
+                               pblkno >= reclaim_boundary_pblk &&
+                               pblkno < next_free_pblk)
+                       {
+                               (void) hash_search(boundary_committed,
+                                                                  &pblkno,
+                                                                  HASH_ENTER,
+                                                                  NULL);
+                       }
+
+                       if (pblkno >= reclaim_boundary_pblk)
+                               continue;
+
+                       extent_no = pblkno / extent_blocks;
+                       segno = pblkno / ((BlockNumber) RELSEG_SIZE);
+                       if (max_extent_no == InvalidBlockNumber || extent_no > 
max_extent_no)
+                               max_extent_no = extent_no;
+
+                       live_entry = (MapExtentLiveEntry *) 
hash_search(extent_live,
+                                                                               
                                        &extent_no,
+                                                                               
                                        HASH_ENTER,
+                                                                               
                                        &found);
+                       if (!found)
+                               live_entry->live_blocks = 0;
+                       if (live_entry->live_blocks < extent_blocks)
+                               live_entry->live_blocks++;
+
+                       if (segment_live != NULL && segno < reclaim_seg_limit)
+                               segment_live[segno] = true;
+               }
+               LWLockRelease(&buf->buffer_lock);
+       }
+
+       if (current_slot >= 0)
+               MapUnpinBuffer(current_slot);
+
+       live_threshold = map_compactor_low_live_percent;
+       if (live_threshold < 1)
+               live_threshold = 1;
+       if (live_threshold > 100)
+               live_threshold = 100;
+       moves_limit = Max(0, max_moves);
+
+       hash_seq_init(&seq, extent_live);
+       while ((live_entry = (MapExtentLiveEntry *) hash_seq_search(&seq)) != 
NULL)
+       {
+               int                     live_pct;
+
+               if (max_extent_no != InvalidBlockNumber &&
+                       live_entry->extent_no == max_extent_no)
+                       continue;
+               if (!MapExtentFullyBelowReclaimBoundary(reclaim_boundary_pblk,
+                                                                               
                live_entry->extent_no,
+                                                                               
                extent_blocks))
+                       continue;
+
+               live_pct = (int) (((uint64) live_entry->live_blocks * 100) /
+                                                 Max((uint64) 1, (uint64) 
extent_blocks));
+               if (live_pct > live_threshold)
+                       continue;
+
+               sparse_count++;
+       }
+
+       for (BlockNumber segno = 0; segno < reclaim_seg_limit; segno++)
+       {
+               if (segment_live != NULL && segment_live[segno])
+                       continue;
+               if (!umfile_ctx_segment_exists(map_ctx, forknum, segno))
+                       continue;
+               if (MapSegmentHasLiveReferences(map_ctx, rnode, forknum, segno))
+                       continue;
+
+               (void) MapReclaimEnqueueSegment(map_ctx, rnode, forknum, segno);
+       }
+
+       if (sparse_count > 0 && moves_limit > 0)
+       {
+               BlockNumber     pass_page = InvalidBlockNumber;
+               int                     pass_slot = -1;
+
+               for (page_idx = 0; page_idx < page_count; page_idx++)
+               {
+                       BlockNumber     page_no = 
MapForkPageIndexToMapBlkno(forknum, page_idx);
+                       int                     entry_idx;
+                       int                     limit_idx;
+
+                       if (moved_here >= moves_limit)
+                               break;
+                       if (moves_done != NULL && *moves_done >= moves_limit)
+                               break;
+                       if (page_no >= n_map_pages)
+                               break;
+
+                       if (page_no != pass_page)
+                       {
+                               if (pass_slot >= 0)
+                                       MapUnpinBuffer(pass_slot);
+                               pass_slot = MapReadBuffer(map_ctx, rnode, 
forknum, page_no);
+                               pass_page = page_no;
+                       }
+
+                       limit_idx = MAP_ENTRIES_PER_PAGE;
+                       if (page_idx == page_count - 1 && (n_lblknos % 
MAP_ENTRIES_PER_PAGE) != 0)
+                               limit_idx = n_lblknos % MAP_ENTRIES_PER_PAGE;
+                       for (entry_idx = 0; entry_idx < limit_idx; entry_idx++)
+                       {
+                               BlockNumber lblkno = page_idx * 
MAP_ENTRIES_PER_PAGE + entry_idx;
+                               MapPage    *page;
+                               MapBufferDesc *buf;
+                               BlockNumber pblkno;
+                               BlockNumber extent_no;
+                               bool candidate = false;
+
+                               buf = &MapBuffers[pass_slot];
+                               page = MapGetPage(pass_slot);
+                               if 
(!LWLockConditionalAcquire(&buf->buffer_lock, LW_SHARED))
+                                       continue;
+                               pblkno = page->pblknos[entry_idx];
+                               LWLockRelease(&buf->buffer_lock);
+
+                               if (pblkno == InvalidBlockNumber)
+                                       continue;
+
+                               extent_no = pblkno / extent_blocks;
+                               if (max_extent_no != InvalidBlockNumber &&
+                                       extent_no == max_extent_no)
+                                       continue;
+                               if 
(!MapExtentFullyBelowReclaimBoundary(reclaim_boundary_pblk,
+                                                                               
                                extent_no,
+                                                                               
                                extent_blocks))
+                                       continue;
+
+                               live_entry = (MapExtentLiveEntry *) 
hash_search(extent_live,
+                                                                               
                                                &extent_no,
+                                                                               
                                                HASH_FIND,
+                                                                               
                                                NULL);
+                               if (live_entry != NULL)
+                               {
+                                       int live_pct = (int) (((uint64) 
live_entry->live_blocks * 100) /
+                                                                               
  Max((uint64) 1, (uint64) extent_blocks));
+                                       if (live_pct <= live_threshold)
+                                               candidate = true;
+                               }
+
+                               if (!candidate)
+                                       continue;
+
+                               if (MapCompactorRelocateEntry(map_ctx, rnode, 
forknum,
+                                                                               
          lblkno, pblkno))
+                               {
+                                       moved_here++;
+                                       if (moves_done != NULL)
+                                               (*moves_done)++;
+
+                                       live_entry = (MapExtentLiveEntry *) 
hash_search(extent_live,
+                                                                               
                                                        &extent_no,
+                                                                               
                                                        HASH_FIND,
+                                                                               
                                                        NULL);
+                                       if (live_entry != NULL && 
live_entry->live_blocks > 0)
+                                       {
+                                               live_entry->live_blocks--;
+                                               if (live_entry->live_blocks == 
0)
+                                               {
+                                                       BlockNumber     
start_blkno = extent_no * extent_blocks;
+                                                       BlockNumber     segno = 
start_blkno / ((BlockNumber) RELSEG_SIZE);
+
+                                                       if 
(!MapSegmentHasLiveReferences(map_ctx, rnode, forknum, segno))
+                                                               (void) 
MapReclaimEnqueue(map_ctx, rnode, forknum,
+                                                                               
                                 extent_no, extent_blocks);
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               if (pass_slot >= 0)
+                       MapUnpinBuffer(pass_slot);
+       }
+
+       if (boundary_committed != NULL)
+       {
+               advanced_boundary_pblk =
+                       
MapCompactorAdvanceBoundaryFromSet(reclaim_boundary_pblk,
+                                                                               
           next_free_pblk,
+                                                                               
           boundary_committed);
+               if (advanced_boundary_pblk > reclaim_boundary_pblk)
+                       MapSBlockAdvanceReclaimBoundary(map_ctx, rnode, forknum,
+                                                                               
        advanced_boundary_pblk);
+       }
+
+       hash_destroy(extent_live);
+       MemoryContextDelete(extent_ctx);
+       return sparse_count;
+}
+
+int
+MapCompactorStep(int max_relations)
+{
+       static int      scan_slot = 0;
+       int                     max_scan;
+       int                     scanned = 0;
+       int                     visited = 0;
+       int                     total_moves = 0;
+       int                     move_budget;
+
+       if (!map_compactor_enable || InRecovery || max_relations <= 0 ||
+               MapSuperCapacity <= 0)
+               return 0;
+
+       move_budget = Max(0, map_compactor_max_moves);
+
+       max_scan = Min(MapSuperCapacity, Max(64, max_relations * 8));
+       while (scanned < max_scan && visited < max_relations)
+       {
+               MapSuperEntry *entry;
+               RelFileLocator  rnode;
+               RelFileLocatorBackend rlocator;
+               UmbraFileContext *ctx;
+               bool            has_init_fork;
+               bool            has_map_fork;
+               entry = MapSuperEntryBySlot(scan_slot);
+               scan_slot = (scan_slot + 1) % MapSuperCapacity;
+               scanned++;
+
+               if (!LWLockConditionalAcquire(&entry->lock, LW_SHARED))
+                       continue;
+               if (!entry->in_use)
+               {
+                       LWLockRelease(&entry->lock);
+                       continue;
+               }
+               rnode = entry->key.rnode;
+               LWLockRelease(&entry->lock);
+               visited++;
+
+               rlocator.locator = rnode;
+               rlocator.backend = INVALID_PROC_NUMBER;
+               ctx = umfile_ctx_acquire(rlocator);
+               if (ctx == NULL)
+                       continue;
+
+               has_init_fork = umfile_ctx_fork_exists(ctx, INIT_FORKNUM,
+                                                                               
           UMFILE_EXISTS_DENSE);
+               if (has_init_fork)
+                       continue;
+               has_map_fork = umfile_ctx_fork_exists(ctx, 
UMBRA_METADATA_FORKNUM,
+                                                                               
          UMFILE_EXISTS_DENSE);
+               if (!has_map_fork)
+                       continue;
+
+               MapCompactorAnalyzeFork(ctx, rnode, MAIN_FORKNUM,
+                                                          move_budget, 
&total_moves);
+               MapCompactorAnalyzeFork(ctx, rnode, FSM_FORKNUM,
+                                                          move_budget, 
&total_moves);
+               MapCompactorAnalyzeFork(ctx, rnode, VISIBILITYMAP_FORKNUM,
+                                                          move_budget, 
&total_moves);
+       }
+
+       /*
+        * Fresh relations usually occupy low-numbered super slots first. If a
+        * startup-time sweep observes only inactive slots, don't keep marching 
the
+        * scan cursor forward in large strides, or newly created relations can 
sit
+        * behind a full rotation before compactor ever visits them.
+        */
+       if (visited == 0)
+               scan_slot = 0;
+
+       return total_moves;
+}
diff --git a/src/backend/storage/map/mapinit.c 
b/src/backend/storage/map/mapinit.c
index c30057cf04..6014b7558d 100644
--- a/src/backend/storage/map/mapinit.c
+++ b/src/backend/storage/map/mapinit.c
@@ -35,6 +35,10 @@ int                  map_prealloc_fsm_batch = 128; /* 1MB in 
8k blocks */
 int                    map_prealloc_vm_low = 64;       /* 512kB in 8k blocks */
 int                    map_prealloc_vm_hard = 16;      /* 128kB in 8k blocks */
 int                    map_prealloc_vm_batch = 128; /* 1MB in 8k blocks */
+bool           map_compactor_enable = false;
+int                    map_compactor_extent_blocks = 1024;     /* 8MB in 8k 
blocks */
+int                    map_compactor_low_live_percent = 10;
+int                    map_compactor_max_moves = 16;
 
 /* Shared memory pointer */
 MapSharedData *MapShared = NULL;
@@ -115,8 +119,13 @@ MapShmemInit(void *arg)
        MapShared->num_slots = map_buffers;
        MapShared->first_free_buffer = 0;
        MapShared->mapwriter_procno = -1;
+       MapShared->mapcompactor_procno = -1;
        pg_atomic_init_u32(&MapShared->next_victim_buffer, 0);
        pg_atomic_init_u32(&MapShared->num_allocs, 0);
+       pg_atomic_init_u64(&MapShared->map_compactor_relocations, 0);
+       pg_atomic_init_u64(&MapShared->map_reclaim_enqueued, 0);
+       pg_atomic_init_u64(&MapShared->map_reclaim_processed, 0);
+       pg_atomic_init_u64(&MapShared->map_reclaim_failed, 0);
        MapShared->complete_passes = 0;
        SpinLockInit(&MapShared->clock_lock);
 
@@ -156,3 +165,75 @@ MapShmemAttach(void *arg)
 
        MapSuperTableShmemAttach();
 }
+
+void
+MapStatsAddCompactorRelocations(uint64 count)
+{
+       if (MapShared == NULL || count == 0)
+               return;
+
+       pg_atomic_fetch_add_u64(&MapShared->map_compactor_relocations, count);
+}
+
+void
+MapStatsAddReclaimEnqueued(uint64 count)
+{
+       if (MapShared == NULL || count == 0)
+               return;
+
+       pg_atomic_fetch_add_u64(&MapShared->map_reclaim_enqueued, count);
+}
+
+void
+MapStatsAddReclaimProcessed(uint64 count)
+{
+       if (MapShared == NULL || count == 0)
+               return;
+
+       pg_atomic_fetch_add_u64(&MapShared->map_reclaim_processed, count);
+}
+
+void
+MapStatsAddReclaimFailed(uint64 count)
+{
+       if (MapShared == NULL || count == 0)
+               return;
+
+       pg_atomic_fetch_add_u64(&MapShared->map_reclaim_failed, count);
+}
+
+uint64
+MapStatsGetCompactorRelocations(void)
+{
+       if (MapShared == NULL)
+               return 0;
+
+       return pg_atomic_read_u64(&MapShared->map_compactor_relocations);
+}
+
+uint64
+MapStatsGetReclaimEnqueued(void)
+{
+       if (MapShared == NULL)
+               return 0;
+
+       return pg_atomic_read_u64(&MapShared->map_reclaim_enqueued);
+}
+
+uint64
+MapStatsGetReclaimProcessed(void)
+{
+       if (MapShared == NULL)
+               return 0;
+
+       return pg_atomic_read_u64(&MapShared->map_reclaim_processed);
+}
+
+uint64
+MapStatsGetReclaimFailed(void)
+{
+       if (MapShared == NULL)
+               return 0;
+
+       return pg_atomic_read_u64(&MapShared->map_reclaim_failed);
+}
diff --git a/src/backend/storage/map/mapsuper.c 
b/src/backend/storage/map/mapsuper.c
index e3e9421566..e0d370acd6 100644
--- a/src/backend/storage/map/mapsuper.c
+++ b/src/backend/storage/map/mapsuper.c
@@ -71,6 +71,10 @@ static BlockNumber MapSuperGetExtendingTarget(const 
MapSuperEntry *entry,
 static void MapSuperSetExtendingTarget(MapSuperEntry *entry,
                                                                           
ForkNumber forknum,
                                                                           
BlockNumber nblocks);
+static void MapSuperSetReclaimBoundary(MapSuperEntry *entry,
+                                                                          
ForkNumber forknum,
+                                                                          
BlockNumber boundary_pblk);
+static void MapSuperResetReclaimBoundaries(MapSuperEntry *entry);
 static bool MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx,
                                                                                
  RelFileLocator rnode,
                                                                                
  XLogRecPtr map_lsn,
@@ -581,6 +585,9 @@ MapSuperEnsureEntryLocked(RelFileLocator rnode)
        entry->extending_target_main = InvalidBlockNumber;
        entry->extending_target_fsm = InvalidBlockNumber;
        entry->extending_target_vm = InvalidBlockNumber;
+       entry->reclaim_boundary_main = 0;
+       entry->reclaim_boundary_fsm = 0;
+       entry->reclaim_boundary_vm = 0;
        MapSuperIndex[insert_bucket].slot_id = slot_id;
 
        LWLockAcquire(&entry->lock, LW_EXCLUSIVE);
@@ -618,6 +625,9 @@ MapSuperDeleteEntry(RelFileLocator rnode)
                entry->extending_target_main = InvalidBlockNumber;
                entry->extending_target_fsm = InvalidBlockNumber;
                entry->extending_target_vm = InvalidBlockNumber;
+               entry->reclaim_boundary_main = 0;
+               entry->reclaim_boundary_fsm = 0;
+               entry->reclaim_boundary_vm = 0;
                entry->in_use = false;
                SpinLockAcquire(&MapSuperCtlData->free_list_lock);
                entry->next_free = MapSuperCtlData->free_head;
@@ -654,6 +664,7 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                                Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
                                                                                
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
                                                                                
                                                                                
        MAIN_FORKNUM)) <=
@@ -673,6 +684,7 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                                Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
                                                                                
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
                                                                                
                                                                                
        MAIN_FORKNUM)) <=
@@ -708,6 +720,7 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                                Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
                                                                                
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
                                                                                
                                                                                
        MAIN_FORKNUM)) <=
@@ -727,6 +740,7 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                                Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
                                                                                
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
                                                                                
                                                                                
        MAIN_FORKNUM)) <=
@@ -910,6 +924,67 @@ MapSuperSetExtendingTarget(MapSuperEntry *entry, 
ForkNumber forknum,
        }
 }
 
+BlockNumber
+MapSuperGetReclaimBoundary(const MapSuperEntry *entry, ForkNumber forknum)
+{
+       Assert(entry != NULL);
+
+       switch (forknum)
+       {
+               case MAIN_FORKNUM:
+                       return entry->reclaim_boundary_main;
+               case FSM_FORKNUM:
+                       return entry->reclaim_boundary_fsm;
+               case VISIBILITYMAP_FORKNUM:
+                       return entry->reclaim_boundary_vm;
+               default:
+                       elog(ERROR, "unsupported fork number for reclaim 
boundary: %d", forknum);
+       }
+
+       pg_unreachable();
+}
+
+static void
+MapSuperSetReclaimBoundary(MapSuperEntry *entry, ForkNumber forknum,
+                                                  BlockNumber boundary_pblk)
+{
+       Assert(entry != NULL);
+
+       switch (forknum)
+       {
+               case MAIN_FORKNUM:
+                       entry->reclaim_boundary_main = boundary_pblk;
+                       break;
+               case FSM_FORKNUM:
+                       entry->reclaim_boundary_fsm = boundary_pblk;
+                       break;
+               case VISIBILITYMAP_FORKNUM:
+                       entry->reclaim_boundary_vm = boundary_pblk;
+                       break;
+               default:
+                       elog(ERROR, "unsupported fork number for reclaim 
boundary: %d", forknum);
+       }
+}
+
+static void
+MapSuperResetReclaimBoundaries(MapSuperEntry *entry)
+{
+       Assert(entry != NULL);
+
+       entry->reclaim_boundary_main =
+               MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                  
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                         MAIN_FORKNUM));
+       entry->reclaim_boundary_fsm =
+               MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                  
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                         FSM_FORKNUM));
+       entry->reclaim_boundary_vm =
+               MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                  
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                         
VISIBILITYMAP_FORKNUM));
+}
+
 static bool
 MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, RelFileLocator rnode,
                                                          XLogRecPtr map_lsn, 
const char *missing_errmsg,
@@ -946,6 +1021,7 @@ MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                        }
                        else
                        {
@@ -953,6 +1029,7 @@ MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                        }
                }
        }
@@ -974,6 +1051,7 @@ MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                MapSuperblockInit(&entry->super, 0);
                entry->flags = MAPSUPER_FLAG_VALID;
                MapSuperResetReservedNextFrees(entry);
+               MapSuperResetReclaimBoundaries(entry);
        }
 
        Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
@@ -1103,6 +1181,8 @@ MapSBlockBumpPhysicalState(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
        {
                MapSuperblockSetNextFreePhysBlock(&entry->super, forknum, 
nblocks);
                MapSuperMaybeBumpReservedNextFree(entry, forknum, nblocks);
+               if (InRecovery)
+                       MapSuperSetReclaimBoundary(entry, forknum, nblocks);
                changed = true;
        }
        if (bump_capacity && current_capacity < nblocks)
@@ -1274,6 +1354,7 @@ MapSBlockInit(UmbraFileContext *map_ctx, RelFileLocator 
rnode, XLogRecPtr map_ls
        MapSuperblockSetLastUpdatedLSN(&entry->super, entry->page_lsn);
        entry->flags = MAPSUPER_FLAG_VALID | MAPSUPER_FLAG_DIRTY;
        MapSuperResetReservedNextFrees(entry);
+       MapSuperResetReclaimBoundaries(entry);
        Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
                                                                          
MapSuperblockGetNextFreePhysBlock(&entry->super,
                                                                                
                                                                MAIN_FORKNUM)) 
<=
@@ -1336,6 +1417,7 @@ MapSBlockEnsureLoaded(UmbraFileContext *map_ctx, 
RelFileLocator rnode)
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                        }
                        else
                        {
@@ -1343,6 +1425,7 @@ MapSBlockEnsureLoaded(UmbraFileContext *map_ctx, 
RelFileLocator rnode)
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               MapSuperResetReclaimBoundaries(entry);
                        }
                }
        }
@@ -1481,6 +1564,83 @@ MapSBlockTryGetNextFreePhysBlock(UmbraFileContext 
*map_ctx, RelFileLocator rnode
        return true;
 }
 
+bool
+MapSBlockTryGetReclaimBoundary(UmbraFileContext *map_ctx, RelFileLocator rnode,
+                                                          ForkNumber forknum, 
BlockNumber *boundary_pblk)
+{
+       MapSuperEntry *entry;
+       uint32          flags;
+
+       Assert(boundary_pblk != NULL);
+
+       if (!MapForkHasMappedState(forknum))
+               return false;
+       if (!MapSBlockEnsureLoaded(map_ctx, rnode))
+               return false;
+       if (!MapSuperFindEntryLocked(rnode, LW_SHARED, &entry))
+               return false;
+
+       flags = entry->flags;
+       if ((flags & MAPSUPER_FLAG_CORRUPT) ||
+               !MapSuperblockHasValidIdentity(&entry->super) ||
+               ((flags & MAPSUPER_FLAG_DIRTY) == 0 &&
+                !MapSuperblockCheckCRC(&entry->super)))
+       {
+               LWLockRelease(&entry->lock);
+               if (!InRecovery)
+                       MapSBlockReportCorrupt(rnode, "invalid identity or 
CRC");
+               return false;
+       }
+
+       *boundary_pblk = MapNormalizeForkBlockCount(forknum,
+                                                                               
                MapSuperGetReclaimBoundary(entry,
+                                                                               
                                                                   forknum));
+       LWLockRelease(&entry->lock);
+       return true;
+}
+
+void
+MapSBlockAdvanceReclaimBoundary(UmbraFileContext *map_ctx, RelFileLocator 
rnode,
+                                                               ForkNumber 
forknum, BlockNumber boundary_pblk)
+{
+       MapSuperEntry *entry;
+       BlockNumber     current;
+       BlockNumber     next_free;
+
+       if (!MapForkHasMappedState(forknum))
+               return;
+       if (boundary_pblk == InvalidBlockNumber)
+               return;
+       if (!MapSBlockEnsureLoaded(map_ctx, rnode))
+               return;
+       if (!MapSuperFindEntryLocked(rnode, LW_EXCLUSIVE, &entry))
+               return;
+
+       if (!entry->in_use ||
+               (entry->flags & MAPSUPER_FLAG_VALID) == 0 ||
+               (entry->flags & MAPSUPER_FLAG_CORRUPT) != 0 ||
+               !MapSuperblockHasValidIdentity(&entry->super) ||
+               ((entry->flags & MAPSUPER_FLAG_DIRTY) == 0 &&
+                !MapSuperblockCheckCRC(&entry->super)))
+       {
+               LWLockRelease(&entry->lock);
+               return;
+       }
+
+       current = MapNormalizeForkBlockCount(forknum,
+                                                                               
 MapSuperGetReclaimBoundary(entry,
+                                                                               
                                                        forknum));
+       next_free = MapNormalizeForkBlockCount(forknum,
+                                                                               
   MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                         
forknum));
+       if (boundary_pblk > next_free)
+               boundary_pblk = next_free;
+       if (boundary_pblk > current)
+               MapSuperSetReclaimBoundary(entry, forknum, boundary_pblk);
+
+       LWLockRelease(&entry->lock);
+}
+
 void
 MapSBlockBumpLogicalNblocks(UmbraFileContext *map_ctx, RelFileLocator rnode,
                                                        ForkNumber forknum, 
BlockNumber nblocks,
@@ -1612,6 +1772,9 @@ MapSuperTableShmemInit(void)
                entry->extending_target_main = InvalidBlockNumber;
                entry->extending_target_fsm = InvalidBlockNumber;
                entry->extending_target_vm = InvalidBlockNumber;
+               entry->reclaim_boundary_main = 0;
+               entry->reclaim_boundary_fsm = 0;
+               entry->reclaim_boundary_vm = 0;
                LWLockInitialize(&entry->lock, LWTRANCHE_MAP_BUFFER_CONTENT);
        }
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 8ba29edc56..777b66ab4c 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -150,6 +150,10 @@ typedef struct f_smgr
        void            (*smgr_invalidate_database_tablespaces) (Oid dbid,
                                                                                
                                 int ntablespaces,
                                                                                
                                 const Oid *tablespace_ids);
+       uint64          (*smgr_get_map_compactor_relocations) (void);
+       uint64          (*smgr_get_map_reclaim_enqueued) (void);
+       uint64          (*smgr_get_map_reclaim_processed) (void);
+       uint64          (*smgr_get_map_reclaim_failed) (void);
        void            (*smgr_mark_skip_wal_pending) (SMgrRelation reln);
        void            (*smgr_clear_skip_wal_pending) (SMgrRelation reln);
        bool            (*smgr_prepare_pendingsync) (SMgrRelation reln);
@@ -194,6 +198,10 @@ static const f_smgr smgrsw[] = {
                .smgr_redo_create_fork = umredocreatefork,
                .smgr_checkpoint_database_tablespaces = 
umcheckpointdatabasetablespaces,
                .smgr_invalidate_database_tablespaces = 
uminvalidatedatabasetablespaces,
+               .smgr_get_map_compactor_relocations = 
umgetmapcompactorrelocations,
+               .smgr_get_map_reclaim_enqueued = umgetmapreclaimenqueued,
+               .smgr_get_map_reclaim_processed = umgetmapreclaimprocessed,
+               .smgr_get_map_reclaim_failed = umgetmapreclaimfailed,
                .smgr_mark_skip_wal_pending = ummarkskipwalpending,
                .smgr_clear_skip_wal_pending = umclearskipwalpending,
                .smgr_prepare_pendingsync = umpreparependingsync,
@@ -694,6 +702,42 @@ smgrinvalidatedatabase(Oid dbid)
        smgrinvalidatedatabasetablespaces(dbid, 0, NULL);
 }
 
+uint64
+smgrgetmapcompactorrelocations(void)
+{
+       if (smgrsw[0].smgr_get_map_compactor_relocations)
+               return smgrsw[0].smgr_get_map_compactor_relocations();
+
+       return 0;
+}
+
+uint64
+smgrgetmapreclaimenqueued(void)
+{
+       if (smgrsw[0].smgr_get_map_reclaim_enqueued)
+               return smgrsw[0].smgr_get_map_reclaim_enqueued();
+
+       return 0;
+}
+
+uint64
+smgrgetmapreclaimprocessed(void)
+{
+       if (smgrsw[0].smgr_get_map_reclaim_processed)
+               return smgrsw[0].smgr_get_map_reclaim_processed();
+
+       return 0;
+}
+
+uint64
+smgrgetmapreclaimfailed(void)
+{
+       if (smgrsw[0].smgr_get_map_reclaim_failed)
+               return smgrsw[0].smgr_get_map_reclaim_failed();
+
+       return 0;
+}
+
 void
 smgrmarkskipwalpending(RelFileLocator rlocator)
 {
diff --git a/src/backend/storage/smgr/umbra.c b/src/backend/storage/smgr/umbra.c
index 61c74a2378..f7e3625b6e 100644
--- a/src/backend/storage/smgr/umbra.c
+++ b/src/backend/storage/smgr/umbra.c
@@ -2633,3 +2633,27 @@ umfiletagmatches(const FileTag *ftag, const FileTag 
*candidate)
                ftag->forknum == candidate->forknum &&
                ftag->segno == candidate->segno;
 }
+
+uint64
+umgetmapcompactorrelocations(void)
+{
+       return MapStatsGetCompactorRelocations();
+}
+
+uint64
+umgetmapreclaimenqueued(void)
+{
+       return MapStatsGetReclaimEnqueued();
+}
+
+uint64
+umgetmapreclaimprocessed(void)
+{
+       return MapStatsGetReclaimProcessed();
+}
+
+uint64
+umgetmapreclaimfailed(void)
+{
+       return MapStatsGetReclaimFailed();
+}
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index 51ed171c33..8c533a4ea7 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -30,6 +30,8 @@
 #include "storage/latch.h"
 #include "storage/md.h"
 #ifdef USE_UMBRA
+#include "access/umbra_xlog.h"
+#include "storage/map.h"
 #include "storage/umbra.h"
 #endif
 #include "utils/hsearch.h"
@@ -69,6 +71,7 @@ typedef struct
        FileTag         tag;                    /* identifies handler and file 
*/
        CycleCtr        cycle_ctr;              /* checkpoint_cycle_ctr when 
request was made */
        bool            canceled;               /* true if request has been 
canceled */
+       SyncRequestType request_type;   /* plain unlink vs reclaim unlink */
 } PendingUnlinkEntry;
 
 static HTAB *pendingOps = NULL;
@@ -128,6 +131,51 @@ static const SyncOps syncsw[] = {
 #endif
 };
 
+static inline const SyncOps *
+SyncOpsForHandler(int16 handler)
+{
+       if (handler < 0 || handler >= (int) lengthof(syncsw))
+               ereport(PANIC,
+                               (errmsg("invalid sync request handler"),
+                                errdetail("handler=%d", handler),
+                                errbacktrace()));
+
+       return &syncsw[handler];
+}
+
+static inline void
+SyncPreUnlinkRequest(const PendingUnlinkEntry *entry)
+{
+#ifdef USE_UMBRA
+       if (entry->request_type == SYNC_RECLAIM_REQUEST &&
+               entry->tag.handler == SYNC_HANDLER_UMBRA)
+               log_umbra_reclaim_unlink(entry->tag.rlocator,
+                                                                
entry->tag.forknum,
+                                                                (BlockNumber) 
entry->tag.segno);
+#else
+       (void) entry;
+#endif
+}
+
+static inline void
+SyncPostUnlinkRequest(const PendingUnlinkEntry *entry,
+                                         int unlink_rc, int unlink_errno)
+{
+#ifdef USE_UMBRA
+       if (entry->request_type == SYNC_RECLAIM_REQUEST)
+       {
+               if (unlink_rc < 0 && unlink_errno != ENOENT)
+                       MapStatsAddReclaimFailed(1);
+               else
+                       MapStatsAddReclaimProcessed(1);
+       }
+#else
+       (void) entry;
+       (void) unlink_rc;
+       (void) unlink_errno;
+#endif
+}
+
 /*
  * Initialize data structures for the file sync tracking.
  */
@@ -220,6 +268,8 @@ SyncPostCheckpoint(void)
        {
                PendingUnlinkEntry *entry = (PendingUnlinkEntry *) lfirst(lc);
                char            path[MAXPGPATH];
+               int                     unlink_rc;
+               int                     unlink_errno;
 
                /* Skip over any canceled entries */
                if (entry->canceled)
@@ -238,8 +288,21 @@ SyncPostCheckpoint(void)
                        break;
 
                /* Unlink the file */
-               if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-                                                                               
                                  path) < 0)
+               {
+                       const SyncOps *ops = 
SyncOpsForHandler(entry->tag.handler);
+
+                       if (ops->sync_unlinkfiletag == NULL)
+                               ereport(PANIC,
+                                               (errmsg("sync unlink request 
uses handler without unlink function"),
+                                                errdetail("handler=%d", 
entry->tag.handler),
+                                                errbacktrace()));
+
+                       SyncPreUnlinkRequest(entry);
+                       unlink_rc = ops->sync_unlinkfiletag(&entry->tag, path);
+                       unlink_errno = errno;
+               }
+
+               if (unlink_rc < 0)
                {
                        /*
                         * There's a race condition, when the database is 
dropped at the
@@ -248,14 +311,18 @@ SyncPostCheckpoint(void)
                         * here. rmtree() also has to ignore ENOENT errors, to 
deal with
                         * the possibility that we delete the file first.
                         */
-                       if (errno != ENOENT)
-                               ereport(WARNING,
+                       if (unlink_errno != ENOENT)
+                       {
+                               errno = unlink_errno;
+                               ereport(entry->request_type == 
SYNC_RECLAIM_REQUEST ? DEBUG1 : WARNING,
                                                (errcode_for_file_access(),
                                                 errmsg("could not remove file 
\"%s\": %m", path)));
+                       }
                }
 
-               /* Mark the list entry as canceled, just in case */
-               entry->canceled = true;
+                       SyncPostUnlinkRequest(entry, unlink_rc, unlink_errno);
+                       /* Mark the list entry as canceled, just in case */
+                       entry->canceled = true;
 
                /*
                 * As in ProcessSyncRequests, we don't want to stop absorbing 
fsync
@@ -516,27 +583,38 @@ RememberSyncRequest(const FileTag *ftag, SyncRequestType 
type)
                HASH_SEQ_STATUS hstat;
                PendingFsyncEntry *pfe;
                ListCell   *cell;
+               const SyncOps *ops = SyncOpsForHandler(ftag->handler);
+
+               if (ops->sync_filetagmatches == NULL)
+                       ereport(PANIC,
+                                       (errmsg("sync filter request uses 
handler without match function"),
+                                        errdetail("handler=%d", ftag->handler),
+                                        errbacktrace()));
 
                /* Cancel matching fsync requests */
                hash_seq_init(&hstat, pendingOps);
                while ((pfe = (PendingFsyncEntry *) hash_seq_search(&hstat)) != 
NULL)
                {
                        if (pfe->tag.handler == ftag->handler &&
-                               syncsw[ftag->handler].sync_filetagmatches(ftag, 
&pfe->tag))
+                               ops->sync_filetagmatches(ftag, &pfe->tag))
                                pfe->canceled = true;
                }
 
-               /* Cancel matching unlink requests */
+               /* Remove matching reclaim unlink requests only. */
                foreach(cell, pendingUnlinks)
                {
                        PendingUnlinkEntry *pue = (PendingUnlinkEntry *) 
lfirst(cell);
 
                        if (pue->tag.handler == ftag->handler &&
-                               syncsw[ftag->handler].sync_filetagmatches(ftag, 
&pue->tag))
-                               pue->canceled = true;
+                               pue->request_type == SYNC_RECLAIM_REQUEST &&
+                               ops->sync_filetagmatches(ftag, &pue->tag))
+                       {
+                               pendingUnlinks = 
foreach_delete_current(pendingUnlinks, cell);
+                               pfree(pue);
+                       }
                }
        }
-       else if (type == SYNC_UNLINK_REQUEST)
+       else if (type == SYNC_UNLINK_REQUEST || type == SYNC_RECLAIM_REQUEST)
        {
                /* Unlink request: put it in the linked list */
                MemoryContext oldcxt = MemoryContextSwitchTo(pendingOpsCxt);
@@ -546,6 +624,7 @@ RememberSyncRequest(const FileTag *ftag, SyncRequestType 
type)
                entry->tag = *ftag;
                entry->cycle_ctr = checkpoint_cycle_ctr;
                entry->canceled = false;
+               entry->request_type = type;
 
                pendingUnlinks = lappend(pendingUnlinks, entry);
 
diff --git a/src/backend/utils/activity/wait_event_names.txt 
b/src/backend/utils/activity/wait_event_names.txt
index ec5e2eabf4..7cf5598ecf 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -63,6 +63,8 @@ LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical 
replication launcher proc
 LOGICAL_PARALLEL_APPLY_MAIN    "Waiting in main loop of logical replication 
parallel apply process."
 MAPWRITER_HIBERNATE    "Waiting in Umbra map writer process, hibernating."
 MAPWRITER_MAIN "Waiting in main loop of Umbra map writer process."
+MAPCOMPACTOR_HIBERNATE "Waiting in Umbra map compactor process, hibernating."
+MAPCOMPACTOR_MAIN      "Waiting in main loop of Umbra map compactor process."
 RECOVERY_WAL_STREAM    "Waiting in main loop of startup process for WAL to 
arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN      "Waiting in main loop of slot synchronization."
 REPLICATION_SLOTSYNC_SHUTDOWN  "Waiting for slot sync worker to shut down."
diff --git a/src/backend/utils/adt/pgstatfuncs.c 
b/src/backend/utils/adt/pgstatfuncs.c
index 1408de387e..ded745d995 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -26,6 +26,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker.h"
 #include "replication/logicallauncher.h"
+#include "storage/smgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -1336,6 +1337,30 @@ pg_stat_get_buf_alloc(PG_FUNCTION_ARGS)
        PG_RETURN_INT64(pgstat_fetch_stat_bgwriter()->buf_alloc);
 }
 
+Datum
+pg_stat_get_map_compactor_relocations(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT64((int64) smgrgetmapcompactorrelocations());
+}
+
+Datum
+pg_stat_get_map_reclaim_enqueued(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT64((int64) smgrgetmapreclaimenqueued());
+}
+
+Datum
+pg_stat_get_map_reclaim_processed(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT64((int64) smgrgetmapreclaimprocessed());
+}
+
+Datum
+pg_stat_get_map_reclaim_failed(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT64((int64) smgrgetmapreclaimfailed());
+}
+
 /*
 * When adding a new column to the pg_stat_io view and the
 * pg_stat_get_backend_io() function, add a new enum value here above
diff --git a/src/backend/utils/misc/guc_parameters.dat 
b/src/backend/utils/misc/guc_parameters.dat
index f3726be78d..a45e33e31d 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1932,6 +1932,41 @@
   max => 'MAX_KILOBYTES',
 },
 
+{ name => 'map_compactor_enable', type => 'bool', context => 'PGC_SIGHUP', 
group => 'RESOURCES_BGWRITER',
+  short_desc => 'Enable extent-level compactor planning in Umbra.',
+  variable => 'map_compactor_enable',
+  boot_val => 'false',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'map_compactor_extent_blocks', type => 'int', context => 
'PGC_SIGHUP', group => 'RESOURCES_BGWRITER',
+  short_desc => 'Extent size in blocks for Umbra map compactor planning.',
+  flags => 'GUC_UNIT_BLOCKS',
+  variable => 'map_compactor_extent_blocks',
+  boot_val => '1024',
+  min => '128',
+  max => 'INT_MAX / 2',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'map_compactor_low_live_percent', type => 'int', context => 
'PGC_SIGHUP', group => 'RESOURCES_BGWRITER',
+  short_desc => 'Treat an extent as sparse when live ratio is at or below this 
percentage.',
+  variable => 'map_compactor_low_live_percent',
+  boot_val => '10',
+  min => '1',
+  max => '100',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'map_compactor_max_moves', type => 'int', context => 'PGC_SIGHUP', 
group => 'RESOURCES_BGWRITER',
+  short_desc => 'Maximum relocation moves Umbra map compactor performs per 
round.',
+  variable => 'map_compactor_max_moves',
+  boot_val => '16',
+  min => '0',
+  max => 'INT_MAX / 2',
+  ifdef => 'USE_UMBRA',
+},
+
 { name => 'map_prealloc_fsm_batch', type => 'int', context => 'PGC_SIGHUP', 
group => 'RESOURCES_BGWRITER',
   short_desc => 'Preallocation batch size in blocks for Umbra FSM fork.',
   flags => 'GUC_UNIT_BLOCKS',
@@ -2022,6 +2057,45 @@
   ifdef => 'USE_UMBRA',
 },
 
+{ name => 'map_superblocks', type => 'int', context => 'PGC_POSTMASTER', group 
=> 'RESOURCES_MEM',
+  short_desc => 'Sets the number of dedicated shared-memory slots for Umbra 
MAP superblocks.',
+  long_desc => 'These slots hold hot relation superblock metadata and are not 
managed as an LRU cache.',
+  variable => 'map_superblocks',
+  boot_val => '262144',
+  min => 'MAP_SUPERBLOCK_MIN_ENTRIES',
+  max => 'INT_MAX / 2',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'mapcompactor_busy_alloc_threshold', type => 'int', context => 
'PGC_SIGHUP', group => 'RESOURCES_BGWRITER',
+  short_desc => 'Map compactor yields when recent MAP allocation pressure 
reaches this threshold.',
+  long_desc => '0 disables busy-yield behavior.',
+  variable => 'MapCompactorBusyAllocThreshold',
+  boot_val => '128',
+  min => '0',
+  max => 'INT_MAX / 2',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'mapcompactor_delay', type => 'int', context => 'PGC_SIGHUP', group 
=> 'RESOURCES_BGWRITER',
+  short_desc => 'Umbra map compactor sleep time between rounds.',
+  flags => 'GUC_UNIT_MS',
+  variable => 'MapCompactorDelay',
+  boot_val => '200',
+  min => '1',
+  max => '10000',
+  ifdef => 'USE_UMBRA',
+},
+
+{ name => 'mapcompactor_max_relations', type => 'int', context => 
'PGC_SIGHUP', group => 'RESOURCES_BGWRITER',
+  short_desc => 'Maximum number of relations scanned per Umbra map compactor 
round.',
+  variable => 'MapCompactorMaxRelations',
+  boot_val => '8',
+  min => '0',
+  max => 'INT_MAX',
+  ifdef => 'USE_UMBRA',
+},
+
 { name => 'mapwriter_delay', type => 'int', context => 'PGC_SIGHUP', group => 
'RESOURCES_BGWRITER',
   short_desc => 'Umbra map writer sleep time between rounds.',
   flags => 'GUC_UNIT_MS',
@@ -2059,6 +2133,7 @@
   max => 'INT_MAX / 2',
   ifdef => 'USE_UMBRA',
 },
+
 { name => 'max_active_replication_origins', type => 'int', context => 
'PGC_POSTMASTER', group => 'REPLICATION_SUBSCRIBERS',
   short_desc => 'Sets the maximum number of active replication origins.',
   variable => 'max_active_replication_origins',
diff --git a/src/include/access/umbra_xlog.h b/src/include/access/umbra_xlog.h
index 6b2408d33c..bc068597d0 100644
--- a/src/include/access/umbra_xlog.h
+++ b/src/include/access/umbra_xlog.h
@@ -8,6 +8,7 @@
  * - RANGE_REMAP: atomically establish a range of first-born mappings
  * - RANGE_REMAP_COMPACT: same semantics for contiguous lblk/pblk runs
  * - SKIP_WAL_DENSE_MAP: record non-empty skip-WAL dense lblk==pblk frontiers
+ * - RECLAIM_UNLINK: physically remove one reclaimed relation segment
  *
  *-------------------------------------------------------------------------
  */
@@ -22,6 +23,7 @@
 /* XLOG gives us high 4 bits */
 #define XLOG_UMBRA_MAP_SET                     0x10
 #define XLOG_UMBRA_RANGE_REMAP         0x30
+#define XLOG_UMBRA_RECLAIM_UNLINK      0x40
 #define XLOG_UMBRA_RANGE_REMAP_COMPACT 0x50
 #define XLOG_UMBRA_SKIP_WAL_DENSE_MAP  0x60
 
@@ -74,6 +76,13 @@ typedef struct xl_umbra_skip_wal_dense_map
        xl_umbra_skip_wal_dense_map_entry entries[FLEXIBLE_ARRAY_MEMBER];
 } xl_umbra_skip_wal_dense_map;
 
+typedef struct xl_umbra_reclaim_unlink
+{
+       RelFileLocator rlocator;
+       ForkNumber      forknum;
+       BlockNumber segno;
+} xl_umbra_reclaim_unlink;
+
 extern XLogRecPtr log_umbra_map_set(RelFileLocator rlocator, ForkNumber 
forknum,
                                                                        
BlockNumber lblkno, BlockNumber old_pblkno,
                                                                        
BlockNumber new_pblkno);
@@ -89,6 +98,9 @@ extern XLogRecPtr 
log_umbra_range_remap_compact(RelFileLocator rlocator,
 extern XLogRecPtr log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
                                                                                
           uint16 count,
                                                                                
           const xl_umbra_skip_wal_dense_map_entry *entries);
+extern XLogRecPtr log_umbra_reclaim_unlink(RelFileLocator rlocator,
+                                                                               
   ForkNumber forknum,
+                                                                               
   BlockNumber segno);
 
 extern void umbra_redo(XLogReaderState *record);
 extern void umbra_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 99fa9a6ede..f11dd11c38 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6051,6 +6051,26 @@
 { oid => '2859', descr => 'statistics: number of buffer allocations',
   proname => 'pg_stat_get_buf_alloc', provolatile => 's', proparallel => 'r',
   prorettype => 'int8', proargtypes => '', prosrc => 'pg_stat_get_buf_alloc' },
+{ oid => '9781',
+  descr => 'statistics: number of map compactor block relocations',
+  proname => 'pg_stat_get_map_compactor_relocations', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => '',
+  prosrc => 'pg_stat_get_map_compactor_relocations' },
+{ oid => '9785',
+  descr => 'statistics: number of map reclaim entries enqueued',
+  proname => 'pg_stat_get_map_reclaim_enqueued', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => '',
+  prosrc => 'pg_stat_get_map_reclaim_enqueued' },
+{ oid => '9782',
+  descr => 'statistics: number of map reclaim entries processed',
+  proname => 'pg_stat_get_map_reclaim_processed', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => '',
+  prosrc => 'pg_stat_get_map_reclaim_processed' },
+{ oid => '9784',
+  descr => 'statistics: number of map reclaim attempts failed',
+  proname => 'pg_stat_get_map_reclaim_failed', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => '',
+  prosrc => 'pg_stat_get_map_reclaim_failed' },
 
 { oid => '6214', descr => 'statistics: per backend type IO statistics',
   proname => 'pg_stat_get_io', prorows => '30', proretset => 't',
diff --git a/src/include/postmaster/mapwriter.h 
b/src/include/postmaster/mapwriter.h
index 6c984922b0..6b5dae25f5 100644
--- a/src/include/postmaster/mapwriter.h
+++ b/src/include/postmaster/mapwriter.h
@@ -17,8 +17,12 @@ extern PGDLLIMPORT int MapWriterDelay;
 extern PGDLLIMPORT int MapWriterMaxPages;
 extern PGDLLIMPORT int MapWriterPreallocMaxRelations;
 extern PGDLLIMPORT double MapWriterLRUMultiplier;
+extern PGDLLIMPORT int MapCompactorDelay;
+extern PGDLLIMPORT int MapCompactorMaxRelations;
+extern PGDLLIMPORT int MapCompactorBusyAllocThreshold;
 
 extern void MapBackgroundWorkersRegister(void);
 extern void MapWriterMain(Datum arg);
+extern void MapCompactorMain(Datum arg);
 
 #endif                                                 /* MAPWRITER_H */
diff --git a/src/include/storage/map.h b/src/include/storage/map.h
index c61414fd16..37b721bf83 100644
--- a/src/include/storage/map.h
+++ b/src/include/storage/map.h
@@ -89,10 +89,15 @@ typedef struct MapSharedData
        slock_t         clock_lock;
        int                     first_free_buffer;      /* head of free list, 
-1 if empty */
        int                     mapwriter_procno;       /* procno to wake, -1 
if none */
+       int                     mapcompactor_procno;    /* procno to wake, -1 
if none */
 
        /* statistics */
        pg_atomic_uint32 num_allocs;
        uint32          complete_passes;
+       pg_atomic_uint64 map_compactor_relocations;
+       pg_atomic_uint64 map_reclaim_enqueued;
+       pg_atomic_uint64 map_reclaim_processed;
+       pg_atomic_uint64 map_reclaim_failed;
 
        /* configuration */
        int                     num_slots;
@@ -130,6 +135,14 @@ typedef struct MapInflightBarrier
 
 extern void MapBackendInit(void);
 extern const ShmemCallbacks MapShmemCallbacks;
+extern void MapStatsAddCompactorRelocations(uint64 count);
+extern void MapStatsAddReclaimEnqueued(uint64 count);
+extern void MapStatsAddReclaimProcessed(uint64 count);
+extern void MapStatsAddReclaimFailed(uint64 count);
+extern uint64 MapStatsGetCompactorRelocations(void);
+extern uint64 MapStatsGetReclaimEnqueued(void);
+extern uint64 MapStatsGetReclaimProcessed(void);
+extern uint64 MapStatsGetReclaimFailed(void);
 
 /* Lookup/modification */
 extern bool MapTryLookup(UmbraFileContext *map_ctx, RelFileLocator rnode,
@@ -262,7 +275,10 @@ extern int MapSyncStart(uint32 *complete_passes, uint32 
*num_allocs);
 extern uint32 MapAllocPressurePeek(void);
 extern void MapStrategyNotifyWriter(int mapwriter_procno);
 extern void MapWakeWriter(void);
+extern void MapStrategyNotifyCompactor(int mapcompactor_procno);
+extern void MapWakeCompactor(void);
 extern int     MapPreallocStep(int max_relations);
+extern int     MapCompactorStep(int max_relations);
 
 /* Map cache hash table (in mapclock.c) */
 extern int     MapCacheLookup(RelFileLocator rnode, ForkNumber forknum,
@@ -291,6 +307,10 @@ extern int map_prealloc_fsm_batch;
 extern int     map_prealloc_vm_low;
 extern int     map_prealloc_vm_hard;
 extern int     map_prealloc_vm_batch;
+extern bool map_compactor_enable;
+extern int     map_compactor_extent_blocks;
+extern int     map_compactor_low_live_percent;
+extern int     map_compactor_max_moves;
 
 /* Global data (defined in map.c) */
 extern MapSharedData *MapShared;
diff --git a/src/include/storage/map_internal.h 
b/src/include/storage/map_internal.h
index 368b3da15a..491282c7b2 100644
--- a/src/include/storage/map_internal.h
+++ b/src/include/storage/map_internal.h
@@ -43,6 +43,7 @@ extern bool MapMaybePreallocateFork(UmbraFileContext *map_ctx,
                                                                        
RelFileLocator rnode,
                                                                        
ForkNumber forknum,
                                                                        bool 
background_mode);
+extern void MapReclaimForgetRelation(RelFileLocator rnode);
 extern bool MapInflightTryClaim(UmbraFileContext *map_ctx,
                                                                RelFileLocator 
rnode,
                                                                ForkNumber 
forknum,
diff --git a/src/include/storage/mapsuper_internal.h 
b/src/include/storage/mapsuper_internal.h
index 5d64ddec87..fadbb7f269 100644
--- a/src/include/storage/mapsuper_internal.h
+++ b/src/include/storage/mapsuper_internal.h
@@ -42,6 +42,9 @@ typedef struct MapSuperEntry
        BlockNumber     extending_target_main;
        BlockNumber     extending_target_fsm;
        BlockNumber     extending_target_vm;
+       BlockNumber     reclaim_boundary_main;
+       BlockNumber     reclaim_boundary_fsm;
+       BlockNumber     reclaim_boundary_vm;
        int                     next_free;
        bool            in_use;
        LWLock          lock;
@@ -147,6 +150,8 @@ extern void MapSuperDeleteEntry(RelFileLocator rnode);
 extern bool MapSuperForkExists(const MapSuperblock *super,
                                                           ForkNumber forknum);
 extern uint32 MapSuperPreallocFlag(ForkNumber forknum);
+extern BlockNumber MapSuperGetReclaimBoundary(const MapSuperEntry *entry,
+                                                                               
          ForkNumber forknum);
 extern void MapSBlockBumpPhysicalState(UmbraFileContext *map_ctx,
                                                                           
RelFileLocator rnode,
                                                                           
ForkNumber forknum,
@@ -154,6 +159,14 @@ extern void MapSBlockBumpPhysicalState(UmbraFileContext 
*map_ctx,
                                                                           bool 
bump_next_free,
                                                                           bool 
bump_capacity,
                                                                           
XLogRecPtr map_lsn);
+extern bool MapSBlockTryGetReclaimBoundary(UmbraFileContext *map_ctx,
+                                                                               
   RelFileLocator rnode,
+                                                                               
   ForkNumber forknum,
+                                                                               
   BlockNumber *boundary_pblk);
+extern void MapSBlockAdvanceReclaimBoundary(UmbraFileContext *map_ctx,
+                                                                               
        RelFileLocator rnode,
+                                                                               
        ForkNumber forknum,
+                                                                               
        BlockNumber boundary_pblk);
 extern void MapSuperTableShmemRequest(void);
 extern void MapSuperTableShmemInit(void);
 extern void MapSuperTableShmemAttach(void);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index b7f95ed5d3..a946baaa0e 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -133,6 +133,10 @@ extern void smgrcheckpointdatabasetablespaces(Oid dbid, 
int ntablespaces,
 extern void smgrinvalidatedatabasetablespaces(Oid dbid, int ntablespaces,
                                                                                
          const Oid *tablespace_ids);
 extern void smgrinvalidatedatabase(Oid dbid);
+extern uint64 smgrgetmapcompactorrelocations(void);
+extern uint64 smgrgetmapreclaimenqueued(void);
+extern uint64 smgrgetmapreclaimprocessed(void);
+extern uint64 smgrgetmapreclaimfailed(void);
 extern void smgrregistershutdowncleanup(void);
 extern void smgrmarkskipwalpending(RelFileLocator rlocator);
 extern void smgrclearskipwalpending(RelFileLocator rlocator);
diff --git a/src/include/storage/sync.h b/src/include/storage/sync.h
index 559a8eea6c..f051b15748 100644
--- a/src/include/storage/sync.h
+++ b/src/include/storage/sync.h
@@ -24,6 +24,7 @@ typedef enum SyncRequestType
 {
        SYNC_REQUEST,                           /* schedule a call of sync 
function */
        SYNC_UNLINK_REQUEST,            /* schedule a call of unlink function */
+       SYNC_RECLAIM_REQUEST,           /* schedule internal reclaim unlink */
        SYNC_FORGET_REQUEST,            /* forget all calls for a tag */
        SYNC_FILTER_REQUEST,            /* forget all calls satisfying match fn 
*/
 } SyncRequestType;
@@ -39,9 +40,7 @@ typedef enum SyncRequestHandler
        SYNC_HANDLER_COMMIT_TS,
        SYNC_HANDLER_MULTIXACT_OFFSET,
        SYNC_HANDLER_MULTIXACT_MEMBER,
-#ifdef USE_UMBRA
        SYNC_HANDLER_UMBRA,
-#endif
        SYNC_HANDLER_NONE,
 } SyncRequestHandler;
 
diff --git a/src/include/storage/umbra.h b/src/include/storage/umbra.h
index 0702f7b392..0cc6388b26 100644
--- a/src/include/storage/umbra.h
+++ b/src/include/storage/umbra.h
@@ -136,6 +136,10 @@ extern int umfd(SMgrRelation reln, ForkNumber forknum, 
BlockNumber blocknum, uin
 extern int umsyncfiletag(const FileTag *ftag, char *path);
 extern int umunlinkfiletag(const FileTag *ftag, char *path);
 extern bool umfiletagmatches(const FileTag *ftag, const FileTag *candidate);
+extern uint64 umgetmapcompactorrelocations(void);
+extern uint64 umgetmapreclaimenqueued(void);
+extern uint64 umgetmapreclaimprocessed(void);
+extern uint64 umgetmapreclaimfailed(void);
 
 /*
  * Runtime semantic helpers.
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index da020abc31..9d94dd548a 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -67,9 +67,13 @@ tests += {
       't/056_umbra_truncate_superblock.pl',
       't/057_umbra_remap_crash_consistency.pl',
       't/058_umbra_2pc_remap_recovery.pl',
+      't/059_umbra_compactor_relocation.pl',
+      't/060_umbra_reclaim_checkpoint_counters.pl',
       't/061_umbra_fsm_vm_map_translation.pl',
       't/062_umbra_truncate_drop_crash_matrix.pl',
       't/063_umbra_mainfork_head_unlink_checkpoint.pl',
+      't/064_umbra_mainfork_internal_reclaim_seg0.pl',
+      't/065_umbra_mainfork_middle_reclaim_keep_seg0.pl',
       't/066_umbra_truncate_redo.pl',
       't/067_umbra_remap_redo.pl',
       't/068_umbra_old_baseline_checkpoint_window.pl',
diff --git a/src/test/recovery/t/059_umbra_compactor_relocation.pl 
b/src/test/recovery/t/059_umbra_compactor_relocation.pl
new file mode 100644
index 0000000000..03e3a0a643
--- /dev/null
+++ b/src/test/recovery/t/059_umbra_compactor_relocation.pl
@@ -0,0 +1,91 @@
+# Verify map compactor relocation survives crash restart.
+#
+# This is UMBRA-specific and skipped in md mode.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+log_min_messages = debug1
+map_superblocks = 50000
+map_compactor_enable = off
+map_compactor_extent_blocks = 128
+map_compactor_low_live_percent = 100
+map_compactor_max_moves = 64
+mapcompactor_delay = 10ms
+mapcompactor_max_relations = 64
+mapcompactor_busy_alloc_threshold = 0
+});
+$node->start();
+
+$node->safe_psql('postgres',
+       q{CREATE TABLE umb_compact_t(id int PRIMARY KEY, payload text);});
+
+$node->safe_psql(
+       'postgres', q{
+INSERT INTO umb_compact_t
+SELECT g, repeat('x', 700) FROM generate_series(1, 25000) g;
+CHECKPOINT;
+UPDATE umb_compact_t
+SET payload = md5(id::text) || repeat('u', 668)
+WHERE id % 2 = 0;
+});
+
+$node->safe_psql(
+       'postgres', q{
+ALTER SYSTEM SET map_compactor_enable = on;
+SELECT pg_reload_conf();
+SELECT pg_sleep(1.0);
+});
+
+ok($node->poll_query_until(
+               'postgres',
+               q{SELECT pg_stat_get_map_compactor_relocations() > 0;}),
+       'map compactor relocation stats become visible');
+
+my $before = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum(id)::bigint
+FROM umb_compact_t;
+});
+
+$node->stop('immediate');
+$node->start();
+
+my $after = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum(id)::bigint
+FROM umb_compact_t;
+});
+
+is($after, $before, 'aggregate state preserved after compactor + crash 
restart');
+
+my $idx_count = $node->safe_psql(
+       'postgres', q{
+SET enable_seqscan = off;
+SELECT count(*) FROM umb_compact_t WHERE id BETWEEN 100 AND 24000;
+});
+my $seq_count = $node->safe_psql(
+       'postgres', q{
+SET enable_indexscan = off;
+SET enable_bitmapscan = off;
+SELECT count(*) FROM umb_compact_t WHERE id BETWEEN 100 AND 24000;
+});
+is($idx_count, $seq_count, 'index path and seq path return same rowcount');
+
+done_testing();
diff --git a/src/test/recovery/t/060_umbra_reclaim_checkpoint_counters.pl 
b/src/test/recovery/t/060_umbra_reclaim_checkpoint_counters.pl
new file mode 100644
index 0000000000..5f1e3730f0
--- /dev/null
+++ b/src/test/recovery/t/060_umbra_reclaim_checkpoint_counters.pl
@@ -0,0 +1,82 @@
+# Verify reclaim counters remain sane across checkpoint when punch is disabled.
+#
+# This is UMBRA-specific and skipped in md mode.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+checkpoint_timeout = '30min'
+max_wal_size = '4GB'
+log_min_messages = debug1
+map_superblocks = 50000
+map_compactor_enable = off
+map_compactor_extent_blocks = 128
+map_compactor_low_live_percent = 100
+map_compactor_max_moves = 4096
+mapcompactor_delay = 20ms
+mapcompactor_max_relations = 128
+mapcompactor_busy_alloc_threshold = 0
+});
+$node->start();
+
+$node->safe_psql('postgres',
+       q{CREATE TABLE umb_reclaim_t(id int PRIMARY KEY, payload text);});
+
+$node->safe_psql(
+       'postgres', q{
+INSERT INTO umb_reclaim_t
+SELECT g, repeat('x', 700) FROM generate_series(1, 30000) g;
+CHECKPOINT;
+UPDATE umb_reclaim_t
+SET payload = md5(id::text) || repeat('u', 668)
+WHERE id % 2 = 0;
+});
+
+$node->safe_psql(
+       'postgres', q{
+ALTER SYSTEM SET map_compactor_enable = on;
+SELECT pg_reload_conf();
+SELECT pg_sleep(1.5);
+});
+
+ok($node->poll_query_until(
+               'postgres',
+               q{SELECT pg_stat_get_map_compactor_relocations() > 0;}),
+       'map compactor produced relocations');
+
+my ($processed_before, $failed_before) = split(/\|/, $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_reclaim_processed(),
+                 pg_stat_get_map_reclaim_failed();}));
+my $attempt_before = $processed_before + $failed_before;
+
+$node->safe_psql('postgres', q{CHECKPOINT;});
+
+ok($node->safe_psql(
+               'postgres',
+               "SELECT (pg_stat_get_map_reclaim_processed() + 
pg_stat_get_map_reclaim_failed()) >= $attempt_before;") eq 't',
+       'reclaim counters remain monotonic after checkpoint');
+
+my ($processed_after, $failed_after) = split(/\|/, $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_reclaim_processed(),
+                 pg_stat_get_map_reclaim_failed();}));
+cmp_ok($processed_after + $failed_after, '>=', $attempt_before,
+       'reclaim attempt counters remain monotonic');
+
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_reclaim_t;}),
+   '30000', 'table remains readable after checkpoint');
+
+done_testing();
diff --git a/src/test/recovery/t/064_umbra_mainfork_internal_reclaim_seg0.pl 
b/src/test/recovery/t/064_umbra_mainfork_internal_reclaim_seg0.pl
new file mode 100644
index 0000000000..d5bc351e10
--- /dev/null
+++ b/src/test/recovery/t/064_umbra_mainfork_internal_reclaim_seg0.pl
@@ -0,0 +1,283 @@
+# Verify UMBRA internal reclaim can physically remove MAIN seg0.
+#
+# This test targets internal physical deletion (reclaim), not DROP/TRUNCATE:
+# - keep the relation alive
+# - churn only a tail key range until MAIN seg1 exists
+# - keep a large untouched prefix so seg0 must still have live mappings
+# - enable compactor and use checkpoint rounds to flush map state to disk
+# - verify seg0 disappears while seg1 still exists
+#
+# In md mode, skip this test.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+$PostgreSQL::Test::Utils::timeout_default = 600;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init(allows_streaming => 1);
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+checkpoint_timeout = '30min'
+max_wal_size = '4GB'
+log_min_messages = debug1
+map_superblocks = 50000
+map_compactor_enable = off
+map_compactor_extent_blocks = 131072
+map_compactor_low_live_percent = 100
+map_compactor_max_moves = 2000000
+mapcompactor_delay = 10ms
+mapcompactor_max_relations = 50000
+mapcompactor_busy_alloc_threshold = 0
+});
+$node->start();
+
+my $map_mode = $node->safe_psql(
+       'postgres', q{
+CREATE OR REPLACE FUNCTION umb_count_mapped_in_seg(rel regclass, segno int)
+RETURNS bigint
+LANGUAGE plpgsql
+AS $$
+DECLARE
+       path text;
+       bs int;
+       seg_blocks bigint;
+       nblocks bigint;
+       lblk bigint;
+       fork_page_idx bigint;
+       group_no bigint;
+       page_no bigint;
+       cur_page_no bigint := -1;
+       page bytea;
+       off int;
+       pblk bigint;
+       seg_lo bigint;
+       seg_hi bigint;
+       cnt bigint := 0;
+BEGIN
+       path := pg_relation_filepath(rel) || '_map';
+       bs := current_setting('block_size')::int;
+       seg_blocks := (1024::bigint * 1024 * 1024) / bs;
+       seg_lo := segno::bigint * seg_blocks;
+       seg_hi := seg_lo + seg_blocks;
+       nblocks := pg_relation_size(rel) / bs;
+
+       FOR lblk IN 0 .. (nblocks - 1) LOOP
+               fork_page_idx := lblk / 2048;
+               group_no := fork_page_idx / 8192;
+               page_no := 3 + group_no * 8194 + (fork_page_idx % 8192);
+               IF page_no <> cur_page_no THEN
+                       page := pg_read_binary_file(path, page_no * bs, bs, 
true);
+                       cur_page_no := page_no;
+               END IF;
+
+               off := ((lblk % 2048) * 4)::int;
+               IF page IS NULL OR length(page) < off + 4 THEN
+                       CONTINUE;
+               END IF;
+               pblk := get_byte(page, off)::bigint
+                         + (get_byte(page, off + 1)::bigint << 8)
+                         + (get_byte(page, off + 2)::bigint << 16)
+                         + (get_byte(page, off + 3)::bigint << 24);
+
+               IF pblk <> 4294967295::bigint AND pblk >= seg_lo AND pblk < 
seg_hi THEN
+                       cnt := cnt + 1;
+               END IF;
+       END LOOP;
+
+       RETURN cnt;
+END;
+$$;
+
+CREATE TABLE umb_reclaim_seg0_t(id int PRIMARY KEY, payload text);
+INSERT INTO umb_reclaim_seg0_t
+SELECT g, repeat('x', 2000) FROM generate_series(1, 40000) g;
+SELECT 
COALESCE(encode(pg_read_binary_file(pg_relation_filepath('umb_reclaim_seg0_t') 
|| '_map', 0, 1, true), 'hex'), '') <> '';
+});
+
+my $main_path = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_filepath('umb_reclaim_seg0_t');}
+);
+
+$node->backup('bkp_reclaim_seg0');
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($node, 'bkp_reclaim_seg0', has_streaming => 1);
+$standby->start;
+is($standby->safe_psql('postgres',
+               q{SELECT pg_relation_filepath('umb_reclaim_seg0_t');}),
+       $main_path,
+       'primary and standby see same relpath');
+
+my $logical_blocks = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_size('umb_reclaim_seg0_t') / 
current_setting('block_size')::int;}
+);
+cmp_ok($logical_blocks, '>', 0, 'table has non-zero logical blocks');
+cmp_ok($logical_blocks, '<', 131072,
+       'table initially fits in MAIN seg0 before churn');
+
+my $seg1_size = -1;
+is($seg1_size, -1,
+       'MAIN seg1 does not exist before boundary-crossing partial churn');
+
+my $mapped_seg0 = $node->safe_psql(
+       'postgres',
+       q{SELECT umb_count_mapped_in_seg('umb_reclaim_seg0_t'::regclass, 0);}
+);
+cmp_ok($mapped_seg0, '>', 0,
+       'current mappings initially reside in seg0 before final crossing 
phase');
+
+# Drive the physical frontier to seg1 by repeatedly updating only the tail
+# portion of the table. The untouched prefix should keep seg0 live, while the
+# churned tail should eventually push new mappings into seg1.
+my $cross_rounds = 0;
+while ($seg1_size < 16 * 1024 * 1024)
+{
+       $cross_rounds++;
+       die "tail churn did not push MAIN seg1 past prealloc-only size\n"
+               if $cross_rounds > 64;
+
+       $node->safe_psql(
+               'postgres',
+               "UPDATE umb_reclaim_seg0_t " .
+               "SET payload = md5((id + ($cross_rounds * 1000000))::text) || 
repeat('u', 1968) " .
+               "WHERE id > 20000;");
+
+       $seg1_size = $node->safe_psql(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('$main_path.1', true)).size, 
-1);");
+}
+
+cmp_ok($seg1_size, '>=', 16 * 1024 * 1024,
+       'MAIN seg1 grew beyond prealloc-only tail after controlled churn');
+
+# The physical frontier can cross into seg1 before any *current* mapping is
+# redirected there. Keep churning the tail until the relation's logical size
+# itself also extends beyond seg0, guaranteeing that some live mappings now
+# reside in seg1.
+my $post_cross_logical = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_size('umb_reclaim_seg0_t') / 
current_setting('block_size')::int;}
+);
+my $post_cross_round = 0;
+while ($post_cross_logical <= 140000)
+{
+       $post_cross_round++;
+       die "post-cross tail churn did not push logical blocks into seg1\n"
+               if $post_cross_round > 24;
+
+       $node->safe_psql(
+               'postgres',
+               "UPDATE umb_reclaim_seg0_t " .
+               "SET payload = md5((id + (9000000 + $post_cross_round * 
1000000))::text) || repeat('v', 1968) " .
+               "WHERE id > 20000;");
+       $post_cross_logical = $node->safe_psql(
+               'postgres',
+               q{SELECT pg_relation_size('umb_reclaim_seg0_t') / 
current_setting('block_size')::int;}
+       );
+}
+cmp_ok($post_cross_logical, '>', 131072,
+       'logical relation size extends into seg1 before compactor reclaim');
+
+is($node->safe_psql(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('$main_path', true)).size, -1) 
>= 0;"),
+       't',
+       'MAIN seg0 still exists before compactor reclaim');
+
+is($node->safe_psql(
+               'postgres',
+               q{SELECT count(*) FROM umb_reclaim_seg0_t WHERE id <= 20000;}),
+       '20000',
+       'untouched prefix remains visible before compactor reclaim');
+
+my $reloc_before = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_compactor_relocations();}
+);
+my $reclaim_processed_before = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_reclaim_processed();}
+);
+my $reclaim_enqueued_before = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_reclaim_enqueued();}
+);
+
+$node->safe_psql(
+       'postgres', q{
+ALTER SYSTEM SET map_compactor_enable = on;
+SELECT pg_reload_conf();
+});
+
+my $removed_on_primary = 'f';
+my $reclaim_processed = 'f';
+my $reloc_advanced = 'f';
+my $reclaim_enqueued = 'f';
+for my $round (1 .. 60)
+{
+       $node->safe_psql('postgres', q{CHECKPOINT; SELECT pg_sleep(0.5);});
+       $reloc_advanced = $node->safe_psql(
+               'postgres',
+               "SELECT pg_stat_get_map_compactor_relocations() > 
$reloc_before;");
+       last if $reloc_advanced eq 't';
+}
+is($reloc_advanced, 't',
+       'compactor relocation counter advanced before internal reclaim 
completed');
+for my $round (1 .. 120)
+{
+       $node->safe_psql('postgres', q{CHECKPOINT; SELECT pg_sleep(0.5);});
+       $reclaim_enqueued = $node->safe_psql(
+               'postgres',
+               "SELECT pg_stat_get_map_reclaim_enqueued() > 
$reclaim_enqueued_before;");
+       last if $reclaim_enqueued eq 't';
+}
+
+for my $round (1 .. 20)
+{
+       $node->safe_psql('postgres', q{CHECKPOINT; SELECT pg_sleep(1.0);});
+       $reclaim_processed = $node->safe_psql(
+               'postgres',
+               "SELECT pg_stat_get_map_reclaim_processed() > 
$reclaim_processed_before;");
+       $removed_on_primary = $node->safe_psql(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('$main_path', true)).size, -1) = 
-1;");
+       last if $reclaim_processed eq 't' && $removed_on_primary eq 't';
+}
+is($reclaim_processed, 't',
+       'internal reclaim processed seg0 after compactor relocation');
+is($removed_on_primary, 't',
+       'MAIN seg0 is physically removed by internal reclaim after checkpoint 
rounds');
+$reclaim_enqueued = $node->safe_psql(
+       'postgres',
+       "SELECT pg_stat_get_map_reclaim_enqueued() > 
$reclaim_enqueued_before;");
+is($reclaim_enqueued, 't',
+       'internal reclaim was enqueued before seg0 was physically removed');
+
+my $until_lsn = $node->safe_psql('postgres', q{SELECT pg_current_wal_lsn();});
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn();"),
+       'standby caught up to reclaim WAL');
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('$main_path', true)).size, -1) = 
-1;"),
+       'standby MAIN seg0 is physically removed after replay');
+
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_reclaim_seg0_t;}),
+       '40000',
+       'relation remains readable after internal seg0 reclaim');
+is($standby->safe_psql('postgres', q{SELECT count(*) FROM 
umb_reclaim_seg0_t;}),
+       '40000',
+       'standby relation remains readable after replay');
+
+done_testing();
diff --git a/src/test/recovery/t/065_umbra_mainfork_middle_reclaim_keep_seg0.pl 
b/src/test/recovery/t/065_umbra_mainfork_middle_reclaim_keep_seg0.pl
new file mode 100644
index 0000000000..e863acd958
--- /dev/null
+++ b/src/test/recovery/t/065_umbra_mainfork_middle_reclaim_keep_seg0.pl
@@ -0,0 +1,356 @@
+# Verify UMBRA internal reclaim can remove a middle MAIN segment while seg0
+# remains present.
+#
+# Target behavior:
+# - relation stays alive
+# - middle segment (.1) is physically removed by internal reclaim after 
checkpoint
+# - seg0 file remains present
+# - after reclaim, writes on logical seg1 range + checkpoints keep 
primary/standby consistent
+#
+# In md mode, skip this test.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+$PostgreSQL::Test::Utils::timeout_default = 600;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+sub seg_path
+{
+       my ($main_path, $segno) = @_;
+
+       return $segno == 0 ? $main_path : "$main_path.$segno";
+}
+
+sub highest_existing_segno
+{
+       my ($node, $main_path, $max_segno) = @_;
+       my $highest = 0;
+
+       for my $segno (0 .. $max_segno)
+       {
+               my $exists = $node->safe_psql(
+                       'postgres',
+                       "SELECT COALESCE((pg_stat_file('" . 
seg_path($main_path, $segno) .
+                       "', true)).size, -1) >= 0;");
+               $highest = $segno if $exists eq 't';
+       }
+
+       return $highest;
+}
+
+sub ctid_id_band_for_segno
+{
+       my ($node, $relname, $segno) = @_;
+       my $sql = qq{
+WITH params AS (
+       SELECT (1024::bigint * 1024 * 1024) /
+                  current_setting('block_size')::int AS seg_blocks
+),
+seg AS (
+       SELECT min(id) AS min_id,
+                  max(id) AS max_id,
+                  count(*) AS cnt
+       FROM $relname, params
+       WHERE split_part(trim(both '()' from ctid::text), ',', 1)::bigint >=
+                 $segno * seg_blocks
+         AND split_part(trim(both '()' from ctid::text), ',', 1)::bigint <
+                 ($segno + 1) * seg_blocks
+)
+SELECT COALESCE(min_id::text, '') || '|' ||
+          COALESCE(max_id::text, '') || '|' ||
+          cnt::text
+FROM seg;
+};
+       my ($min_id, $max_id, $cnt) =
+         split(/\|/, $node->safe_psql('postgres', $sql));
+
+       return ($min_id, $max_id, $cnt);
+}
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init(allows_streaming => 1);
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+checkpoint_timeout = '1d'
+max_wal_size = '64GB'
+wal_keep_size = '1GB'
+log_min_messages = debug1
+map_superblocks = 50000
+map_compactor_enable = off
+map_compactor_extent_blocks = 131072
+map_compactor_low_live_percent = 60
+map_compactor_max_moves = 2000000
+mapcompactor_delay = 10ms
+mapcompactor_max_relations = 256
+mapcompactor_busy_alloc_threshold = 0
+});
+$node->start();
+
+my $map_mode = $node->safe_psql(
+       'postgres', q{
+CREATE TABLE umb_reclaim_mid_t(id int PRIMARY KEY, payload text);
+ALTER TABLE umb_reclaim_mid_t ALTER COLUMN payload SET STORAGE PLAIN;
+INSERT INTO umb_reclaim_mid_t
+SELECT g, repeat('x', 7000) FROM generate_series(1, 300000) g;
+SELECT 
COALESCE(encode(pg_read_binary_file(pg_relation_filepath('umb_reclaim_mid_t') 
|| '_map', 0, 1, true), 'hex'), '') <> '';
+});
+
+my $main_path = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_filepath('umb_reclaim_mid_t');}
+);
+my $max_table_id = 300000;
+my $phase2_max_update_id = $max_table_id;
+my $target_middle_segno = 1;
+my ($phase1_min_id, $phase1_max_id, $phase1_cnt) =
+  ctid_id_band_for_segno($node, 'umb_reclaim_mid_t', $target_middle_segno);
+my $seg_prealloc_bytes = 4 * 1024 * 1024;
+
+is($node->safe_psql(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('$main_path', true)).size, -1) 
>= 0;"),
+       't',
+       'MAIN seg0 exists after load');
+is($node->safe_psql(
+                       'postgres',
+                       "SELECT COALESCE((pg_stat_file('$main_path.1', 
true)).size, -1) >= 0;"),
+       't',
+       'MAIN seg1 exists after load');
+cmp_ok($phase1_cnt, '>', 0, 'identified initial seg1 id band before frontier 
push');
+is($phase1_max_id - $phase1_min_id + 1,
+   $phase1_cnt,
+   'initial seg1 id band is contiguous');
+
+my $frontier_crossed_seg1 = 'f';
+my $seg2_size_after_load = $node->safe_psql(
+       'postgres',
+       "SELECT COALESCE((pg_stat_file('" . seg_path($main_path, 2) .
+       "', true)).size, -1);");
+my $update_plan = $node->safe_psql(
+       'postgres', qq{
+SET enable_seqscan = off;
+SET enable_bitmapscan = off;
+SET enable_indexscan = on;
+EXPLAIN (COSTS OFF)
+UPDATE umb_reclaim_mid_t
+SET payload = md5((id + 1000000)::text) || repeat('u', 6968)
+WHERE id BETWEEN $phase1_min_id AND $phase1_max_id;
+});
+
+like($update_plan, qr/Index Scan using umb_reclaim_mid_t_pkey/i,
+       'update path uses primary-key index scan');
+unlike($update_plan, qr/Seq Scan|Bitmap/i,
+       'update path avoids seqscan/bitmapscan');
+
+       $frontier_crossed_seg1 = 't' if $seg2_size_after_load > 
$seg_prealloc_bytes;
+
+is($frontier_crossed_seg1, 't',
+   'initial load already pushed seg2 past the 4MB preallocation watermark');
+
+my ($target_min_id, $target_max_id, $target_cnt) =
+  ctid_id_band_for_segno($node, 'umb_reclaim_mid_t', $target_middle_segno);
+cmp_ok($target_cnt, '>', 0,
+       're-identified current seg1 id band after frontier crossed seg1');
+is($target_max_id - $target_min_id + 1,
+   $target_cnt,
+   'current seg1 id band is contiguous before backup');
+
+$node->backup('bkp_reclaim_mid');
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($node, 'bkp_reclaim_mid', has_streaming => 1);
+$standby->start;
+is($standby->safe_psql('postgres',
+               q{SELECT pg_relation_filepath('umb_reclaim_mid_t');}),
+       $main_path,
+       'primary and standby see same relpath');
+
+$node->safe_psql(
+       'postgres', q{
+ALTER SYSTEM SET map_compactor_enable = on;
+SELECT pg_reload_conf();
+SELECT pg_sleep(1.0);
+});
+
+my $enq_before = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_stat_get_map_reclaim_enqueued();}
+);
+my $reclaim_enqueued = 'f';
+my ($phase2_min_id, $phase2_max_id, $phase2_cnt);
+my ($phase2_update_min_id, $phase2_update_max_id);
+my $phase2_floor_id = $phase1_min_id;
+
+# Phase 2: now that the frontier is in seg2+ and standby has copied the shaped
+# state, repeatedly cross checkpoint boundaries and rewrite the current seg1 id
+# band. Keep the lower bound anchored at the current seg1 band so we do not
+# punch seg0, but let the upper bound run past the original 160000-row load to
+# keep pressure on tail allocations.
+for my $i (1 .. 12)
+{
+       my $seed = 10000000 + $i * 1000000;
+
+       $node->safe_psql('postgres', q{CHECKPOINT;});
+       ($phase2_min_id, $phase2_max_id, $phase2_cnt) =
+         ctid_id_band_for_segno($node, 'umb_reclaim_mid_t', 
$target_middle_segno);
+       last if $phase2_cnt <= 0;
+       $phase2_update_min_id =
+         $phase2_min_id > $phase2_floor_id ? $phase2_min_id : $phase2_floor_id;
+       $phase2_update_max_id = $phase2_max_update_id;
+       $node->safe_psql(
+               'postgres',
+               "SET enable_seqscan = off; " .
+               "SET enable_bitmapscan = off; " .
+               "SET enable_indexscan = on; " .
+               "UPDATE umb_reclaim_mid_t " .
+               "SET payload = md5((id + $seed)::text) || repeat('w', 6968) " .
+               "WHERE id BETWEEN $phase2_update_min_id AND 
$phase2_update_max_id;");
+
+       $reclaim_enqueued = $node->safe_psql(
+               'postgres',
+               "SELECT pg_stat_get_map_reclaim_enqueued() > $enq_before;");
+       last if $reclaim_enqueued eq 't';
+}
+
+if ($reclaim_enqueued ne 't')
+{
+       for my $round (1 .. 24)
+       {
+               $node->safe_psql('postgres', q{CHECKPOINT; SELECT 
pg_sleep(0.5);});
+               $reclaim_enqueued = $node->safe_psql(
+                       'postgres',
+                       "SELECT pg_stat_get_map_reclaim_enqueued() > 
$enq_before;");
+               last if $reclaim_enqueued eq 't';
+       }
+}
+
+is($reclaim_enqueued, 't',
+       'compactor enqueued reclaim unlink after seg1 pages were remapped into 
tail space');
+
+my $middle_seg_removed_on_primary = 'f';
+for my $round (1 .. 16)
+{
+       $node->safe_psql('postgres', q{CHECKPOINT; SELECT pg_sleep(0.2);});
+
+       my $removed = $node->safe_psql(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('" .
+               seg_path($main_path, $target_middle_segno) .
+               "', true)).size, -1) = -1;");
+       $middle_seg_removed_on_primary = 't' if $removed eq 't';
+       last if $middle_seg_removed_on_primary eq 't';
+}
+is($middle_seg_removed_on_primary, 't',
+       'segment 1 is physically removed by internal reclaim after checkpoint 
rounds');
+ok($node->poll_query_until(
+                       'postgres',
+                       "SELECT COALESCE((pg_stat_file('$main_path', 
true)).size, -1) >= 0;"),
+       'MAIN seg0 file still exists after middle-segment reclaim');
+
+my $until_lsn = $node->safe_psql('postgres', q{SELECT pg_current_wal_lsn();});
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn();"),
+       'standby caught up to reclaim WAL');
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('" .
+               seg_path($main_path, $target_middle_segno) .
+               "', true)).size, -1) = -1;"),
+       'standby segment 1 is physically removed after replay');
+ok($standby->poll_query_until(
+                               'postgres',
+                               "SELECT COALESCE((pg_stat_file('$main_path', 
true)).size, -1) >= 0;"),
+       'standby MAIN seg0 file still exists after replay');
+
+# Post-reclaim stability round:
+# run further writes including logical seg1 ranges, checkpoint, and verify
+# primary/standby remain consistent while seg1 stays absent.
+$node->safe_psql(
+       'postgres', qq{
+UPDATE umb_reclaim_mid_t
+SET payload = md5((id + 7000000)::text) || repeat('r', 6968)
+WHERE id BETWEEN 1 AND 5000;
+
+UPDATE umb_reclaim_mid_t
+SET payload = md5((id + 9000000)::text) || repeat('s', 6968)
+WHERE id BETWEEN $target_min_id AND $target_max_id;
+
+DELETE FROM umb_reclaim_mid_t WHERE id BETWEEN 20001 AND 21000;
+INSERT INTO umb_reclaim_mid_t
+SELECT g, md5((g + 12000000)::text) || repeat('n', 6968)
+FROM generate_series(20001, 21000) g;
+
+CHECKPOINT;
+});
+
+my $stability_lsn = $node->safe_psql('postgres', q{SELECT 
pg_current_wal_lsn();});
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT '$stability_lsn'::pg_lsn <= pg_last_wal_replay_lsn();"),
+       'standby caught up after post-reclaim write round');
+
+ok($node->poll_query_until(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('" .
+               seg_path($main_path, $target_middle_segno) .
+               "', true)).size, -1) = -1;"),
+       'primary segment 1 stays absent after post-reclaim writes');
+ok($standby->poll_query_until(
+               'postgres',
+               "SELECT COALESCE((pg_stat_file('" .
+               seg_path($main_path, $target_middle_segno) .
+               "', true)).size, -1) = -1;"),
+       'standby segment 1 stays absent after replay of post-reclaim writes');
+
+my $primary_fp = $node->safe_psql(
+       'postgres', q{
+SELECT format('%s|%s|%s|%s',
+                         count(*),
+                         min(id),
+                         max(id),
+                         sum((id::bigint * 3 + length(payload))::bigint))
+FROM umb_reclaim_mid_t;
+});
+is($standby->safe_psql(
+               'postgres', q{
+SELECT format('%s|%s|%s|%s',
+                         count(*),
+                         min(id),
+                         max(id),
+                         sum((id::bigint * 3 + length(payload))::bigint))
+FROM umb_reclaim_mid_t;
+}),
+       $primary_fp,
+       'primary/standby aggregate fingerprint matches after post-reclaim 
writes');
+
+my $primary_sample = $node->safe_psql(
+       'postgres', qq{
+SELECT string_agg(id::text || ':' || left(payload, 12), ',' ORDER BY id)
+FROM umb_reclaim_mid_t
+WHERE id IN (1, 5000, 20001, 21000, $target_min_id, $target_max_id);
+});
+is($standby->safe_psql(
+               'postgres', qq{
+SELECT string_agg(id::text || ':' || left(payload, 12), ',' ORDER BY id)
+FROM umb_reclaim_mid_t
+WHERE id IN (1, 5000, 20001, 21000, $target_min_id, $target_max_id);
+}),
+       $primary_sample,
+       'primary/standby sample rows match after post-reclaim writes');
+
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_reclaim_mid_t;}),
+       $max_table_id,
+       'relation remains readable after middle-segment reclaim');
+is($standby->safe_psql('postgres', q{SELECT count(*) FROM umb_reclaim_mid_t;}),
+       $max_table_id,
+       'standby relation remains readable after replay');
+
+done_testing();
-- 
2.50.1 (Apple Git-155)



Reply via email to