Hi,

On Thu, Aug 29, 2024 at 02:15:47PM +0000, Bertrand Drouvot wrote:
> I don't see any use case where it could be useful when the server is down. So,
> I think I'll move forward with in core functions (unless someone has a
> different opinion).

Please find attached v2, which creates the two new in-core functions, pg_get_logical_snapshot_meta() and pg_get_logical_snapshot_info().

Note that once those new functions are in (or maybe sooner), I'll submit an additional patch to get rid of the code duplication between the new ValidateSnapshotFile() and SnapBuildRestore().

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
>From 6c5a1ad66b203036739aae955932b8e3813c71e3 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot <bertranddrouvot...@gmail.com> Date: Thu, 29 Aug 2024 20:51:31 +0000 Subject: [PATCH v2] Functions to get ondisk logical snapshots details Provides SQL functions that allow to inspect the contents of serialized logical snapshots of a running database cluster, which is useful for debugging or educational purposes. --- contrib/test_decoding/Makefile | 2 +- .../expected/get_ondisk_snapshot_info.out | 57 +++++ contrib/test_decoding/meson.build | 1 + .../specs/get_ondisk_snapshot_info.spec | 32 +++ doc/src/sgml/func.sgml | 53 +++++ src/backend/replication/logical/snapbuild.c | 225 ++++++++++++++++++ src/include/catalog/pg_proc.dat | 16 ++ 7 files changed, 385 insertions(+), 1 deletion(-) 17.3% contrib/test_decoding/expected/ 12.6% contrib/test_decoding/specs/ 17.3% doc/src/sgml/ 45.3% src/backend/replication/logical/ 6.6% src/include/catalog/ diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a4ba1a509a..b1b8ffa9e8 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ - skip_snapshot_restore + skip_snapshot_restore get_ondisk_snapshot_info REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/get_ondisk_snapshot_info.out b/contrib/test_decoding/expected/get_ondisk_snapshot_info.out new file mode 100644 index 0000000000..b676ccd528 --- /dev/null +++ b/contrib/test_decoding/expected/get_ondisk_snapshot_info.out @@ -0,0 +1,57 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init 
s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes s1_get_logical_snapshot_info s1_get_logical_snapshot_meta +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +step s1_get_logical_snapshot_info: SELECT (pg_get_logical_snapshot_info(f.name::pg_lsn)).state,(pg_get_logical_snapshot_info(f.name::pg_lsn)).catchange_count,array_length((pg_get_logical_snapshot_info(f.name::pg_lsn)).catchange_xip,1),(pg_get_logical_snapshot_info(f.name::pg_lsn)).committed_count,array_length((pg_get_logical_snapshot_info(f.name::pg_lsn)).committed_xip,1) FROM (SELECT replace(replace(name,'.snap',''),'-','/') AS name FROM pg_ls_logicalsnapdir()) AS f ORDER BY 2; +state|catchange_count|array_length|committed_count|array_length +-----+---------------+------------+---------------+------------ + 2| 0| | 2| 2 + 2| 2| 2| 0| 
+(2 rows) + +step s1_get_logical_snapshot_meta: SELECT COUNT((pg_get_logical_snapshot_meta(f.name::pg_lsn))) FROM (SELECT replace(replace(name,'.snap',''),'-','/') AS name FROM pg_ls_logicalsnapdir()) AS f; +count +----- + 2 +(1 row) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f643dc81a2..d6e784cbdd 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -63,6 +63,7 @@ tests += { 'twophase_snapshot', 'slot_creation_error', 'skip_snapshot_restore', + 'get_ondisk_snapshot_info', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/get_ondisk_snapshot_info.spec b/contrib/test_decoding/specs/get_ondisk_snapshot_info.spec new file mode 100644 index 0000000000..39c2ee1430 --- /dev/null +++ b/contrib/test_decoding/specs/get_ondisk_snapshot_info.spec @@ -0,0 +1,32 @@ +# Test the functions that retrieve ondisk logical snapshots informations. +# That needs some permutation to ensure that we are creating multiple logical +# snapshots and that one of them contains ongoing catalogs changes. 
+setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_logical_snapshot_meta" { SELECT COUNT((pg_get_logical_snapshot_meta(f.name::pg_lsn))) FROM (SELECT replace(replace(name,'.snap',''),'-','/') AS name FROM pg_ls_logicalsnapdir()) AS f; } +step "s1_get_logical_snapshot_info" { SELECT (pg_get_logical_snapshot_info(f.name::pg_lsn)).state,(pg_get_logical_snapshot_info(f.name::pg_lsn)).catchange_count,array_length((pg_get_logical_snapshot_info(f.name::pg_lsn)).catchange_xip,1),(pg_get_logical_snapshot_info(f.name::pg_lsn)).committed_count,array_length((pg_get_logical_snapshot_info(f.name::pg_lsn)).committed_xip,1) FROM (SELECT replace(replace(name,'.snap',''),'-','/') AS name FROM pg_ls_logicalsnapdir()) AS f ORDER BY 2; } + +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" "s1_get_logical_snapshot_info" "s1_get_logical_snapshot_meta" diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 461fc3f437..8d233f092d 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -29682,6 +29682,59 @@ DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction with a 
</entry> </row> + <row> + <entry id="pg-get-logical-snapshot-meta" role="func_table_entry"><para role="func_signature"> + <indexterm> + <primary>pg_get_logical_snapshot_meta</primary> + </indexterm> + <function>pg_get_logical_snapshot_meta</function> ( <parameter>in_lsn</parameter> <type>pg_lsn</type> ) + <returnvalue>record</returnvalue> + ( <parameter>magic</parameter> <type>int</type>, + <parameter>checksum</parameter> <type>int</type>, + <parameter>version</parameter> <type>int</type> ) + </para> + <para> + Gets logical snapshot metadata about a snapshot file that is located in + the <filename>pg_logical/snapshots</filename> directory. + The <replaceable>in_lsn</replaceable> argument can be extracted from the + snapshot file name. The aim of this function is mainly for debugging or + educational purposes. + </para> + </entry> + </row> + + <row> + <entry id="pg-get-logical-snapshot-info" role="func_table_entry"><para role="func_signature"> + <indexterm> + <primary>pg_get_logical_snapshot_info</primary> + </indexterm> + <function>pg_get_logical_snapshot_info</function> ( <parameter>in_lsn</parameter> <type>pg_lsn</type> ) + <returnvalue>record</returnvalue> + ( <parameter>state</parameter> <type>smallint</type>, + <parameter>xmin</parameter> <type>xid</type>, + <parameter>xmax</parameter> <type>xid</type>, + <parameter>start_decoding_at</parameter> <type>pg_lsn</type>, + <parameter>two_phase_at</parameter> <type>pg_lsn</type>, + <parameter>initial_xmin_horizon</parameter> <type>xid</type>, + <parameter>building_full_snapshot</parameter> <type>boolean</type>, + <parameter>in_slot_creation</parameter> <type>boolean</type>, + <parameter>last_serialized_snapshot</parameter> <type>pg_lsn</type>, + <parameter>next_phase_at</parameter> <type>xid</type>, + <parameter>committed_count</parameter> <type>bigint</type>, + <parameter>committed_xip</parameter> <type>xid[]</type>, + <parameter>catchange_count</parameter> <type>bigint</type>, + 
<parameter>catchange_xip</parameter> <type>xid[]</type> ) + </para> + <para> + Gets logical snapshot information about a snapshot file that is located + in the <filename>pg_logical/snapshots</filename> directory. + The <replaceable>in_lsn</replaceable> argument can be extracted from the + snapshot file name. The aim of this function is mainly for debugging or + educational purposes. + </para> + </entry> + </row> + </tbody> </tgroup> </table> diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 0450f94ba8..b4704dd5b4 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -129,6 +129,7 @@ #include "access/transam.h" #include "access/xact.h" #include "common/file_utils.h" +#include "funcapi.h" #include "miscadmin.h" #include "pgstat.h" #include "replication/logical.h" @@ -139,8 +140,10 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "storage/standby.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/snapmgr.h" #include "utils/snapshot.h" @@ -1599,6 +1602,9 @@ typedef struct SnapBuildOnDisk #define SNAPBUILD_MAGIC 0x51A1E001 #define SNAPBUILD_VERSION 6 +static void ValidateSnapshotFile(XLogRecPtr lsn, SnapBuildOnDisk *ondisk, + const char *path); + /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. 
* @@ -2178,3 +2184,222 @@ SnapBuildSnapshotExists(XLogRecPtr lsn) return ret == 0; } + +static void +ValidateSnapshotFile(XLogRecPtr lsn, SnapBuildOnDisk *ondisk, const char *path) +{ + int fd; + Size sz; + pg_crc32c checksum; + MemoryContext context; + + context = AllocSetContextCreate(CurrentMemoryContext, + "ondisk logical snapshot inspect context", + ALLOCSET_DEFAULT_SIZES); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + + if (fd < 0 && errno == ENOENT) + ereport(ERROR, + errmsg("file \"%s\" does not exist", path)); + else if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* ---- + * Make sure the snapshot had been stored safely to disk, that's normally + * cheap. + * Note that we do not need PANIC here, nobody will be able to use the + * slot without fsyncing, and saving it won't succeed without an fsync() + * either... + * ---- + */ + fsync_fname(path, false); + fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true); + + + /* read statically sized portion of snapshot */ + SnapBuildRestoreContents(fd, (char *) ondisk, SnapBuildOnDiskConstantSize, path); + + if (ondisk->magic != SNAPBUILD_MAGIC) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u", + path, ondisk->magic, SNAPBUILD_MAGIC))); + + if (ondisk->version != SNAPBUILD_VERSION) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u", + path, ondisk->version, SNAPBUILD_VERSION))); + + INIT_CRC32C(checksum); + COMP_CRC32C(checksum, + ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + + /* read SnapBuild */ + SnapBuildRestoreContents(fd, (char *) &ondisk->builder, sizeof(SnapBuild), path); + COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild)); + + ondisk->builder.context = context; + + /* restore committed 
xacts information */ + if (ondisk->builder.committed.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt; + ondisk->builder.committed.xip = MemoryContextAllocZero(ondisk->builder.context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk->builder.committed.xip, sz, path); + COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz); + } + + /* restore catalog modifying xacts information */ + if (ondisk->builder.catchange.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt; + ondisk->builder.catchange.xip = MemoryContextAllocZero(ondisk->builder.context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk->builder.catchange.xip, sz, path); + COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz); + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); + + FIN_CRC32C(checksum); + + /* verify checksum of what we've read */ + if (!EQ_CRC32C(checksum, ondisk->checksum)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u", + path, checksum, ondisk->checksum))); +} + +/* + * Retrieve the logical snapshot file metadata. + */ +Datum +pg_get_logical_snapshot_meta(PG_FUNCTION_ARGS) +{ +#define PG_GET_LOGICAL_SNAPSHOT_META_COLS 3 + SnapBuildOnDisk ondisk; + XLogRecPtr lsn; + HeapTuple tuple; + Datum values[PG_GET_LOGICAL_SNAPSHOT_META_COLS]; + bool nulls[PG_GET_LOGICAL_SNAPSHOT_META_COLS]; + TupleDesc tupdesc; + char path[MAXPGPATH]; + + lsn = PG_GETARG_LSN(0); + + sprintf(path, "%s/%X-%X.snap", + PG_LOGICAL_SNAPSHOTS_DIR, + LSN_FORMAT_ARGS(lsn)); + + ValidateSnapshotFile(lsn, &ondisk, path); + + /* Build a tuple descriptor for our result type. 
*/ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(ondisk.magic); + values[1] = Int32GetDatum(ondisk.checksum); + values[2] = Int32GetDatum(ondisk.version); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + MemoryContextReset(ondisk.builder.context); + + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); + +#undef PG_GET_LOGICAL_SNAPSHOT_META_COLS +} + +/* + * Retrieve the logical snapshot file data. + */ +Datum +pg_get_logical_snapshot_info(PG_FUNCTION_ARGS) +{ +#define PG_GET_LOGICAL_SNAPSHOT_INFO_COLS 14 + SnapBuildOnDisk ondisk; + XLogRecPtr lsn; + HeapTuple tuple; + Datum values[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS]; + bool nulls[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS]; + TupleDesc tupdesc; + char path[MAXPGPATH]; + + lsn = PG_GETARG_LSN(0); + + sprintf(path, "%s/%X-%X.snap", + PG_LOGICAL_SNAPSHOTS_DIR, + LSN_FORMAT_ARGS(lsn)); + + ValidateSnapshotFile(lsn, &ondisk, path); + + /* Build a tuple descriptor for our result type. 
*/ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int16GetDatum(ondisk.builder.state); + values[1] = TransactionIdGetDatum(ondisk.builder.xmin); + values[2] = TransactionIdGetDatum(ondisk.builder.xmax); + values[3] = LSNGetDatum(ondisk.builder.start_decoding_at); + values[4] = LSNGetDatum(ondisk.builder.two_phase_at); + values[5] = TransactionIdGetDatum(ondisk.builder.initial_xmin_horizon); + values[6] = BoolGetDatum(ondisk.builder.building_full_snapshot); + values[7] = BoolGetDatum(ondisk.builder.in_slot_creation); + values[8] = LSNGetDatum(ondisk.builder.last_serialized_snapshot); + values[9] = TransactionIdGetDatum(ondisk.builder.next_phase_at); + values[10] = Int64GetDatum(ondisk.builder.committed.xcnt); + + if (ondisk.builder.committed.xcnt > 0) + { + Datum *arrayelems; + int narrayelems; + + arrayelems = (Datum *) palloc(ondisk.builder.committed.xcnt * sizeof(Datum)); + narrayelems = 0; + + for (narrayelems = 0; narrayelems < ondisk.builder.committed.xcnt; narrayelems++) + arrayelems[narrayelems] = Int64GetDatum((int64) ondisk.builder.committed.xip[narrayelems]); + + values[11] = PointerGetDatum(construct_array_builtin(arrayelems, narrayelems, INT8OID)); + } + else + nulls[11] = true; + + values[12] = Int64GetDatum(ondisk.builder.catchange.xcnt); + + if (ondisk.builder.catchange.xcnt > 0) + { + Datum *arrayelems; + int narrayelems; + + arrayelems = (Datum *) palloc(ondisk.builder.catchange.xcnt * sizeof(Datum)); + narrayelems = 0; + + for (narrayelems = 0; narrayelems < ondisk.builder.catchange.xcnt; narrayelems++) + arrayelems[narrayelems] = Int64GetDatum((int64) ondisk.builder.catchange.xip[narrayelems]); + + values[13] = PointerGetDatum(construct_array_builtin(arrayelems, narrayelems, INT8OID)); + } + else + nulls[13] = true; + + tuple = heap_form_tuple(tupdesc, values, nulls); + + MemoryContextReset(ondisk.builder.context); + + 
PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); + +#undef PG_GET_LOGICAL_SNAPSHOT_INFO_COLS +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 85f42be1b3..76a8c00ba0 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11264,6 +11264,22 @@ proargmodes => '{i,i,i,v,o,o,o}', proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', prosrc => 'pg_logical_slot_get_changes' }, +{ oid => '9080', descr => 'get logical snapshot file data', + proname => 'pg_get_logical_snapshot_info', + provolatile => 'i', proparallel => 's', + prorettype => 'record', proargtypes => 'pg_lsn', + proallargtypes => '{pg_lsn,int2,xid,xid,pg_lsn,pg_lsn,xid,bool,bool,pg_lsn,xid,int8,_int8,int8,_int8}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{in_lsn,state,xmin,xmax,start_decoding_at,two_phase_at,initial_xmin_horizon,building_full_snapshot,in_slot_creation,last_serialized_snapshot,next_phase_at,committed_count,committed_xip,catchange_count,catchange_xip}', + prosrc => 'pg_get_logical_snapshot_info' }, +{ oid => '9095', descr => 'get logical snapshot file metadata', + proname => 'pg_get_logical_snapshot_meta', + provolatile => 'i', proparallel => 's', + prorettype => 'record', proargtypes => 'pg_lsn', + proallargtypes => '{pg_lsn,int4,int4,int4}', + proargmodes => '{i,o,o,o}', + proargnames => '{in_lsn,magic,checksum,version}', + prosrc => 'pg_get_logical_snapshot_meta' }, { oid => '3783', descr => 'get binary changes from replication slot', proname => 'pg_logical_slot_get_binary_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', -- 2.34.1