Hi,
On Mon, Oct 14, 2024 at 09:57:22AM +1100, Peter Smith wrote:
> Here are some minor review comments for v15-0002.
>
> ======
> contrib/pg_logicalinspect/pg_logicalinspect.c
>
> 1.
> +pg_get_logical_snapshot_meta(PG_FUNCTION_ARGS)
> +{
> +#define PG_GET_LOGICAL_SNAPSHOT_META_COLS 3
> + SnapBuildOnDisk ondisk;
> + HeapTuple tuple;
> + Datum values[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
> + bool nulls[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
> + TupleDesc tupdesc;
> + char path[MAXPGPATH];
> + int i = 0;
> + text *filename_t = PG_GETARG_TEXT_PP(0);
> +
> + sprintf(path, "%s/%s",
> + PG_LOGICAL_SNAPSHOTS_DIR,
> + text_to_cstring(filename_t));
> +
> + /* Build a tuple descriptor for our result type */
> + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
> + elog(ERROR, "return type must be a row type");
> +
> + /* Validate and restore the snapshot to 'ondisk' */
> + SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
>
> The sprintf should be deferred. Could you do it after the ERROR check?
I think that makes sense, done in v16 attached.
> ======
> src/backend/replication/logical/snapbuild.c
>
> 3.
> /*
> - * Restore a snapshot into 'builder' if previously one has been stored at the
> - * location indicated by 'lsn'. Returns true if successful, false otherwise.
> + * Restore the logical snapshot file contents to 'ondisk'.
> + *
> + * If 'missing_ok' is true, will not throw an error if the file is not found.
> + * 'context' is the memory context where the catalog modifying/committed xid
> + * will live.
> */
> -static bool
> -SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
> +bool
> +SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, const char *path,
> + MemoryContext context, bool missing_ok)
>
> nit - I think it's better to describe parameters in the same order
> that they are declared.
Done in v16.
> Also, include a 'path' description, so it is
> not the only one omitted.
I don't think that's worth it as self explanatory IMHO.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
>From ef5997544afd9f414648e5ccae6c0d08c5ab66fa Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Fri, 11 Oct 2024 16:23:14 -0700
Subject: [PATCH v16 1/2] Move SnapBuild and SnapBuildOnDisk structs to
snapshot_internal.h
This commit moves the definitions of the SnapBuild and SnapBuildOnDisk
structs, related to logical snapshots, to the snapshot_internal.h
file. This change allows external tools, such as
pg_logicalinspect (with an upcoming patch), to access and utilize the
contents of logical snapshots.
Author: Bertrand Drouvot
Reviewed-by: Amit Kapila, Shveta Malik, Peter Smith
Discussion: https://postgr.es/m/ZscuZ92uGh3wm4tW%40ip-10-97-1-34.eu-west-3.compute.internal
---
src/backend/replication/logical/snapbuild.c | 175 +----------------
src/include/replication/snapbuild.h | 2 +-
src/include/replication/snapbuild_internal.h | 196 +++++++++++++++++++
3 files changed, 198 insertions(+), 175 deletions(-)
46.4% src/backend/replication/logical/
53.5% src/include/replication/
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0450f94ba8..b9df8c0a02 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -134,6 +134,7 @@
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
+#include "replication/snapbuild_internal.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -143,146 +144,6 @@
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/snapshot.h"
-
-/*
- * This struct contains the current state of the snapshot building
- * machinery. Besides a forward declaration in the header, it is not exposed
- * to the public, so we can easily change its contents.
- */
-struct SnapBuild
-{
- /* how far are we along building our first full snapshot */
- SnapBuildState state;
-
- /* private memory context used to allocate memory for this module. */
- MemoryContext context;
-
- /* all transactions < than this have committed/aborted */
- TransactionId xmin;
-
- /* all transactions >= than this are uncommitted */
- TransactionId xmax;
-
- /*
- * Don't replay commits from an LSN < this LSN. This can be set externally
- * but it will also be advanced (never retreat) from within snapbuild.c.
- */
- XLogRecPtr start_decoding_at;
-
- /*
- * LSN at which two-phase decoding was enabled or LSN at which we found a
- * consistent point at the time of slot creation.
- *
- * The prepared transactions, that were skipped because previously
- * two-phase was not enabled or are not covered by initial snapshot, need
- * to be sent later along with commit prepared and they must be before
- * this point.
- */
- XLogRecPtr two_phase_at;
-
- /*
- * Don't start decoding WAL until the "xl_running_xacts" information
- * indicates there are no running xids with an xid smaller than this.
- */
- TransactionId initial_xmin_horizon;
-
- /* Indicates if we are building full snapshot or just catalog one. */
- bool building_full_snapshot;
-
- /*
- * Indicates if we are using the snapshot builder for the creation of a
- * logical replication slot. If it's true, the start point for decoding
- * changes is not determined yet. So we skip snapshot restores to properly
- * find the start point. See SnapBuildFindSnapshot() for details.
- */
- bool in_slot_creation;
-
- /*
- * Snapshot that's valid to see the catalog state seen at this moment.
- */
- Snapshot snapshot;
-
- /*
- * LSN of the last location we are sure a snapshot has been serialized to.
- */
- XLogRecPtr last_serialized_snapshot;
-
- /*
- * The reorderbuffer we need to update with usable snapshots et al.
- */
- ReorderBuffer *reorder;
-
- /*
- * TransactionId at which the next phase of initial snapshot building will
- * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
- * when no next phase necessary (SNAPBUILD_CONSISTENT).
- */
- TransactionId next_phase_at;
-
- /*
- * Array of transactions which could have catalog changes that committed
- * between xmin and xmax.
- */
- struct
- {
- /* number of committed transactions */
- size_t xcnt;
-
- /* available space for committed transactions */
- size_t xcnt_space;
-
- /*
- * Until we reach a CONSISTENT state, we record commits of all
- * transactions, not just the catalog changing ones. Record when that
- * changes so we know we cannot export a snapshot safely anymore.
- */
- bool includes_all_transactions;
-
- /*
- * Array of committed transactions that have modified the catalog.
- *
- * As this array is frequently modified we do *not* keep it in
- * xidComparator order. Instead we sort the array when building &
- * distributing a snapshot.
- *
- * TODO: It's unclear whether that reasoning has much merit. Every
- * time we add something here after becoming consistent will also
- * require distributing a snapshot. Storing them sorted would
- * potentially also make it easier to purge (but more complicated wrt
- * wraparound?). Should be improved if sorting while building the
- * snapshot shows up in profiles.
- */
- TransactionId *xip;
- } committed;
-
- /*
- * Array of transactions and subtransactions that had modified catalogs
- * and were running when the snapshot was serialized.
- *
- * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
- * if the transaction has changed the catalog. But it could happen that
- * the logical decoding decodes only the commit record of the transaction
- * after restoring the previously serialized snapshot in which case we
- * will miss adding the xid to the snapshot and end up looking at the
- * catalogs with the wrong snapshot.
- *
- * Now to avoid the above problem, we serialize the transactions that had
- * modified the catalogs and are still running at the time of snapshot
- * serialization. We fill this array while restoring the snapshot and then
- * refer it while decoding commit to ensure if the xact has modified the
- * catalog. We discard this array when all the xids in the list become old
- * enough to matter. See SnapBuildPurgeOlderTxn for details.
- */
- struct
- {
- /* number of transactions */
- size_t xcnt;
-
- /* This array must be sorted in xidComparator order */
- TransactionId *xip;
- } catchange;
-};
-
/*
* Starting a transaction -- which we need to do while exporting a snapshot --
* removes knowledge about the previously used resowner, so we save it here.
@@ -1557,40 +1418,6 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
}
}
-/* -----------------------------------
- * Snapshot serialization support
- * -----------------------------------
- */
-
-/*
- * We store current state of struct SnapBuild on disk in the following manner:
- *
- * struct SnapBuildOnDisk;
- * TransactionId * committed.xcnt; (*not xcnt_space*)
- * TransactionId * catchange.xcnt;
- *
- */
-typedef struct SnapBuildOnDisk
-{
- /* first part of this struct needs to be version independent */
-
- /* data not covered by checksum */
- uint32 magic;
- pg_crc32c checksum;
-
- /* data covered by checksum */
-
- /* version, in case we want to support pg_upgrade */
- uint32 version;
- /* how large is the on disk data, excluding the constant sized part */
- uint32 length;
-
- /* version dependent part */
- SnapBuild builder;
-
- /* variable amount of TransactionIds follows */
-} SnapBuildOnDisk;
-
#define SnapBuildOnDiskConstantSize \
offsetof(SnapBuildOnDisk, builder)
#define SnapBuildOnDiskNotChecksummedSize \
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index caa5113ff8..dbb4bc2f4b 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -46,7 +46,7 @@ typedef enum
SNAPBUILD_CONSISTENT = 2,
} SnapBuildState;
-/* forward declare so we don't have to expose the struct to the public */
+/* forward declare so we don't have to include snapbuild_internal.h */
struct SnapBuild;
typedef struct SnapBuild SnapBuild;
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
new file mode 100644
index 0000000000..03719ccf2a
--- /dev/null
+++ b/src/include/replication/snapbuild_internal.h
@@ -0,0 +1,196 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild_internal.h
+ * This file contains declarations for logical decoding utility
+ * functions for internal use.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/replication/snapbuild_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SNAPBUILD_INTERNAL_H
+#define SNAPBUILD_INTERNAL_H
+
+#include "port/pg_crc32c.h"
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+
+/*
+ * This struct contains the current state of the snapshot building
+ * machinery. It is exposed to the public, so pay attention when changing its
+ * contents.
+ */
+typedef struct SnapBuild
+{
+ /* how far are we along building our first full snapshot */
+ SnapBuildState state;
+
+ /* private memory context used to allocate memory for this module. */
+ MemoryContext context;
+
+ /* all transactions < than this have committed/aborted */
+ TransactionId xmin;
+
+ /* all transactions >= than this are uncommitted */
+ TransactionId xmax;
+
+ /*
+ * Don't replay commits from an LSN < this LSN. This can be set externally
+ * but it will also be advanced (never retreat) from within snapbuild.c.
+ */
+ XLogRecPtr start_decoding_at;
+
+ /*
+ * LSN at which two-phase decoding was enabled or LSN at which we found a
+ * consistent point at the time of slot creation.
+ *
+ * The prepared transactions, that were skipped because previously
+ * two-phase was not enabled or are not covered by initial snapshot, need
+ * to be sent later along with commit prepared and they must be before
+ * this point.
+ */
+ XLogRecPtr two_phase_at;
+
+ /*
+ * Don't start decoding WAL until the "xl_running_xacts" information
+ * indicates there are no running xids with an xid smaller than this.
+ */
+ TransactionId initial_xmin_horizon;
+
+ /* Indicates if we are building full snapshot or just catalog one. */
+ bool building_full_snapshot;
+
+ /*
+ * Indicates if we are using the snapshot builder for the creation of a
+ * logical replication slot. If it's true, the start point for decoding
+ * changes is not determined yet. So we skip snapshot restores to properly
+ * find the start point. See SnapBuildFindSnapshot() for details.
+ */
+ bool in_slot_creation;
+
+ /*
+ * Snapshot that's valid to see the catalog state seen at this moment.
+ */
+ Snapshot snapshot;
+
+ /*
+ * LSN of the last location we are sure a snapshot has been serialized to.
+ */
+ XLogRecPtr last_serialized_snapshot;
+
+ /*
+ * The reorderbuffer we need to update with usable snapshots et al.
+ */
+ ReorderBuffer *reorder;
+
+ /*
+ * TransactionId at which the next phase of initial snapshot building will
+ * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
+ * when no next phase necessary (SNAPBUILD_CONSISTENT).
+ */
+ TransactionId next_phase_at;
+
+ /*
+ * Array of transactions which could have catalog changes that committed
+ * between xmin and xmax.
+ */
+ struct
+ {
+ /* number of committed transactions */
+ size_t xcnt;
+
+ /* available space for committed transactions */
+ size_t xcnt_space;
+
+ /*
+ * Until we reach a CONSISTENT state, we record commits of all
+ * transactions, not just the catalog changing ones. Record when that
+ * changes so we know we cannot export a snapshot safely anymore.
+ */
+ bool includes_all_transactions;
+
+ /*
+ * Array of committed transactions that have modified the catalog.
+ *
+ * As this array is frequently modified we do *not* keep it in
+ * xidComparator order. Instead we sort the array when building &
+ * distributing a snapshot.
+ *
+ * TODO: It's unclear whether that reasoning has much merit. Every
+ * time we add something here after becoming consistent will also
+ * require distributing a snapshot. Storing them sorted would
+ * potentially also make it easier to purge (but more complicated wrt
+ * wraparound?). Should be improved if sorting while building the
+ * snapshot shows up in profiles.
+ */
+ TransactionId *xip;
+ } committed;
+
+ /*
+ * Array of transactions and subtransactions that had modified catalogs
+ * and were running when the snapshot was serialized.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that
+ * the logical decoding decodes only the commit record of the transaction
+ * after restoring the previously serialized snapshot in which case we
+ * will miss adding the xid to the snapshot and end up looking at the
+ * catalogs with the wrong snapshot.
+ *
+ * Now to avoid the above problem, we serialize the transactions that had
+ * modified the catalogs and are still running at the time of snapshot
+ * serialization. We fill this array while restoring the snapshot and then
+ * refer it while decoding commit to ensure if the xact has modified the
+ * catalog. We discard this array when all the xids in the list become old
+ * enough to matter. See SnapBuildPurgeOlderTxn for details.
+ */
+ struct
+ {
+ /* number of transactions */
+ size_t xcnt;
+
+ /* This array must be sorted in xidComparator order */
+ TransactionId *xip;
+ } catchange;
+} SnapBuild;
+
+/* -----------------------------------
+ * Snapshot serialization support
+ * -----------------------------------
+ */
+
+/*
+ * We store current state of struct SnapBuild on disk in the following manner:
+ *
+ * struct SnapBuildOnDisk;
+ * TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchange.xcnt;
+ *
+ * Check if the SnapBuildOnDiskConstantSize and SnapBuildOnDiskNotChecksummedSize
+ * macros need to be updated when modifying the SnapBuildOnDisk struct.
+ */
+typedef struct SnapBuildOnDisk
+{
+ /* first part of this struct needs to be version independent */
+
+ /* data not covered by checksum */
+ uint32 magic;
+ pg_crc32c checksum;
+
+ /* data covered by checksum */
+
+ /* version, in case we want to support pg_upgrade */
+ uint32 version;
+ /* how large is the on disk data, excluding the constant sized part */
+ uint32 length;
+
+ /* version dependent part */
+ SnapBuild builder;
+
+ /* variable amount of TransactionIds follows */
+} SnapBuildOnDisk;
+
+#endif /* SNAPBUILD_INTERNAL_H */
--
2.34.1
>From 9f9e2ed520d0e9315dc16521f2f7d1e26fc50cb7 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Fri, 11 Oct 2024 16:24:14 -0700
Subject: [PATCH v16 2/2] Add contrib/pg_logicalinspect.
This module provides SQL functions that allow to inspect logical
decoding components.
It currently allows to inspect the contents of serialized logical
snapshots of a running database cluster, which is useful for debugging
or educational purposes.
Author: Bertrand Drouvot
Reviewed-by: Amit Kapila, Shveta Malik, Peter Smith, Peter Eisentraut
Reviewed-by: David G. Johnston
Discussion: https://postgr.es/m/ZscuZ92uGh3wm4tW%40ip-10-97-1-34.eu-west-3.compute.internal
---
contrib/Makefile | 1 +
contrib/meson.build | 1 +
contrib/pg_logicalinspect/.gitignore | 6 +
contrib/pg_logicalinspect/Makefile | 31 ++++
.../expected/logical_inspect.out | 52 ++++++
contrib/pg_logicalinspect/logicalinspect.conf | 1 +
contrib/pg_logicalinspect/meson.build | 39 ++++
.../pg_logicalinspect--1.0.sql | 43 +++++
contrib/pg_logicalinspect/pg_logicalinspect.c | 167 ++++++++++++++++++
.../pg_logicalinspect.control | 5 +
.../specs/logical_inspect.spec | 34 ++++
doc/src/sgml/contrib.sgml | 1 +
doc/src/sgml/filelist.sgml | 1 +
doc/src/sgml/pglogicalinspect.sgml | 143 +++++++++++++++
src/backend/replication/logical/snapbuild.c | 99 +++++++----
src/backend/utils/adt/arrayfuncs.c | 6 +
src/include/replication/snapbuild.h | 4 +
src/include/replication/snapbuild_internal.h | 3 +
18 files changed, 598 insertions(+), 39 deletions(-)
12.0% contrib/pg_logicalinspect/expected/
8.4% contrib/pg_logicalinspect/specs/
40.8% contrib/pg_logicalinspect/
21.9% doc/src/sgml/
14.5% src/backend/replication/logical/
diff --git a/contrib/Makefile b/contrib/Makefile
index abd780f277..952855d9b6 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -32,6 +32,7 @@ SUBDIRS = \
passwordcheck \
pg_buffercache \
pg_freespacemap \
+ pg_logicalinspect \
pg_prewarm \
pg_stat_statements \
pg_surgery \
diff --git a/contrib/meson.build b/contrib/meson.build
index 14a8906865..159ff41555 100644
--- a/contrib/meson.build
+++ b/contrib/meson.build
@@ -46,6 +46,7 @@ subdir('passwordcheck')
subdir('pg_buffercache')
subdir('pgcrypto')
subdir('pg_freespacemap')
+subdir('pg_logicalinspect')
subdir('pg_prewarm')
subdir('pgrowlocks')
subdir('pg_stat_statements')
diff --git a/contrib/pg_logicalinspect/.gitignore b/contrib/pg_logicalinspect/.gitignore
new file mode 100644
index 0000000000..b4903eba65
--- /dev/null
+++ b/contrib/pg_logicalinspect/.gitignore
@@ -0,0 +1,6 @@
+# Generated subdirectories
+/log/
+/results/
+/output_iso/
+/tmp_check/
+/tmp_check_iso/
diff --git a/contrib/pg_logicalinspect/Makefile b/contrib/pg_logicalinspect/Makefile
new file mode 100644
index 0000000000..55124514d4
--- /dev/null
+++ b/contrib/pg_logicalinspect/Makefile
@@ -0,0 +1,31 @@
+# contrib/pg_logicalinspect/Makefile
+
+MODULE_big = pg_logicalinspect
+OBJS = \
+ $(WIN32RES) \
+ pg_logicalinspect.o
+PGFILEDESC = "pg_logicalinspect - functions to inspect logical decoding components"
+
+EXTENSION = pg_logicalinspect
+DATA = pg_logicalinspect--1.0.sql
+
+EXTRA_INSTALL = contrib/test_decoding
+
+ISOLATION = logical_inspect
+
+ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/pg_logicalinspect/logicalinspect.conf
+
+# Disabled because these tests require "wal_level=logical", which
+# some installcheck users do not have (e.g. buildfarm clients).
+NO_INSTALLCHECK = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_logicalinspect
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_logicalinspect/expected/logical_inspect.out b/contrib/pg_logicalinspect/expected/logical_inspect.out
new file mode 100644
index 0000000000..d95efa4d1e
--- /dev/null
+++ b/contrib/pg_logicalinspect/expected/logical_inspect.out
@@ -0,0 +1,52 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes s1_get_logical_snapshot_info s1_get_logical_snapshot_meta
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+step s1_get_logical_snapshot_info: SELECT info.state, info.catchange_count, array_length(info.catchange_xip,1) AS catchange_array_length, info.committed_count, array_length(info.committed_xip,1) AS committed_array_length FROM pg_ls_logicalsnapdir(), pg_get_logical_snapshot_info(name) AS info ORDER BY 2;
+state |catchange_count|catchange_array_length|committed_count|committed_array_length
+----------+---------------+----------------------+---------------+----------------------
+consistent| 0| | 2| 2
+consistent| 2| 2| 0|
+(2 rows)
+
+step s1_get_logical_snapshot_meta: SELECT COUNT(meta.*) from pg_ls_logicalsnapdir(), pg_get_logical_snapshot_meta(name) as meta;
+count
+-----
+ 2
+(1 row)
+
diff --git a/contrib/pg_logicalinspect/logicalinspect.conf b/contrib/pg_logicalinspect/logicalinspect.conf
new file mode 100644
index 0000000000..e3d257315f
--- /dev/null
+++ b/contrib/pg_logicalinspect/logicalinspect.conf
@@ -0,0 +1 @@
+wal_level = logical
diff --git a/contrib/pg_logicalinspect/meson.build b/contrib/pg_logicalinspect/meson.build
new file mode 100644
index 0000000000..3ec635509b
--- /dev/null
+++ b/contrib/pg_logicalinspect/meson.build
@@ -0,0 +1,39 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+pg_logicalinspect_sources = files('pg_logicalinspect.c')
+
+if host_system == 'windows'
+ pg_logicalinspect_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'pg_logicalinspect',
+ '--FILEDESC', 'pg_logicalinspect - functions to inspect logical decoding components',])
+endif
+
+pg_logicalinspect = shared_module('pg_logicalinspect',
+ pg_logicalinspect_sources,
+ kwargs: contrib_mod_args + {
+ 'dependencies': contrib_mod_args['dependencies'],
+ },
+)
+contrib_targets += pg_logicalinspect
+
+install_data(
+ 'pg_logicalinspect.control',
+ 'pg_logicalinspect--1.0.sql',
+ kwargs: contrib_data_args,
+)
+
+tests += {
+ 'name': 'pg_logicalinspect',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'isolation': {
+ 'specs': [
+ 'logical_inspect',
+ ],
+ 'regress_args': [
+ '--temp-config', files('logicalinspect.conf'),
+ ],
+ # see above
+ 'runningcheck': false,
+ },
+}
diff --git a/contrib/pg_logicalinspect/pg_logicalinspect--1.0.sql b/contrib/pg_logicalinspect/pg_logicalinspect--1.0.sql
new file mode 100644
index 0000000000..8f7f947cbb
--- /dev/null
+++ b/contrib/pg_logicalinspect/pg_logicalinspect--1.0.sql
@@ -0,0 +1,43 @@
+/* contrib/pg_logicalinspect/pg_logicalinspect--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_logicalinspect" to load this file. \quit
+
+--
+-- pg_get_logical_snapshot_meta()
+--
+CREATE FUNCTION pg_get_logical_snapshot_meta(IN filename text,
+ OUT magic int4,
+ OUT checksum int8,
+ OUT version int4
+)
+AS 'MODULE_PATHNAME', 'pg_get_logical_snapshot_meta'
+LANGUAGE C STRICT PARALLEL SAFE;
+
+REVOKE EXECUTE ON FUNCTION pg_get_logical_snapshot_meta(text) FROM PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_get_logical_snapshot_meta(text) TO pg_read_server_files;
+
+--
+-- pg_get_logical_snapshot_info()
+--
+CREATE FUNCTION pg_get_logical_snapshot_info(IN filename text,
+ OUT state text,
+ OUT xmin xid,
+ OUT xmax xid,
+ OUT start_decoding_at pg_lsn,
+ OUT two_phase_at pg_lsn,
+ OUT initial_xmin_horizon xid,
+ OUT building_full_snapshot boolean,
+ OUT in_slot_creation boolean,
+ OUT last_serialized_snapshot pg_lsn,
+ OUT next_phase_at xid,
+ OUT committed_count int4,
+ OUT committed_xip xid[],
+ OUT catchange_count int4,
+ OUT catchange_xip xid[]
+)
+AS 'MODULE_PATHNAME', 'pg_get_logical_snapshot_info'
+LANGUAGE C STRICT PARALLEL SAFE;
+
+REVOKE EXECUTE ON FUNCTION pg_get_logical_snapshot_info(text) FROM PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_get_logical_snapshot_info(text) TO pg_read_server_files;
diff --git a/contrib/pg_logicalinspect/pg_logicalinspect.c b/contrib/pg_logicalinspect/pg_logicalinspect.c
new file mode 100644
index 0000000000..675760e686
--- /dev/null
+++ b/contrib/pg_logicalinspect/pg_logicalinspect.c
@@ -0,0 +1,167 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_logicalinspect.c
+ * Functions to inspect contents of PostgreSQL logical snapshots
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_logicalinspect/pg_logicalinspect.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "replication/snapbuild_internal.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_get_logical_snapshot_meta);
+PG_FUNCTION_INFO_V1(pg_get_logical_snapshot_info);
+
+/* Return the description of SnapBuildState */
+static const char *
+get_snapbuild_state_desc(SnapBuildState state)
+{
+ const char *stateDesc = "unknown state";
+
+ switch (state)
+ {
+ case SNAPBUILD_START:
+ stateDesc = "start";
+ break;
+ case SNAPBUILD_BUILDING_SNAPSHOT:
+ stateDesc = "building";
+ break;
+ case SNAPBUILD_FULL_SNAPSHOT:
+ stateDesc = "full";
+ break;
+ case SNAPBUILD_CONSISTENT:
+ stateDesc = "consistent";
+ break;
+ }
+
+ return stateDesc;
+}
+
+/*
+ * Retrieve the logical snapshot file metadata.
+ */
+Datum
+pg_get_logical_snapshot_meta(PG_FUNCTION_ARGS)
+{
+#define PG_GET_LOGICAL_SNAPSHOT_META_COLS 3
+ SnapBuildOnDisk ondisk;
+ HeapTuple tuple;
+ Datum values[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
+ bool nulls[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
+ TupleDesc tupdesc;
+ char path[MAXPGPATH];
+ int i = 0;
+ text *filename_t = PG_GETARG_TEXT_PP(0);
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ sprintf(path, "%s/%s",
+ PG_LOGICAL_SNAPSHOTS_DIR,
+ text_to_cstring(filename_t));
+
+ /* Validate and restore the snapshot to 'ondisk' */
+ SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
+
+ values[i++] = UInt32GetDatum(ondisk.magic);
+ values[i++] = Int64GetDatum((int64) ondisk.checksum);
+ values[i++] = UInt32GetDatum(ondisk.version);
+
+ Assert(i == PG_GET_LOGICAL_SNAPSHOT_META_COLS);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+
+ PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+
+#undef PG_GET_LOGICAL_SNAPSHOT_META_COLS
+}
+
+Datum
+pg_get_logical_snapshot_info(PG_FUNCTION_ARGS)
+{
+#define PG_GET_LOGICAL_SNAPSHOT_INFO_COLS 14
+ SnapBuildOnDisk ondisk;
+ HeapTuple tuple;
+ Datum values[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
+ bool nulls[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
+ TupleDesc tupdesc;
+ char path[MAXPGPATH];
+ int i = 0;
+ text *filename_t = PG_GETARG_TEXT_PP(0);
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ sprintf(path, "%s/%s",
+ PG_LOGICAL_SNAPSHOTS_DIR,
+ text_to_cstring(filename_t));
+
+ /* Validate and restore the snapshot to 'ondisk' */
+ SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
+
+ values[i++] = CStringGetTextDatum(get_snapbuild_state_desc(ondisk.builder.state));
+ values[i++] = TransactionIdGetDatum(ondisk.builder.xmin);
+ values[i++] = TransactionIdGetDatum(ondisk.builder.xmax);
+ values[i++] = LSNGetDatum(ondisk.builder.start_decoding_at);
+ values[i++] = LSNGetDatum(ondisk.builder.two_phase_at);
+ values[i++] = TransactionIdGetDatum(ondisk.builder.initial_xmin_horizon);
+ values[i++] = BoolGetDatum(ondisk.builder.building_full_snapshot);
+ values[i++] = BoolGetDatum(ondisk.builder.in_slot_creation);
+ values[i++] = LSNGetDatum(ondisk.builder.last_serialized_snapshot);
+ values[i++] = TransactionIdGetDatum(ondisk.builder.next_phase_at);
+
+ values[i++] = UInt32GetDatum(ondisk.builder.committed.xcnt);
+ if (ondisk.builder.committed.xcnt > 0)
+ {
+ Datum *arrayelems;
+
+ arrayelems = (Datum *) palloc(ondisk.builder.committed.xcnt * sizeof(Datum));
+
+ for (int j = 0; j < ondisk.builder.committed.xcnt; j++)
+ arrayelems[j] = TransactionIdGetDatum(ondisk.builder.committed.xip[j]);
+
+ values[i++] = PointerGetDatum(construct_array_builtin(arrayelems,
+ ondisk.builder.committed.xcnt,
+ XIDOID));
+ }
+ else
+ nulls[i++] = true;
+
+ values[i++] = UInt32GetDatum(ondisk.builder.catchange.xcnt);
+ if (ondisk.builder.catchange.xcnt > 0)
+ {
+ Datum *arrayelems;
+
+ arrayelems = (Datum *) palloc(ondisk.builder.catchange.xcnt * sizeof(Datum));
+
+ for (int j = 0; j < ondisk.builder.catchange.xcnt; j++)
+ arrayelems[j] = TransactionIdGetDatum(ondisk.builder.catchange.xip[j]);
+
+ values[i++] = PointerGetDatum(construct_array_builtin(arrayelems,
+ ondisk.builder.catchange.xcnt,
+ XIDOID));
+ }
+ else
+ nulls[i++] = true;
+
+ Assert(i == PG_GET_LOGICAL_SNAPSHOT_INFO_COLS);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+
+ PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+
+#undef PG_GET_LOGICAL_SNAPSHOT_INFO_COLS
+}
diff --git a/contrib/pg_logicalinspect/pg_logicalinspect.control b/contrib/pg_logicalinspect/pg_logicalinspect.control
new file mode 100644
index 0000000000..b4a70e57ba
--- /dev/null
+++ b/contrib/pg_logicalinspect/pg_logicalinspect.control
@@ -0,0 +1,5 @@
+# pg_logicalinspect extension
+comment = 'functions to inspect logical decoding components'
+default_version = '1.0'
+module_pathname = '$libdir/pg_logicalinspect'
+relocatable = true
diff --git a/contrib/pg_logicalinspect/specs/logical_inspect.spec b/contrib/pg_logicalinspect/specs/logical_inspect.spec
new file mode 100644
index 0000000000..9851a6c18e
--- /dev/null
+++ b/contrib/pg_logicalinspect/specs/logical_inspect.spec
@@ -0,0 +1,34 @@
+# Test the pg_logicalinspect functions: that needs some permutation to
+# ensure that we are creating multiple logical snapshots and that one of them
+# contains ongoing catalogs changes.
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE EXTENSION pg_logicalinspect;
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+ DROP EXTENSION pg_logicalinspect;
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s1_get_logical_snapshot_meta" { SELECT COUNT(meta.*) from pg_ls_logicalsnapdir(), pg_get_logical_snapshot_meta(name) as meta;}
+step "s1_get_logical_snapshot_info" { SELECT info.state, info.catchange_count, array_length(info.catchange_xip,1) AS catchange_array_length, info.committed_count, array_length(info.committed_xip,1) AS committed_array_length FROM pg_ls_logicalsnapdir(), pg_get_logical_snapshot_info(name) AS info ORDER BY 2; }
+
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" "s1_get_logical_snapshot_info" "s1_get_logical_snapshot_meta"
diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml
index 44639a8dca..7c381949a5 100644
--- a/doc/src/sgml/contrib.sgml
+++ b/doc/src/sgml/contrib.sgml
@@ -154,6 +154,7 @@ CREATE EXTENSION <replaceable>extension_name</replaceable>;
&pgbuffercache;
&pgcrypto;
&pgfreespacemap;
+ &pglogicalinspect;
&pgprewarm;
&pgrowlocks;
&pgstatstatements;
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index a7ff5f8264..66e6dccd4c 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -143,6 +143,7 @@
<!ENTITY pgbuffercache SYSTEM "pgbuffercache.sgml">
<!ENTITY pgcrypto SYSTEM "pgcrypto.sgml">
<!ENTITY pgfreespacemap SYSTEM "pgfreespacemap.sgml">
+<!ENTITY pglogicalinspect SYSTEM "pglogicalinspect.sgml">
<!ENTITY pgprewarm SYSTEM "pgprewarm.sgml">
<!ENTITY pgrowlocks SYSTEM "pgrowlocks.sgml">
<!ENTITY pgstatstatements SYSTEM "pgstatstatements.sgml">
diff --git a/doc/src/sgml/pglogicalinspect.sgml b/doc/src/sgml/pglogicalinspect.sgml
new file mode 100644
index 0000000000..4b111f9611
--- /dev/null
+++ b/doc/src/sgml/pglogicalinspect.sgml
@@ -0,0 +1,143 @@
+<!-- doc/src/sgml/pglogicalinspect.sgml -->
+
+<sect1 id="pglogicalinspect" xreflabel="pg_logicalinspect">
+ <title>pg_logicalinspect — logical decoding components inspection</title>
+
+ <indexterm zone="pglogicalinspect">
+ <primary>pg_logicalinspect</primary>
+ </indexterm>
+
+ <para>
+ The <filename>pg_logicalinspect</filename> module provides SQL functions
+ that allow you to inspect the contents of logical decoding components. It
+ allows the inspection of serialized logical snapshots of a running
+ <productname>PostgreSQL</productname> database cluster, which is useful
+ for debugging or educational purposes.
+ </para>
+
+ <para>
+ By default, use of these functions is restricted to superusers and members of
+ the <literal>pg_read_server_files</literal> role. Access may be granted by
+ superusers to others using <command>GRANT</command>.
+ </para>
+
+ <sect2 id="pglogicalinspect-funcs">
+ <title>Functions</title>
+
+ <variablelist>
+ <varlistentry id="pglogicalinspect-funcs-pg-get-logical-snapshot-meta">
+ <term>
+ <function>pg_get_logical_snapshot_meta(filename text) returns record</function>
+ </term>
+
+ <listitem>
+ <para>
+ Gets logical snapshot metadata about a snapshot file that is located in
+ the server's <filename>pg_logical/snapshots</filename> directory.
+ The <replaceable>filename</replaceable> argument represents the snapshot
+ file name.
+ For example:
+<screen>
+postgres=# SELECT * FROM pg_ls_logicalsnapdir();
+-[ RECORD 1 ]+-----------------------
+name | 0-40796E18.snap
+size | 152
+modification | 2024-08-14 16:36:32+00
+
+postgres=# SELECT * FROM pg_get_logical_snapshot_meta('0-40796E18.snap');
+-[ RECORD 1 ]--------
+magic | 1369563137
+checksum | 1028045905
+version | 6
+
+postgres=# SELECT ss.name, meta.* FROM pg_ls_logicalsnapdir() AS ss,
+pg_get_logical_snapshot_meta(ss.name) AS meta;
+-[ RECORD 1 ]-------------
+name | 0-40796E18.snap
+magic | 1369563137
+checksum | 1028045905
+version | 6
+</screen>
+ </para>
+ <para>
+ If <replaceable>filename</replaceable> does not match a snapshot file, the
+ function raises an error.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="pglogicalinspect-funcs-pg-get-logical-snapshot-info">
+ <term>
+ <function>pg_get_logical_snapshot_info(filename text) returns record</function>
+ </term>
+
+ <listitem>
+ <para>
+ Gets logical snapshot information about a snapshot file that is located in
+ the server's <filename>pg_logical/snapshots</filename> directory.
+ The <replaceable>filename</replaceable> argument represents the snapshot
+ file name.
+ For example:
+<screen>
+postgres=# SELECT * FROM pg_ls_logicalsnapdir();
+-[ RECORD 1 ]+-----------------------
+name | 0-40796E18.snap
+size | 152
+modification | 2024-08-14 16:36:32+00
+
+postgres=# SELECT * FROM pg_get_logical_snapshot_info('0-40796E18.snap');
+-[ RECORD 1 ]------------+-----------
+state | consistent
+xmin | 751
+xmax | 751
+start_decoding_at | 0/40796AF8
+two_phase_at | 0/40796AF8
+initial_xmin_horizon | 0
+building_full_snapshot | f
+in_slot_creation | f
+last_serialized_snapshot | 0/0
+next_phase_at | 0
+committed_count | 0
+committed_xip |
+catchange_count | 2
+catchange_xip | {751,752}
+
+postgres=# SELECT ss.name, info.* FROM pg_ls_logicalsnapdir() AS ss,
+pg_get_logical_snapshot_info(ss.name) AS info;
+-[ RECORD 1 ]------------+----------------
+name | 0-40796E18.snap
+state | consistent
+xmin | 751
+xmax | 751
+start_decoding_at | 0/40796AF8
+two_phase_at | 0/40796AF8
+initial_xmin_horizon | 0
+building_full_snapshot | f
+in_slot_creation | f
+last_serialized_snapshot | 0/0
+next_phase_at | 0
+committed_count | 0
+committed_xip |
+catchange_count | 2
+catchange_xip | {751,752}
+</screen>
+ </para>
+ <para>
+ If <replaceable>filename</replaceable> does not match a snapshot file, the
+ function raises an error.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </sect2>
+
+ <sect2 id="pglogicalinspect-author">
+ <title>Author</title>
+
+ <para>
+ Bertrand Drouvot <email>[email protected]</email>
+ </para>
+ </sect2>
+
+</sect1>
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index b9df8c0a02..a6a4da3266 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1684,34 +1684,31 @@ out:
}
/*
- * Restore a snapshot into 'builder' if previously one has been stored at the
- * location indicated by 'lsn'. Returns true if successful, false otherwise.
+ * Restore the logical snapshot file contents to 'ondisk'.
+ *
+ * 'context' is the memory context where the catalog modifying/committed xid
+ * will live.
+ * If 'missing_ok' is true, will not throw an error if the file is not found.
*/
-static bool
-SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
+bool
+SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, const char *path,
+ MemoryContext context, bool missing_ok)
{
- SnapBuildOnDisk ondisk;
int fd;
- char path[MAXPGPATH];
- Size sz;
pg_crc32c checksum;
-
- /* no point in loading a snapshot if we're already there */
- if (builder->state == SNAPBUILD_CONSISTENT)
- return false;
-
- sprintf(path, "%s/%X-%X.snap",
- PG_LOGICAL_SNAPSHOTS_DIR,
- LSN_FORMAT_ARGS(lsn));
+ Size sz;
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
- if (fd < 0 && errno == ENOENT)
- return false;
- else if (fd < 0)
+ if (fd < 0)
+ {
+ if (missing_ok && errno == ENOENT)
+ return false;
+
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", path)));
+ }
/* ----
* Make sure the snapshot had been stored safely to disk, that's normally
@@ -1724,47 +1721,46 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
fsync_fname(path, false);
fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
-
/* read statically sized portion of snapshot */
- SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
+ SnapBuildRestoreContents(fd, (char *) ondisk, SnapBuildOnDiskConstantSize, path);
- if (ondisk.magic != SNAPBUILD_MAGIC)
+ if (ondisk->magic != SNAPBUILD_MAGIC)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
- path, ondisk.magic, SNAPBUILD_MAGIC)));
+ path, ondisk->magic, SNAPBUILD_MAGIC)));
- if (ondisk.version != SNAPBUILD_VERSION)
+ if (ondisk->version != SNAPBUILD_VERSION)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
- path, ondisk.version, SNAPBUILD_VERSION)));
+ path, ondisk->version, SNAPBUILD_VERSION)));
INIT_CRC32C(checksum);
COMP_CRC32C(checksum,
- ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
+ ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
- SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
- COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
+ SnapBuildRestoreContents(fd, (char *) &ondisk->builder, sizeof(SnapBuild), path);
+ COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
/* restore committed xacts information */
- if (ondisk.builder.committed.xcnt > 0)
+ if (ondisk->builder.committed.xcnt > 0)
{
- sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
- ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
- SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
- COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
+ sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
+ ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk->builder.committed.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
}
/* restore catalog modifying xacts information */
- if (ondisk.builder.catchange.xcnt > 0)
+ if (ondisk->builder.catchange.xcnt > 0)
{
- sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
- ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
- SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
- COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
+ sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
+ ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk->builder.catchange.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
}
if (CloseTransientFile(fd) != 0)
@@ -1775,11 +1771,36 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
FIN_CRC32C(checksum);
/* verify checksum of what we've read */
- if (!EQ_CRC32C(checksum, ondisk.checksum))
+ if (!EQ_CRC32C(checksum, ondisk->checksum))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
- path, checksum, ondisk.checksum)));
+ path, checksum, ondisk->checksum)));
+
+ return true;
+}
+
+/*
+ * Restore a snapshot into 'builder' if previously one has been stored at the
+ * location indicated by 'lsn'. Returns true if successful, false otherwise.
+ */
+static bool
+SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
+{
+ SnapBuildOnDisk ondisk;
+ char path[MAXPGPATH];
+
+ /* no point in loading a snapshot if we're already there */
+ if (builder->state == SNAPBUILD_CONSISTENT)
+ return false;
+
+ sprintf(path, "%s/%X-%X.snap",
+ PG_LOGICAL_SNAPSHOTS_DIR,
+ LSN_FORMAT_ARGS(lsn));
+
+ /* validate and restore the snapshot to 'ondisk' */
+ if (!SnapBuildRestoreSnapshot(&ondisk, path, builder->context, true))
+ return false;
/*
* ok, we now have a sensible snapshot here, figure out if it has more
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index e5c7e57a5d..41434279c5 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -3447,6 +3447,12 @@ construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
elmalign = TYPALIGN_SHORT;
break;
+ case XIDOID:
+ elmlen = sizeof(TransactionId);
+ elmbyval = true;
+ elmalign = TYPALIGN_INT;
+ break;
+
default:
elog(ERROR, "type %u not supported by construct_array_builtin()", elmtype);
/* keep compiler quiet */
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index dbb4bc2f4b..3c1454df99 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -15,6 +15,10 @@
#include "access/xlogdefs.h"
#include "utils/snapmgr.h"
+/*
+ * Please keep get_snapbuild_state_desc() (located in the pg_logicalinspect
+ * module) updated if a change needs to be made to SnapBuildState.
+ */
typedef enum
{
/*
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 03719ccf2a..7134b48b96 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -193,4 +193,7 @@ typedef struct SnapBuildOnDisk
/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;
+extern bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, const char *path,
+ MemoryContext context, bool missing_ok);
+
#endif /* SNAPBUILD_INTERNAL_H */
--
2.34.1