---
 src/backend/access/rmgrdesc/Makefile          |   5 +
 src/backend/access/rmgrdesc/meson.build       |   6 +
 src/backend/access/rmgrdesc/umbradesc.c       |  81 ++++
 src/backend/access/rmgrdesc/xlogdesc.c        |   1 +
 src/backend/access/transam/Makefile           |   5 +
 src/backend/access/transam/meson.build        |   6 +
 src/backend/access/transam/rmgr.c             |   3 +
 src/backend/access/transam/umbra_xlog.c       | 227 ++++++++++
 src/backend/access/transam/xlogutils.c        | 400 ++++++++++++++++--
 src/backend/storage/map/mapsuper.c            |   3 +
 src/backend/storage/smgr/umbra.c              |  50 ++-
 src/bin/pg_waldump/rmgrdesc.c                 |   3 +
 src/include/access/rmgrlist.h                 |   3 +
 src/include/access/umbra_xlog.h               |  58 +++
 src/include/access/xloginsert.h               |   4 +
 src/include/access/xlogrecord.h               |  16 +
 src/include/access/xlogutils.h                |   3 +
 src/include/catalog/storage.h                 |   1 +
 src/test/recovery/meson.build                 |   4 +
 .../t/056_umbra_truncate_superblock.pl        |  82 ++++
 .../t/062_umbra_truncate_drop_crash_matrix.pl | 108 +++++
 .../recovery/t/066_umbra_truncate_redo.pl     |  64 +++
 .../t/071_umbra_skip_wal_dense_map.pl         |  65 +++
 23 files changed, 1147 insertions(+), 51 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/umbradesc.c
 create mode 100644 src/backend/access/transam/umbra_xlog.c
 create mode 100644 src/include/access/umbra_xlog.h
 create mode 100644 src/test/recovery/t/056_umbra_truncate_superblock.pl
 create mode 100644 src/test/recovery/t/062_umbra_truncate_drop_crash_matrix.pl
 create mode 100644 src/test/recovery/t/066_umbra_truncate_redo.pl
 create mode 100644 src/test/recovery/t/071_umbra_skip_wal_dense_map.pl

diff --git a/src/backend/access/rmgrdesc/Makefile 
b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f..4e9a52d8d3 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -32,4 +32,9 @@ OBJS = \
        xactdesc.o \
        xlogdesc.o
 
+ifeq ($(with_umbra), yes)
+OBJS += \
+       umbradesc.o
+endif
+
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/meson.build 
b/src/backend/access/rmgrdesc/meson.build
index d9000ccd9f..f70cdbb587 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -26,4 +26,10 @@ rmgr_desc_sources = files(
   'xlogdesc.c',
 )
 
+if get_option('umbra').enabled()
+  rmgr_desc_sources += files(
+    'umbradesc.c',
+  )
+endif
+
 backend_sources += rmgr_desc_sources
diff --git a/src/backend/access/rmgrdesc/umbradesc.c 
b/src/backend/access/rmgrdesc/umbradesc.c
new file mode 100644
index 0000000000..6bad4bb38e
--- /dev/null
+++ b/src/backend/access/rmgrdesc/umbradesc.c
@@ -0,0 +1,81 @@
+/*-------------------------------------------------------------------------
+ *
+ * umbradesc.c
+ *       rmgr descriptor routines for Umbra MAP WAL records
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/umbra_xlog.h"
+#include "common/relpath.h"
+#include "storage/um_defs.h"
+
+static RelPathStr
+umbra_metadata_relpath(RelFileLocator rlocator)
+{
+       RelPathStr      base;
+       RelPathStr      path;
+
+       base = relpathperm(rlocator, MAIN_FORKNUM);
+       snprintf(path.str, sizeof(path.str), "%s_map", base.str);
+       return path;
+}
+
+static RelPathStr
+umbra_fork_relpath(RelFileLocator rlocator, ForkNumber forknum)
+{
+       if (forknum == UMBRA_METADATA_FORKNUM)
+               return umbra_metadata_relpath(rlocator);
+
+       return relpathperm(rlocator, forknum);
+}
+
+void
+umbra_desc(StringInfo buf, XLogReaderState *record)
+{
+       char       *rec = XLogRecGetData(record);
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       if (info == XLOG_UMBRA_MAP_SET)
+       {
+               xl_umbra_map_set *xlrec = (xl_umbra_map_set *) rec;
+               RelPathStr      path = umbra_fork_relpath(xlrec->rlocator, 
xlrec->forknum);
+
+               appendStringInfo(buf, "%s lblk %u old %u new %u",
+                                                path.str, xlrec->lblkno, 
xlrec->old_pblkno,
+                                                xlrec->new_pblkno);
+       }
+       else if (info == XLOG_UMBRA_SKIP_WAL_DENSE_MAP)
+       {
+               xl_umbra_skip_wal_dense_map *xlrec =
+                       (xl_umbra_skip_wal_dense_map *) rec;
+               RelPathStr      path = umbra_metadata_relpath(xlrec->rlocator);
+
+               appendStringInfo(buf, "%s skip_wal_dense count %u",
+                                                path.str, xlrec->count);
+               for (uint16 i = 0; i < xlrec->count; i++)
+                       appendStringInfo(buf, " fork %d nblocks %u",
+                                                        
xlrec->entries[i].forknum,
+                                                        
xlrec->entries[i].nblocks);
+       }
+}
+
+const char *
+umbra_identify(uint8 info)
+{
+       const char *id = NULL;
+
+       switch (info & ~XLR_INFO_MASK)
+       {
+               case XLOG_UMBRA_MAP_SET:
+                       id = "MAP_SET";
+                       break;
+               case XLOG_UMBRA_SKIP_WAL_DENSE_MAP:
+                       id = "SKIP_WAL_DENSE_MAP";
+                       break;
+       }
+
+       return id;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c 
b/src/backend/access/rmgrdesc/xlogdesc.c
index 2468a7d257..0fc4f48ca6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -324,6 +324,7 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
                if (detailed_format)
                {
                        /* Get block references in detailed format. */
+                       DecodedBkpBlock *blkref = XLogRecGetBlock(record, 
block_id);
 
                        if (pretty)
                                appendStringInfoChar(buf, '\t');
diff --git a/src/backend/access/transam/Makefile 
b/src/backend/access/transam/Makefile
index a32f473e0a..920625d345 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -39,6 +39,11 @@ OBJS = \
        xlogutils.o \
        xlogwait.o
 
+ifeq ($(with_umbra), yes)
+OBJS += \
+       umbra_xlog.o
+endif
+
 include $(top_srcdir)/src/backend/common.mk
 
 # ensure that version checks in xlog.c get recompiled when catversion.h changes
diff --git a/src/backend/access/transam/meson.build 
b/src/backend/access/transam/meson.build
index 06aadc7f31..57eaf44af8 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -27,6 +27,12 @@ backend_sources += files(
   'xlogwait.c',
 )
 
+if get_option('umbra').enabled()
+  backend_sources += files(
+    'umbra_xlog.c',
+  )
+endif
+
 # used by frontend programs to build a frontend xlogreader
 xlogreader_sources = files(
   'xlogreader.c',
diff --git a/src/backend/access/transam/rmgr.c 
b/src/backend/access/transam/rmgr.c
index 4fda03a3cf..bb6beaa71d 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -30,6 +30,9 @@
 #include "access/multixact.h"
 #include "access/nbtxlog.h"
 #include "access/spgxlog.h"
+#ifdef USE_UMBRA
+#include "access/umbra_xlog.h"
+#endif
 #include "access/xact.h"
 #include "catalog/storage_xlog.h"
 #include "commands/dbcommands_xlog.h"
diff --git a/src/backend/access/transam/umbra_xlog.c 
b/src/backend/access/transam/umbra_xlog.c
new file mode 100644
index 0000000000..71c7ad7bb1
--- /dev/null
+++ b/src/backend/access/transam/umbra_xlog.c
@@ -0,0 +1,227 @@
+/*-------------------------------------------------------------------------
+ *
+ * umbra_xlog.c
+ *       WAL support for Umbra MAP lifecycle records.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/umbra_xlog.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "storage/map.h"
+#include "storage/smgr.h"
+#include "storage/umbra.h"
+#include "storage/umfile.h"
+
+/*
+ * Log a mapping establishment/switch for one logical block.
+ *
+ * The chosen physical block number is recorded in WAL so redo never allocates
+ * locally; that keeps mapping deterministic in recovery.
+ */
+XLogRecPtr
+log_umbra_map_set(RelFileLocator rlocator, ForkNumber forknum,
+                                 BlockNumber lblkno, BlockNumber old_pblkno,
+                                 BlockNumber new_pblkno)
+{
+       xl_umbra_map_set xlrec;
+
+       xlrec.rlocator = rlocator;
+       xlrec.forknum = forknum;
+       xlrec.lblkno = lblkno;
+       xlrec.old_pblkno = old_pblkno;
+       xlrec.new_pblkno = new_pblkno;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+       return XLogInsert(RM_UMBRA_ID, XLOG_UMBRA_MAP_SET | 
XLR_SPECIAL_REL_UPDATE);
+}
+
+XLogRecPtr
+log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
+                                                        uint16 count,
+                                                        const 
xl_umbra_skip_wal_dense_map_entry *entries)
+{
+       xl_umbra_skip_wal_dense_map xlrec;
+
+       Assert(count > 0);
+       Assert(entries != NULL);
+
+       xlrec.rlocator = rlocator;
+       xlrec.count = count;
+       xlrec.padding = 0;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec,
+                                        offsetof(xl_umbra_skip_wal_dense_map, 
entries));
+       XLogRegisterData((char *) entries,
+                                        
sizeof(xl_umbra_skip_wal_dense_map_entry) * count);
+
+       return XLogInsert(RM_UMBRA_ID,
+                                         XLOG_UMBRA_SKIP_WAL_DENSE_MAP | 
XLR_SPECIAL_REL_UPDATE);
+}
+
+void
+umbra_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       /* Backup blocks are not used in Umbra MAP records. */
+       Assert(!XLogRecHasAnyBlockRefs(record));
+
+       switch (info)
+       {
+               case XLOG_UMBRA_MAP_SET:
+                       {
+                               xl_umbra_map_set *xlrec = (xl_umbra_map_set *) 
XLogRecGetData(record);
+                               SMgrRelation reln;
+                               UmbraFileContext *ctx;
+                               static const PGIOAlignedBlock zero_page = {{0}};
+                               bool            materialized = false;
+
+                               reln = smgropen(xlrec->rlocator, 
INVALID_PROC_NUMBER);
+                               ctx = umfile_ctx_acquire(reln->smgr_rlocator);
+
+                               /*
+                                * During replay of drop/tablespace churn, the 
relation path can
+                                * already be gone. Treat missing MAIN+MAP as 
stale WAL and skip.
+                                */
+                               if (!UmMetadataExists(reln))
+                                       break;
+
+                               /*
+                                * MAP_SET(old,new) replays the physical copy 
first, then switches
+                                * the mapping. This keeps crash recovery 
independent of whether
+                                * background flushing persisted the new 
physical page before crash.
+                                */
+                               if (xlrec->old_pblkno != InvalidBlockNumber)
+                               {
+                                       BlockNumber nblocks;
+                                       char            pagebuf[BLCKSZ];
+
+                                       nblocks = umfile_ctx_get_nblocks(ctx, 
xlrec->forknum,
+                                                                               
                         UMFILE_NBLOCKS_SPARSE);
+                                       if (xlrec->old_pblkno < nblocks)
+                                       {
+                                               umfile_ctx_read(ctx, 
xlrec->forknum, xlrec->old_pblkno,
+                                                                               
pagebuf, BLCKSZ);
+                                               umfile_ctx_extend(ctx, 
xlrec->forknum, xlrec->new_pblkno,
+                                                                               
  pagebuf);
+                                               umfile_ctx_register_dirty(ctx, 
xlrec->forknum,
+                                                                               
                  xlrec->new_pblkno,
+                                                                               
                  false, false);
+                                               materialized = true;
+                                       }
+                                       else
+                                       {
+                                               ereport(DEBUG1,
+                                                               
(errmsg_internal("skip UMBRA MAP_SET relocation replay for relation %u/%u/%u 
fork %d lblk %u: old pblk %u beyond nblocks %u",
+                                                                               
                 xlrec->rlocator.spcOid,
+                                                                               
                 xlrec->rlocator.dbOid,
+                                                                               
                 xlrec->rlocator.relNumber,
+                                                                               
                 xlrec->forknum,
+                                                                               
                 xlrec->lblkno,
+                                                                               
                 xlrec->old_pblkno,
+                                                                               
                 nblocks)));
+                                       }
+                               }
+
+                               MapSetMapping(ctx, xlrec->rlocator, 
xlrec->forknum,
+                                                         xlrec->lblkno, 
xlrec->new_pblkno,
+                                                         record->EndRecPtr);
+                               MapSBlockBumpNextFreePhysBlock(ctx, 
xlrec->rlocator,
+                                                                               
           xlrec->forknum,
+                                                                               
           xlrec->new_pblkno + 1,
+                                                                               
           record->EndRecPtr);
+
+                               /*
+                                * MAP_SET with invalid old_pblkno means first 
mapping for this
+                                * logical block (extend/zeroextend path). Keep 
superblock
+                                * logical_nblocks in sync during redo as well.
+                                */
+                               if (xlrec->old_pblkno == InvalidBlockNumber)
+                               {
+                                       /*
+                                        * Ensure the mapped physical page 
exists even if there is no
+                                        * later WAL record that overwrites it 
(e.g. dummy pages used
+                                        * to fill gaps for smgrextend 
semantics).  It's safe to write
+                                        * zeros even if a later record will 
overwrite the page image.
+                                        */
+                                       umfile_ctx_extend(ctx, xlrec->forknum, 
xlrec->new_pblkno,
+                                                                         
(const char *) zero_page.data);
+                                       umfile_ctx_register_dirty(ctx, 
xlrec->forknum,
+                                                                               
          xlrec->new_pblkno,
+                                                                               
          false, false);
+                                       materialized = true;
+
+                                       MapSBlockBumpLogicalNblocks(ctx, 
xlrec->rlocator,
+                                                                               
                xlrec->forknum,
+                                                                               
                xlrec->lblkno + 1,
+                                                                               
                record->EndRecPtr);
+                               }
+
+                               if (materialized)
+                                       MapSBlockBumpPhysicalNblocks(ctx, 
xlrec->rlocator,
+                                                                               
                 xlrec->forknum,
+                                                                               
                 xlrec->new_pblkno + 1,
+                                                                               
                 record->EndRecPtr);
+                       }
+                       break;
+
+               case XLOG_UMBRA_SKIP_WAL_DENSE_MAP:
+                       {
+                               xl_umbra_skip_wal_dense_map *xlrec;
+                               xl_umbra_skip_wal_dense_map_entry *entries;
+                               SMgrRelation reln;
+                               UmbraFileContext *ctx;
+
+                               xlrec = (xl_umbra_skip_wal_dense_map *) 
XLogRecGetData(record);
+                               entries = xlrec->entries;
+                               reln = smgropen(xlrec->rlocator, 
INVALID_PROC_NUMBER);
+                               ctx = umfile_ctx_acquire(reln->smgr_rlocator);
+
+                               if (!UmMetadataExists(reln))
+                                       break;
+
+                               MapInvalidateRelation(xlrec->rlocator);
+
+                               for (uint16 i = 0; i < xlrec->count; i++)
+                               {
+                                       ForkNumber      forknum = 
entries[i].forknum;
+                                       BlockNumber nblocks = 
entries[i].nblocks;
+
+                                       if 
(!UmbraForkUsesMapTranslation(forknum) ||
+                                               !BlockNumberIsValid(nblocks))
+                                               elog(PANIC,
+                                                        "invalid UMBRA 
skip-WAL dense-map record for relation %u/%u/%u fork %d nblocks %u",
+                                                        xlrec->rlocator.spcOid,
+                                                        xlrec->rlocator.dbOid,
+                                                        
xlrec->rlocator.relNumber,
+                                                        forknum, nblocks);
+                                       Assert(nblocks > 0);
+
+                                       for (BlockNumber lblk = 0; lblk < 
nblocks; lblk++)
+                                               MapSetMapping(ctx, 
xlrec->rlocator, forknum,
+                                                                         lblk, 
lblk, record->EndRecPtr);
+
+                                       MapSBlockBumpNextFreePhysBlock(ctx, 
xlrec->rlocator,
+                                                                               
                   forknum, nblocks,
+                                                                               
                   record->EndRecPtr);
+                                       MapSBlockBumpPhysicalNblocks(ctx, 
xlrec->rlocator,
+                                                                               
                 forknum, nblocks,
+                                                                               
                 record->EndRecPtr);
+                                       MapSBlockSetLogicalNblocks(ctx, 
xlrec->rlocator,
+                                                                               
           forknum, nblocks,
+                                                                               
           record->EndRecPtr);
+                               }
+                       }
+                       break;
+
+               default:
+                       elog(PANIC, "umbra_redo: unknown op code %u", info);
+       }
+}
diff --git a/src/backend/access/transam/xlogutils.c 
b/src/backend/access/transam/xlogutils.c
index 5fbe39133b..f32aac5476 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,10 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
+#ifdef USE_UMBRA
+#include "storage/map.h"
+#include "storage/umbra.h"
+#endif
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/rel.h"
@@ -77,10 +81,45 @@ typedef struct xl_invalid_page
 
 static HTAB *invalid_page_tab = NULL;
 
+#ifdef USE_UMBRA
+typedef struct xl_missing_metadata_key
+{
+       RelFileLocator locator;         /* relation whose Umbra metadata is 
missing */
+} xl_missing_metadata_key;
+
+typedef struct xl_missing_metadata
+{
+       xl_missing_metadata_key key;    /* hash key ... must be first */
+} xl_missing_metadata;
+
+static HTAB *missing_metadata_tab = NULL;
+#endif
+
 static int     read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr 
targetPagePtr,
                                                                          int 
reqLen, XLogRecPtr targetRecPtr,
                                                                          char 
*cur_page, bool wait_for_wal);
 
+#ifndef USE_UMBRA
+static XLogRedoAction XLogReadBufferForRedoExtendedMd(XLogReaderState *record,
+                                                                               
                         uint8 block_id,
+                                                                               
                         ReadBufferMode mode,
+                                                                               
                         bool get_cleanup_lock,
+                                                                               
                         Buffer *buf);
+#endif
+#ifdef USE_UMBRA
+static XLogRedoAction XLogReadBufferForRedoExtendedUmbra(XLogReaderState 
*record,
+                                                                               
                                uint8 block_id,
+                                                                               
                                ReadBufferMode mode,
+                                                                               
                                bool get_cleanup_lock,
+                                                                               
                                Buffer *buf);
+static uint8 XLogUmbraMapStateForRedo(SMgrRelation smgr, ForkNumber forknum);
+static bool XLogUmbraEnsureMappedBlockForRedo(RelFileLocator rlocator,
+                                                                               
          ForkNumber forknum,
+                                                                               
          BlockNumber blkno);
+static bool XLogUmbraEnsureMetadataForRedo(RelFileLocator rlocator,
+                                                                               
   ForkNumber forknum);
+#endif
+
 /* Report a reference to an invalid page */
 static void
 report_invalid_page(int elevel, RelFileLocator locator, ForkNumber forkno,
@@ -96,6 +135,16 @@ report_invalid_page(int elevel, RelFileLocator locator, 
ForkNumber forkno,
                         blkno, path.str);
 }
 
+#ifdef USE_UMBRA
+static void
+report_missing_metadata(int elevel, RelFileLocator locator)
+{
+       RelPathStr      path = UmMetadataRelPathPerm(locator);
+
+       elog(elevel, "MAP metadata for relation %s is missing", path.str);
+}
+#endif
+
 /* Log a reference to an invalid page */
 static void
 log_invalid_page(RelFileLocator locator, ForkNumber forkno, BlockNumber blkno,
@@ -160,6 +209,39 @@ log_invalid_page(RelFileLocator locator, ForkNumber 
forkno, BlockNumber blkno,
        }
 }
 
+#ifdef USE_UMBRA
+void
+XLogLogMissingRelationMetadata(RelFileLocator locator)
+{
+       xl_missing_metadata_key key;
+       xl_missing_metadata *hentry;
+       bool            found;
+
+       if (message_level_is_interesting(DEBUG1))
+               report_missing_metadata(DEBUG1, locator);
+
+       if (missing_metadata_tab == NULL)
+       {
+               HASHCTL         ctl;
+
+               ctl.keysize = sizeof(xl_missing_metadata_key);
+               ctl.entrysize = sizeof(xl_missing_metadata);
+
+               missing_metadata_tab = hash_create("XLOG missing-metadata 
table",
+                                                                               
   32,
+                                                                               
   &ctl,
+                                                                               
   HASH_ELEM | HASH_BLOBS);
+       }
+
+       key.locator = locator;
+       hentry = (xl_missing_metadata *)
+               hash_search(missing_metadata_tab, &key, HASH_ENTER, &found);
+
+       (void) hentry;
+       (void) found;
+}
+#endif
+
 /* Forget any invalid pages >= minblkno, because they've been dropped */
 static void
 forget_invalid_pages(RelFileLocator locator, ForkNumber forkno,
@@ -219,6 +301,48 @@ forget_invalid_pages_db(Oid dbid)
        }
 }
 
+#ifdef USE_UMBRA
+static void
+forget_missing_metadata(RelFileLocator locator)
+{
+       xl_missing_metadata_key key;
+
+       if (missing_metadata_tab == NULL)
+               return;
+
+       key.locator = locator;
+       if (hash_search(missing_metadata_tab, &key, HASH_REMOVE, NULL) != NULL)
+               elog(DEBUG2, "MAP metadata for relation %s has been resolved",
+                        UmMetadataRelPathPerm(locator).str);
+}
+
+static void
+forget_missing_metadata_db(Oid dbid)
+{
+       HASH_SEQ_STATUS status;
+       xl_missing_metadata *hentry;
+
+       if (missing_metadata_tab == NULL)
+               return;
+
+       hash_seq_init(&status, missing_metadata_tab);
+
+       while ((hentry = (xl_missing_metadata *) hash_seq_search(&status)) != 
NULL)
+       {
+               if (hentry->key.locator.dbOid == dbid)
+               {
+                       elog(DEBUG2, "MAP metadata for relation %s has been 
resolved",
+                                
UmMetadataRelPathPerm(hentry->key.locator).str);
+
+                       if (hash_search(missing_metadata_tab,
+                                                       &hentry->key,
+                                                       HASH_REMOVE, NULL) == 
NULL)
+                               elog(ERROR, "hash table corrupted");
+               }
+       }
+}
+#endif
+
 /* Are there any unresolved references to invalid pages? */
 bool
 XLogHaveInvalidPages(void)
@@ -226,6 +350,11 @@ XLogHaveInvalidPages(void)
        if (invalid_page_tab != NULL &&
                hash_get_num_entries(invalid_page_tab) > 0)
                return true;
+#ifdef USE_UMBRA
+       if (missing_metadata_tab != NULL &&
+               hash_get_num_entries(missing_metadata_tab) > 0)
+               return true;
+#endif
        return false;
 }
 
@@ -237,21 +366,43 @@ XLogCheckInvalidPages(void)
        xl_invalid_page *hentry;
        bool            foundone = false;
 
+#ifdef USE_UMBRA
+       if (invalid_page_tab == NULL && missing_metadata_tab == NULL)
+#else
        if (invalid_page_tab == NULL)
+#endif
                return;                                 /* nothing to do */
 
-       hash_seq_init(&status, invalid_page_tab);
+       if (invalid_page_tab != NULL)
+       {
+               hash_seq_init(&status, invalid_page_tab);
 
-       /*
-        * Our strategy is to emit WARNING messages for all remaining entries 
and
-        * only PANIC after we've dumped all the available info.
-        */
-       while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
+               /*
+                * Our strategy is to emit WARNING messages for all remaining 
entries and
+                * only PANIC after we've dumped all the available info.
+                */
+               while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) 
!= NULL)
+               {
+                       report_invalid_page(WARNING, hentry->key.locator, 
hentry->key.forkno,
+                                                               
hentry->key.blkno, hentry->present);
+                       foundone = true;
+               }
+       }
+
+#ifdef USE_UMBRA
+       if (missing_metadata_tab != NULL)
        {
-               report_invalid_page(WARNING, hentry->key.locator, 
hentry->key.forkno,
-                                                       hentry->key.blkno, 
hentry->present);
-               foundone = true;
+               HASH_SEQ_STATUS missing_status;
+               xl_missing_metadata *mentry;
+
+               hash_seq_init(&missing_status, missing_metadata_tab);
+               while ((mentry = (xl_missing_metadata *) 
hash_seq_search(&missing_status)) != NULL)
+               {
+                       report_missing_metadata(WARNING, mentry->key.locator);
+                       foundone = true;
+               }
        }
+#endif
 
        if (foundone)
                elog(ignore_invalid_pages ? WARNING : PANIC,
@@ -259,8 +410,99 @@ XLogCheckInvalidPages(void)
 
        hash_destroy(invalid_page_tab);
        invalid_page_tab = NULL;
+
+#ifdef USE_UMBRA
+       if (missing_metadata_tab != NULL)
+       {
+               hash_destroy(missing_metadata_tab);
+               missing_metadata_tab = NULL;
+       }
+#endif
+}
+
+#ifdef USE_UMBRA
+/*
+ * Redo is an owner point for handle-local Umbra MAP state.
+ *
+ * Mapped permanent forks replay under REQUIRE_MAP even if a later restartpoint
+ * cleanup already removed the MAP fork from disk. The only runtime override
+ * is the durable skip-WAL-pending bit stored in the MAP superblock.
+ * INIT/internal/temp forks stay on BYPASS_MAP.
+ */
+static uint8
+XLogUmbraMapStateForRedo(SMgrRelation smgr, ForkNumber forknum)
+{
+       UmbraFileContext *ctx = umfile_ctx_acquire(smgr->smgr_rlocator);
+
+       if (!UmbraForkUsesMapTranslation(forknum) ||
+               forknum == INIT_FORKNUM ||
+               smgrisinternalfork(forknum) ||
+               RelFileLocatorBackendIsTemp(smgr->smgr_rlocator))
+               return UMBRA_MAP_POLICY_BYPASS_MAP;
+
+       if (UmMetadataExists(smgr) &&
+               MapSBlockIsSkipWalPending(ctx, smgr->smgr_rlocator.locator))
+               return UMBRA_MAP_POLICY_SKIP_WAL_PENDING_MAP;
+
+       return UMBRA_MAP_POLICY_REQUIRE_MAP;
+}
+
+/*
+ * For mapped forks, redo must not silently invent local mappings.
+ * Missing mapping source is treated as an invalid-page reference.
+ */
+static bool
+XLogUmbraEnsureMappedBlockForRedo(RelFileLocator rlocator, ForkNumber forknum,
+                                                                 BlockNumber 
blkno)
+{
+       SMgrRelation smgr;
+       BlockNumber     pblkno;
+
+       if (forknum == INIT_FORKNUM || smgrisinternalfork(forknum))
+               return true;
+
+       smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
+
+       if (!UmMetadataExists(smgr))
+       {
+               XLogLogMissingRelationMetadata(rlocator);
+               return false;
+       }
+
+       if (!UmMapTryLookupPblkno(smgr, forknum, blkno, &pblkno))
+       {
+               if (UmMapIsLogicalUnmaterialized(smgr, forknum, blkno))
+                       return true;
+               log_invalid_page(rlocator, forknum, blkno, false);
+               return false;
+       }
+
+       return true;
 }
 
+static bool
+XLogUmbraEnsureMetadataForRedo(RelFileLocator rlocator, ForkNumber forknum)
+{
+       SMgrRelation smgr;
+       uint8           map_state;
+
+       if (forknum == INIT_FORKNUM || smgrisinternalfork(forknum))
+               return true;
+
+       smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
+       map_state = XLogUmbraMapStateForRedo(smgr, forknum);
+       smgrsetmapstate(smgr, map_state);
+       if (map_state != UMBRA_MAP_POLICY_REQUIRE_MAP)
+               return true;
+
+       if (!UmMetadataExists(smgr))
+               smgrcreaterelationmetadata(smgr);
+
+       return true;
+}
+
+#endif
+
 
 /*
  * XLogReadBufferForRedo
@@ -341,6 +583,22 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
                                                          uint8 block_id,
                                                          ReadBufferMode mode, 
bool get_cleanup_lock,
                                                          Buffer *buf)
+{
+ #ifdef USE_UMBRA
+       return XLogReadBufferForRedoExtendedUmbra(record, block_id, mode,
+                                                                               
          get_cleanup_lock, buf);
+ #else
+       return XLogReadBufferForRedoExtendedMd(record, block_id, mode,
+                                                                               
   get_cleanup_lock, buf);
+ #endif
+}
+
+#ifndef USE_UMBRA
+static XLogRedoAction
+XLogReadBufferForRedoExtendedMd(XLogReaderState *record,
+                                                               uint8 block_id,
+                                                               ReadBufferMode 
mode, bool get_cleanup_lock,
+                                                               Buffer *buf)
 {
        XLogRecPtr      lsn = record->EndRecPtr;
        RelFileLocator rlocator;
@@ -354,15 +612,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
        if (!XLogRecGetBlockTagExtended(record, block_id, &rlocator, &forknum, 
&blkno,
                                                                        
&prefetch_buffer))
        {
-               /* Caller specified a bogus block_id */
                elog(PANIC, "failed to locate backup block with ID %d in WAL 
record",
                         block_id);
        }
 
-       /*
-        * Make sure that if the block is marked with WILL_INIT, the caller is
-        * going to initialize it. And vice versa.
-        */
        zeromode = (mode == RBM_ZERO_AND_LOCK || mode == 
RBM_ZERO_AND_CLEANUP_LOCK);
        willinit = (XLogRecGetBlock(record, block_id)->flags & 
BKPBLOCK_WILL_INIT) != 0;
        if (willinit && !zeromode)
@@ -370,7 +623,6 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
        if (!willinit && zeromode)
                elog(PANIC, "block to be initialized in redo routine must be 
marked with WILL_INIT flag in the WAL record");
 
-       /* If it has a full-page image and it should be restored, do it. */
        if (XLogRecBlockImageApply(record, block_id))
        {
                Assert(XLogRecHasBlockImage(record, block_id));
@@ -383,49 +635,105 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
                                        (errcode(ERRCODE_INTERNAL_ERROR),
                                         errmsg_internal("%s", 
record->errormsg_buf)));
 
-               /*
-                * The page may be uninitialized. If so, we can't set the LSN 
because
-                * that would corrupt the page.
-                */
                if (!PageIsNew(page))
-               {
                        PageSetLSN(page, lsn);
-               }
 
                MarkBufferDirty(*buf);
-
-               /*
-                * At the end of crash recovery the init forks of unlogged 
relations
-                * are copied, without going through shared buffers. So we need 
to
-                * force the on-disk state of init forks to always be in sync 
with the
-                * state in shared buffers.
-                */
                if (forknum == INIT_FORKNUM)
                        FlushOneBuffer(*buf);
 
                return BLK_RESTORED;
        }
-       else
+
+       *buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode, 
prefetch_buffer);
+       if (BufferIsValid(*buf))
        {
-               *buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode, 
prefetch_buffer);
-               if (BufferIsValid(*buf))
+               if (mode != RBM_ZERO_AND_LOCK && mode != 
RBM_ZERO_AND_CLEANUP_LOCK)
                {
-                       if (mode != RBM_ZERO_AND_LOCK && mode != 
RBM_ZERO_AND_CLEANUP_LOCK)
-                       {
-                               if (get_cleanup_lock)
-                                       LockBufferForCleanup(*buf);
-                               else
-                                       LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
-                       }
-                       if (lsn <= PageGetLSN(BufferGetPage(*buf)))
-                               return BLK_DONE;
+                       if (get_cleanup_lock)
+                               LockBufferForCleanup(*buf);
                        else
-                               return BLK_NEEDS_REDO;
+                               LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
                }
-               else
-                       return BLK_NOTFOUND;
+               if (lsn <= PageGetLSN(BufferGetPage(*buf)))
+                       return BLK_DONE;
+               return BLK_NEEDS_REDO;
        }
+
+       return BLK_NOTFOUND;
+}
+#endif
+
+#ifdef USE_UMBRA
+static XLogRedoAction
+XLogReadBufferForRedoExtendedUmbra(XLogReaderState *record,
+                                                                  uint8 
block_id,
+                                                                  
ReadBufferMode mode, bool get_cleanup_lock,
+                                                                  Buffer *buf)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       RelFileLocator rlocator;
+       ForkNumber      forknum;
+       BlockNumber blkno;
+       Buffer          prefetch_buffer;
+       Page            page;
+       bool            zeromode;
+       bool            willinit;
+
+       if (!XLogRecGetBlockTagExtended(record, block_id, &rlocator, &forknum, 
&blkno,
+                                                                       
&prefetch_buffer))
+       {
+               elog(PANIC, "failed to locate backup block with ID %d in WAL 
record",
+                        block_id);
+       }
+
+       *buf = InvalidBuffer;
+
+       zeromode = (mode == RBM_ZERO_AND_LOCK || mode == 
RBM_ZERO_AND_CLEANUP_LOCK);
+       willinit = (XLogRecGetBlock(record, block_id)->flags & 
BKPBLOCK_WILL_INIT) != 0;
+       if (willinit && !zeromode)
+               elog(PANIC, "block with WILL_INIT flag in WAL record must be 
zeroed by redo routine");
+       if (!willinit && zeromode)
+               elog(PANIC, "block to be initialized in redo routine must be 
marked with WILL_INIT flag in the WAL record");
+
+       if (!XLogUmbraEnsureMetadataForRedo(rlocator, forknum))
+               return BLK_NOTFOUND;
+
+       if (XLogRecBlockImageApply(record, block_id))
+       {
+               Assert(XLogRecHasBlockImage(record, block_id));
+               *buf = XLogReadBufferExtended(rlocator, forknum, blkno,
+                                                                         
get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
+                                                                         
prefetch_buffer);
+               page = BufferGetPage(*buf);
+               if (!RestoreBlockImage(record, block_id, page))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INTERNAL_ERROR),
+                                        errmsg_internal("%s", 
record->errormsg_buf)));
+
+               if (!PageIsNew(page))
+                       PageSetLSN(page, lsn);
+
+               MarkBufferDirty(*buf);
+               if (forknum == INIT_FORKNUM)
+                       FlushOneBuffer(*buf);
+
+               return BLK_RESTORED;
+       }
+
+       if (!XLogUmbraEnsureMappedBlockForRedo(rlocator, forknum, blkno))
+               return BLK_NOTFOUND;
+
+       *buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode,
+                                                                 
prefetch_buffer);
+       if (!BufferIsValid(*buf))
+               return BLK_NOTFOUND;
+
+       if (lsn <= PageGetLSN(BufferGetPage(*buf)))
+               return BLK_DONE;
+       return BLK_NEEDS_REDO;
 }
+#endif
 
 /*
  * XLogReadBufferExtended
@@ -630,6 +938,9 @@ void
 XLogDropRelation(RelFileLocator rlocator, ForkNumber forknum)
 {
        forget_invalid_pages(rlocator, forknum, 0);
+#ifdef USE_UMBRA
+       forget_missing_metadata(rlocator);
+#endif
 }
 
 /*
@@ -649,6 +960,9 @@ XLogDropDatabase(Oid dbid)
        smgrdestroyall();
 
        forget_invalid_pages_db(dbid);
+#ifdef USE_UMBRA
+       forget_missing_metadata_db(dbid);
+#endif
 }
 
 /*
diff --git a/src/backend/storage/map/mapsuper.c 
b/src/backend/storage/map/mapsuper.c
index ad4a6f6bdb..3d8909f7a4 100644
--- a/src/backend/storage/map/mapsuper.c
+++ b/src/backend/storage/map/mapsuper.c
@@ -858,7 +858,10 @@ MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                if (status == MAP_SBLOCK_READ_MISSING)
                {
                        if (InRecovery)
+                       {
+                               XLogLogMissingRelationMetadata(rnode);
                                return false;
+                       }
                        elog(ERROR, "%s", missing_errmsg);
                }
 
diff --git a/src/backend/storage/smgr/umbra.c b/src/backend/storage/smgr/umbra.c
index 2baf64defe..917dff0a64 100644
--- a/src/backend/storage/smgr/umbra.c
+++ b/src/backend/storage/smgr/umbra.c
@@ -28,6 +28,7 @@
 
 #include "postgres.h"
 
+#include "access/umbra_xlog.h"
 #include "access/xlog.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -793,6 +794,12 @@ void
 UmRebuildMapAndSuperblockForSkipWAL(SMgrRelation reln)
 {
        UmbraFileContext *ctx = um_ctx_acquire(reln);
+       xl_umbra_skip_wal_dense_map_entry apply_entries[MAX_FORKNUM + 1];
+       xl_umbra_skip_wal_dense_map_entry wal_entries[MAX_FORKNUM + 1];
+       uint16          apply_count = 0;
+       uint16          wal_count = 0;
+       XLogRecPtr      map_lsn = InvalidXLogRecPtr;
+       bool            wal_insert_enabled;
 
        /*
         * Rebuild assumes the relation stayed on direct lblk==pblk access 
during
@@ -812,27 +819,54 @@ UmRebuildMapAndSuperblockForSkipWAL(SMgrRelation reln)
                        continue;
 
                if (!umfile_exists(ctx, forknum, UMFILE_EXISTS_DENSE))
-               {
-                       MapSBlockSetLogicalNblocks(ctx, 
reln->smgr_rlocator.locator,
-                                                                          
forknum, 0, InvalidXLogRecPtr);
                        continue;
-               }
 
                nblocks = umfile_nblocks(ctx, forknum, UMFILE_NBLOCKS_DENSE);
+               apply_entries[apply_count].forknum = forknum;
+               apply_entries[apply_count].nblocks = nblocks;
+               apply_count++;
+
+               /*
+                * The redo anchor records dense [0, nblocks) mapping. Empty 
forks
+                * don't need an anchor and may correspond to zero-length 
metadata left
+                * by aborted storage operations.
+                */
+               if (nblocks > 0)
+               {
+                       wal_entries[wal_count].forknum = forknum;
+                       wal_entries[wal_count].nblocks = nblocks;
+                       wal_count++;
+               }
+       }
+
+       wal_insert_enabled =
+               XLogInsertAllowed() &&
+               !IsBootstrapProcessingMode() &&
+               !IsInitProcessingMode();
+
+       if (wal_count > 0 && wal_insert_enabled)
+               map_lsn = 
log_umbra_skip_wal_dense_map(reln->smgr_rlocator.locator,
+                                                                               
           wal_count, wal_entries);
+
+       for (uint16 i = 0; i < apply_count; i++)
+       {
+               ForkNumber      forknum = apply_entries[i].forknum;
+               BlockNumber nblocks = apply_entries[i].nblocks;
+               XLogRecPtr      fork_lsn = nblocks > 0 ? map_lsn : 
InvalidXLogRecPtr;
 
                for (BlockNumber lblk = 0; lblk < nblocks; lblk++)
                        MapSetMapping(ctx, reln->smgr_rlocator.locator, forknum,
-                                                 lblk, lblk, 
InvalidXLogRecPtr);
+                                                 lblk, lblk, fork_lsn);
 
                if (nblocks > 0)
                {
                        MapSBlockBumpNextFreePhysBlock(ctx, 
reln->smgr_rlocator.locator,
-                                                                               
   forknum, nblocks, InvalidXLogRecPtr);
+                                                                               
   forknum, nblocks, fork_lsn);
                        MapSBlockBumpPhysicalNblocks(ctx, 
reln->smgr_rlocator.locator,
-                                                                               
 forknum, nblocks, InvalidXLogRecPtr);
+                                                                               
 forknum, nblocks, fork_lsn);
                }
                MapSBlockSetLogicalNblocks(ctx, reln->smgr_rlocator.locator,
-                                                                  forknum, 
nblocks, InvalidXLogRecPtr);
+                                                                  forknum, 
nblocks, fork_lsn);
        }
 }
 
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 931ab8b979..21fc620afa 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -20,6 +20,9 @@
 #include "access/nbtxlog.h"
 #include "access/rmgr.h"
 #include "access/spgxlog.h"
+#ifdef USE_UMBRA
+#include "access/umbra_xlog.h"
+#endif
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index ae32ef16d6..3ca7d45ca4 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,4 +47,7 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, 
commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, 
replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, 
generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, 
logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+#ifdef USE_UMBRA
+PG_RMGR(RM_UMBRA_ID, "Umbra", umbra_redo, umbra_desc, umbra_identify, NULL, 
NULL, NULL, NULL)
+#endif
 PG_RMGR(RM_XLOG2_ID, "XLOG2", xlog2_redo, xlog2_desc, xlog2_identify, NULL, 
NULL, NULL, xlog2_decode)
diff --git a/src/include/access/umbra_xlog.h b/src/include/access/umbra_xlog.h
new file mode 100644
index 0000000000..cb0c2bac57
--- /dev/null
+++ b/src/include/access/umbra_xlog.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * umbra_xlog.h
+ *       WAL support for Umbra MAP metadata.
+ *
+ * Umbra logs these record types:
+ * - MAP_SET: establish/switch lblkno -> pblkno mapping
+ * - SKIP_WAL_DENSE_MAP: record non-empty skip-WAL dense lblk==pblk frontiers
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UMBRA_XLOG_H
+#define UMBRA_XLOG_H
+
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/relfilelocator.h"
+
+/* XLOG gives us high 4 bits */
+#define XLOG_UMBRA_MAP_SET                     0x10
+#define XLOG_UMBRA_SKIP_WAL_DENSE_MAP  0x60
+
+typedef struct xl_umbra_map_set
+{
+       RelFileLocator rlocator;
+       ForkNumber      forknum;
+       BlockNumber lblkno;
+       BlockNumber old_pblkno;
+       BlockNumber new_pblkno;
+} xl_umbra_map_set;
+
+typedef struct xl_umbra_skip_wal_dense_map_entry
+{
+       ForkNumber      forknum;
+       BlockNumber nblocks;
+} xl_umbra_skip_wal_dense_map_entry;
+
+typedef struct xl_umbra_skip_wal_dense_map
+{
+       RelFileLocator rlocator;
+       uint16          count;
+       uint16          padding;
+       xl_umbra_skip_wal_dense_map_entry entries[FLEXIBLE_ARRAY_MEMBER];
+} xl_umbra_skip_wal_dense_map;
+
+extern XLogRecPtr log_umbra_map_set(RelFileLocator rlocator, ForkNumber 
forknum,
+                                                                       
BlockNumber lblkno, BlockNumber old_pblkno,
+                                                                       
BlockNumber new_pblkno);
+extern XLogRecPtr log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
+                                                                               
           uint16 count,
+                                                                               
           const xl_umbra_skip_wal_dense_map_entry *entries);
+
+extern void umbra_redo(XLogReaderState *record);
+extern void umbra_desc(StringInfo buf, XLogReaderState *record);
+extern const char *umbra_identify(uint8 info);
+
+#endif                                                 /* UMBRA_XLOG_H */
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 91dfbd5627..073d6e2ee7 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -33,12 +33,16 @@
 #define REGBUF_NO_IMAGE                0x02    /* don't take a full-page image 
*/
 #define REGBUF_WILL_INIT       (0x04 | 0x02)   /* page will be re-initialized 
at
                                                                                
         * replay (implies NO_IMAGE) */
+#define REGBUF_WILL_INIT_BIRTH \
+       (REGBUF_WILL_INIT | REGBUF_LOGICAL_BIRTH)
 #define REGBUF_STANDARD                0x08    /* page follows "standard" page 
layout,
                                                                         * 
(data between pd_lower and pd_upper
                                                                         * will 
be skipped) */
 #define REGBUF_KEEP_DATA       0x10    /* include data even if a full-page 
image
                                                                         * is 
taken */
 #define REGBUF_NO_CHANGE       0x20    /* intentionally register clean buffer 
*/
+#define REGBUF_LOGICAL_BIRTH 0x40 /* this record publishes a logical page
+                                                                        * 
birth/rebirth mapping */
 
 /* prototypes for public functions in xloginsert.c: */
 extern void XLogBeginInsert(void);
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index e8999d3fe9..80764f9a26 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -90,6 +90,22 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY  0x02
 
+/*
+ * Legacy Umbra-only record flags formerly used for compact remap encodings.
+ *
+ * New WAL records always use the full remap header. Reader-side code keeps
+ * these bits only to reject unsupported old-format records explicitly.
+ */
+#ifdef USE_UMBRA
+#define XLR_UMBRA_REMAP_FORMAT_MASK            0x0C
+#define XLR_UMBRA_COMPACT_BIRTH_REMAP  0x04
+#define XLR_UMBRA_ORDINARY_SLIM_REMAP  0x08
+#else
+#define XLR_UMBRA_REMAP_FORMAT_MASK            0x00
+#define XLR_UMBRA_COMPACT_BIRTH_REMAP  0x00
+#define XLR_UMBRA_ORDINARY_SLIM_REMAP  0x00
+#endif
+
 /*
  * Header info for block data appended to an XLOG record.
  *
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index b97387c6d4..a59a3da69f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -61,6 +61,9 @@ extern PGDLLIMPORT HotStandbyState standbyState;
 
 
 extern bool XLogHaveInvalidPages(void);
+#ifdef USE_UMBRA
+extern void XLogLogMissingRelationMetadata(RelFileLocator locator);
+#endif
 extern void XLogCheckInvalidPages(void);
 
 extern void XLogDropRelation(RelFileLocator rlocator, ForkNumber forknum);
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 70f619a6d6..3666e1d702 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -32,6 +32,7 @@ extern void RelationTruncate(Relation rel, BlockNumber 
nblocks);
 extern void RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
                                                                ForkNumber 
forkNum, char relpersistence);
 extern bool RelFileLocatorSkippingWAL(RelFileLocator rlocator);
+extern bool RelFileLocatorWasTruncated(RelFileLocator rlocator);
 extern Size EstimatePendingSyncsSpace(void);
 extern void SerializePendingSyncs(Size maxSize, char *startAddress);
 extern void RestorePendingSyncs(char *startAddress);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 0abe8ff1a1..a527f446f2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -63,8 +63,12 @@ tests += {
       't/052_checkpoint_segment_missing.pl',
       't/053_umbra_map_superblock_watermark.pl',
       't/054_umbra_map_fork_policy.pl',
+      't/056_umbra_truncate_superblock.pl',
       't/061_umbra_fsm_vm_map_translation.pl',
+      't/062_umbra_truncate_drop_crash_matrix.pl',
       't/063_umbra_mainfork_head_unlink_checkpoint.pl',
+      't/066_umbra_truncate_redo.pl',
+      't/071_umbra_skip_wal_dense_map.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/056_umbra_truncate_superblock.pl 
b/src/test/recovery/t/056_umbra_truncate_superblock.pl
new file mode 100644
index 0000000000..db8d34c7a0
--- /dev/null
+++ b/src/test/recovery/t/056_umbra_truncate_superblock.pl
@@ -0,0 +1,82 @@
+# Verify TRUNCATE updates MAP superblock logical_nblocks and survives restart.
+#
+# In md mode, skip this test.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+sub u32le_from_hex
+{
+       my ($hex, $offset) = @_;
+       my $chunk = substr($hex, $offset * 2, 8);
+       my @b = ($chunk =~ /../g);
+
+       return hex($b[0]) +
+         (hex($b[1]) << 8) +
+         (hex($b[2]) << 16) +
+         (hex($b[3]) << 24);
+}
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+});
+$node->start();
+
+$node->safe_psql(
+       'postgres', q{
+CREATE TABLE umb_truncate_t(a int, b text);
+INSERT INTO umb_truncate_t
+SELECT g, repeat('t', 400) FROM generate_series(1, 20000) g;
+CHECKPOINT;
+});
+
+my $map_super_hex = $node->safe_psql(
+       'postgres',
+       q{SELECT 
encode(pg_read_binary_file(pg_relation_filepath('umb_truncate_t') || '_map', 0, 
64, true), 'hex');}
+);
+
+my $logical_before = u32le_from_hex($map_super_hex, 40);
+cmp_ok($logical_before, '>', 0, 'logical_nblocks_main is non-zero before 
TRUNCATE');
+
+$node->safe_psql(
+       'postgres', q{
+TRUNCATE umb_truncate_t;
+CHECKPOINT;
+});
+
+my $logical_size_after = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_size('umb_truncate_t') / 
current_setting('block_size')::int;}
+);
+
+$map_super_hex = $node->safe_psql(
+       'postgres',
+       q{SELECT 
encode(pg_read_binary_file(pg_relation_filepath('umb_truncate_t') || '_map', 0, 
64, true), 'hex');}
+);
+my $logical_after = u32le_from_hex($map_super_hex, 40);
+
+is($logical_size_after, '0', 'relation size is zero blocks after TRUNCATE');
+is($logical_after, 0, 'superblock logical_nblocks_main is zero after 
TRUNCATE');
+
+$node->stop('immediate');
+$node->start();
+
+$map_super_hex = $node->safe_psql(
+       'postgres',
+       q{SELECT 
encode(pg_read_binary_file(pg_relation_filepath('umb_truncate_t') || '_map', 0, 
64, true), 'hex');}
+);
+my $logical_after_restart = u32le_from_hex($map_super_hex, 40);
+
+is($logical_after_restart, 0,
+       'superblock logical_nblocks_main remains zero after restart');
+
+done_testing();
diff --git a/src/test/recovery/t/062_umbra_truncate_drop_crash_matrix.pl 
b/src/test/recovery/t/062_umbra_truncate_drop_crash_matrix.pl
new file mode 100644
index 0000000000..18dd441d5e
--- /dev/null
+++ b/src/test/recovery/t/062_umbra_truncate_drop_crash_matrix.pl
@@ -0,0 +1,108 @@
+# Verify UMBRA truncate/drop behavior across crash restart.
+#
+# Matrix intent:
+# - TRUNCATE result survives crash restart (logical size and superblock 
logical_nblocks)
+# - DROP result survives crash restart
+# - dropped relation MAP fork disappears after a post-restart checkpoint
+#
+# In md mode, skip this test.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+sub u32le_from_hex
+{
+       my ($hex, $offset) = @_;
+       my $chunk = substr($hex, $offset * 2, 8);
+       my @b = ($chunk =~ /../g);
+
+       return hex($b[0]) +
+         (hex($b[1]) << 8) +
+         (hex($b[2]) << 16) +
+         (hex($b[3]) << 24);
+}
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+});
+$node->start();
+
+ $node->safe_psql(
+       'postgres', q{
+CREATE TABLE umb_mx_trunc_t(id int, payload text);
+INSERT INTO umb_mx_trunc_t
+SELECT g, repeat('a', 500) FROM generate_series(1, 18000) g;
+CREATE TABLE umb_mx_drop_t(id int, payload text);
+INSERT INTO umb_mx_drop_t
+SELECT g, repeat('b', 500) FROM generate_series(1, 18000) g;
+SELECT 
COALESCE(encode(pg_read_binary_file(pg_relation_filepath('umb_mx_trunc_t') || 
'_map', 0, 1, true), 'hex'), '') <> '';
+});
+
+cmp_ok(
+       $node->safe_psql(
+               'postgres',
+               q{SELECT pg_relation_size('umb_mx_trunc_t') / 
current_setting('block_size')::int;}),
+       '>',
+       0,
+       'truncate relation logical size is non-zero before TRUNCATE');
+
+my $drop_map_path = $node->safe_psql(
+       'postgres',
+       q{SELECT pg_relation_filepath('umb_mx_drop_t') || '_map';}
+);
+
+$node->safe_psql(
+       'postgres', q{
+TRUNCATE umb_mx_trunc_t;
+DROP TABLE umb_mx_drop_t;
+});
+
+$node->stop('immediate');
+$node->start();
+
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_mx_trunc_t;}), '0',
+       'TRUNCATE result survives crash restart');
+is($node->safe_psql(
+               'postgres',
+               q{SELECT pg_relation_size('umb_mx_trunc_t') / 
current_setting('block_size')::int;}),
+       '0',
+       'truncated relation logical size is zero blocks after restart');
+
+my $trunc_map_hex_after = $node->safe_psql(
+       'postgres',
+       q{SELECT 
encode(pg_read_binary_file(pg_relation_filepath('umb_mx_trunc_t') || '_map', 0, 
64, true), 'hex');}
+);
+my $trunc_logical_after = u32le_from_hex($trunc_map_hex_after, 40);
+is($trunc_logical_after, 0,
+       'superblock logical_nblocks_main remains zero after crash restart');
+
+is($node->safe_psql(
+               'postgres',
+               q{SELECT count(*) FROM pg_class WHERE relname = 
'umb_mx_drop_t';}),
+       '0',
+       'DROP result survives crash restart');
+
+$node->safe_psql('postgres', q{CHECKPOINT;});
+ok($node->poll_query_until('postgres',
+               "SELECT COALESCE(encode(pg_read_binary_file('$drop_map_path', 
0, 1, true), 'hex'), '') = '';",
+               't'),
+       'dropped relation MAP fork disappears after post-restart checkpoint');
+
+$node->safe_psql(
+       'postgres', q{
+INSERT INTO umb_mx_trunc_t
+SELECT g, repeat('c', 300) FROM generate_series(1, 1000) g;
+});
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_mx_trunc_t;}), 
'1000',
+       'truncated relation remains writable after crash restart');
+
+done_testing();
diff --git a/src/test/recovery/t/066_umbra_truncate_redo.pl 
b/src/test/recovery/t/066_umbra_truncate_redo.pl
new file mode 100644
index 0000000000..5cef03be87
--- /dev/null
+++ b/src/test/recovery/t/066_umbra_truncate_redo.pl
@@ -0,0 +1,64 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_truncate');
+
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'replica'
+autovacuum = off
+]);
+$node->start();
+
+$node->safe_psql(
+       'postgres', q[
+CREATE TABLE umbra_trunc(i int);
+INSERT INTO umbra_trunc
+SELECT generate_series(1, 1000);
+CHECKPOINT;
+TRUNCATE umbra_trunc;
+INSERT INTO umbra_trunc
+SELECT generate_series(1, 10);
+UPDATE umbra_trunc
+SET i = i + 100;
+]);
+
+$node->stop('immediate');
+ok($node->start(), 'restart after truncate crash');
+
+is($node->safe_psql('postgres',
+               'SELECT count(*), sum(i), min(i), max(i) FROM umbra_trunc'),
+       '10|1055|101|110',
+       'truncate redo preserved only post-truncate rows');
+
+# Exercise normal mapped writes after crash recovery.  The table should no
+# longer behave as if its logical size were 0, and follow-up restart should
+# keep both the recovered rows and the new rows.
+$node->safe_psql(
+       'postgres', q[
+UPDATE umbra_trunc
+SET i = i + 1000
+WHERE i <= 105;
+INSERT INTO umbra_trunc VALUES (9999);
+CHECKPOINT;
+]);
+
+$node->stop('immediate');
+ok($node->start(), 'restart after post-recovery writes');
+
+is($node->safe_psql('postgres',
+               'SELECT count(*), sum(i), min(i), max(i) FROM umbra_trunc'),
+       '11|16054|106|9999',
+       'post-recovery mapped writes survived second restart');
+
+done_testing();
diff --git a/src/test/recovery/t/071_umbra_skip_wal_dense_map.pl 
b/src/test/recovery/t/071_umbra_skip_wal_dense_map.pl
new file mode 100644
index 0000000000..a7ca06dd5b
--- /dev/null
+++ b/src/test/recovery/t/071_umbra_skip_wal_dense_map.pl
@@ -0,0 +1,65 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_skip_wal_dense_map');
+
+$node->init;
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'minimal'
+autovacuum = off
+shared_buffers = '256MB'
+max_wal_size = '4GB'
+min_wal_size = '1GB'
+checkpoint_timeout = '1h'
+]);
+$node->start();
+
+my $start_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+CREATE TABLE umbra_skipwal_dense AS
+SELECT g::bigint AS id, repeat('x', 200) AS pad
+FROM generate_series(1, 50000) AS g;
+]);
+
+my $count =
+  $node->safe_psql('postgres', q[SELECT count(*) FROM umbra_skipwal_dense;]);
+is($count, '50000', 'skip-WAL-created relation is readable before restart');
+
+my $end_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+my ($dump_stdout, $dump_stderr) = run_command(
+       [
+               'pg_waldump', '-p', $node->data_dir . '/pg_wal',
+               '--start',   $start_lsn,
+               '--end',     $end_lsn
+       ]);
+is($dump_stderr, '', 'pg_waldump raw dump completed without stderr');
+
+my @dense_lines =
+  grep { /desc: SKIP_WAL_DENSE_MAP/ }
+  split /\n/, $dump_stdout;
+ok(@dense_lines > 0,
+   'raw WAL dump contains skip-WAL dense MAP records');
+
+my @main_dense_lines =
+  grep { /fork 0 nblocks ([1-9][0-9]*)/ }
+  @dense_lines;
+ok(@main_dense_lines > 0,
+   'skip-WAL dense MAP record carries concrete MAIN fork nblocks');
+
+$node->stop();
+
+done_testing();
-- 
2.50.1 (Apple Git-155)



Reply via email to