On Mon, 2022-03-28 at 11:00 -0700, Andres Freund wrote: > > +/* > > + * Start up all resource managers. > > + */ > > +void > > +StartupResourceManagers() > > (void)
Fixed. > Random idea: Might be worth emitting the id->name mapping just after > a redo > location is determined, to make it easier to debug things. I'm not quite sure I understood this idea: do you mean dumping all rmgrs and their IDs when performing recovery? > > +RmgrData * > > +GetRmgr(RmgrId rmid) > > +{ > > + return RmgrTable[rmid]; > > +} > > Given this is so simple, why incur the cost of a function call? > Rather than > continuing to expose RmgrTable and move GetRmgr() into the header, as > a static > inline? Made it static inline. > Shouldn't this continue to enforce RM_MAX_ID as well? Done. > Personally I'd rather name it ResourceManagersStartup() or > RmgrStartup(). Done. > Like here. It's obviously not as performance critical as replay. But > it's > still a shame to add 3 calls to GetRmgr, that each then need to > dereference > RmgrTable. The compiler won't be able to optimize any of that away. Changed to only call it once and save it in a variable for each call site where it makes sense. > So we can't filter by rmgr id for non-builtin rmgr's? That sucks. > Maybe add a > custom:<i> syntax? Or generally allow numerical identifiers in > addition to the > names? Good idea. I changed it to allow "custom###" to mean the custom rmgr with ID ### (3 digits). I may still change it to go back to two RmgrTables (one for builtin and one for custom) to remove the lingering performance doubts. Other than that, and some cleanup, this is pretty close to the version I intend to commit. Regards, Jeff Davis
From 014027bc982896e7e1b6fafe4b622ed5fa45a08b Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 13:01:38 -0700 Subject: [PATCH] Extensible rmgr. Allow extensions to specify a new custom rmgr, which allows specialized WAL. This is meant to be used by a custom Table Access Method, which would not otherwise be able to offer logical decoding/replication. It may also be used by new Index Access Methods. Prior to this commit, only Generic WAL was available, which offers support for recovery and physical replication but not logical replication. Reviewed-by: Julien Rouhaud, Andres Freund Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com --- src/backend/access/transam/rmgr.c | 80 ++++++++++++++++++++++- src/backend/access/transam/xlogreader.c | 2 +- src/backend/access/transam/xlogrecovery.c | 33 ++++------ src/backend/replication/logical/decode.c | 8 +-- src/backend/utils/misc/guc.c | 10 ++- src/bin/pg_rewind/parsexlog.c | 11 ++-- src/bin/pg_waldump/pg_waldump.c | 65 +++++++++++++----- src/bin/pg_waldump/rmgrdesc.c | 60 ++++++++++++++++- src/bin/pg_waldump/rmgrdesc.h | 2 +- src/include/access/rmgr.h | 15 ++++- src/include/access/xlog_internal.h | 11 +++- 11 files changed, 239 insertions(+), 58 deletions(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..504d0a5b5ac 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,6 +24,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" @@ -32,8 +33,83 @@ /* must be kept in sync with RmgrData definition in xlog_internal.h */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ - { name, redo, desc, identify, startup, cleanup, mask, decode }, + &(struct RmgrData){ name, 
redo, desc, identify, startup, cleanup, mask, decode }, -const RmgrData RmgrTable[RM_MAX_ID + 1] = { +RmgrData *RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +/* + * Start up all resource managers. + */ +void +RmgrStartup(void) +{ + for (int i = 0; i < RM_MAX_ID; i++) + { + if (RmgrTable[i] == NULL) + continue; + + if (RmgrTable[i]->rm_startup != NULL) + RmgrTable[i]->rm_startup(); + } +} + +/* + * Clean up all resource managers. + */ +void +RmgrCleanup(void) +{ + for (int i = 0; i < RM_MAX_ID; i++) + { + if (RmgrTable[i] == NULL) + continue; + + if (RmgrTable[i]->rm_cleanup != NULL) + RmgrTable[i]->rm_cleanup(); + } +} + +/* + * Register a new custom rmgr. + * + * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a + * unique RmgrId for your extension, to avoid conflicts. During development, + * use RM_EXPERIMENTAL_ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (rmid < RM_MIN_CUSTOM_ID) + ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid)); + + if (!process_shared_preload_libraries_in_progress) + ereport(ERROR, + (errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries"))); + + if (RmgrTable[rmid] != NULL) + ereport(PANIC, + (errmsg("custom rmgr ID %d already registered with name \"%s\"", + rmid, RmgrTable[rmid]->rm_name))); + + /* check for existing rmgr with the same name */ + for (int i = 0; i <= RM_MAX_ID; i++) + { + const RmgrData *existing_rmgr = RmgrTable[i]; + + if (existing_rmgr == NULL) + continue; + + if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name)) + ereport(PANIC, + (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr", + existing_rmgr->rm_name))); + } + + /* register it */ + RmgrTable[rmid] = rmgr; + ereport(LOG, + (errmsg("registered custom rmgr \"%s\" with ID %d", + rmgr->rm_name, rmid))); +} diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index e437c429920..1017e913e51 100644 --- 
a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (!RM_IS_VALID(record->xl_rmid)) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 8d2395dae25..6c661fa6e27 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void) void PerformWalRecovery(void) { - int rmid; XLogRecord *record; bool reachedRecoveryTarget = false; TimeLineID replayTLI; @@ -1604,12 +1603,7 @@ PerformWalRecovery(void) InRedo = true; - /* Initialize resource managers */ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); - } + RmgrStartup(); ereport(LOG, (errmsg("redo starts at %X/%X", @@ -1746,12 +1740,7 @@ PerformWalRecovery(void) } } - /* Allow resource managers to do any required cleanup. 
*/ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); - } + RmgrCleanup(); ereport(LOG, (errmsg("redo done at %X/%X system usage: %s", @@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl xlogrecovery_redo(xlogreader, *replayTLI); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid)->rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with the WAL @@ -2101,20 +2090,20 @@ rm_redo_error_callback(void *arg) void xlog_outdesc(StringInfo buf, XLogReaderState *record) { - RmgrId rmid = XLogRecGetRmid(record); + RmgrData *rmgr = GetRmgr(XLogRecGetRmid(record)); uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, rmgr->rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = rmgr->rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + rmgr->rm_desc(buf, record); } #ifdef WAL_DEBUG @@ -2263,7 +2252,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime) static void verifyBackupPageConsistency(XLogReaderState *record) { - RmgrId rmid = XLogRecGetRmid(record); + RmgrData *rmgr = GetRmgr(XLogRecGetRmid(record)); RelFileNode rnode; ForkNumber forknum; BlockNumber blkno; @@ -2343,10 +2332,10 @@ verifyBackupPageConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (rmgr->rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + rmgr->rm_mask(replay_image_masked, blkno); + rmgr->rm_mask(primary_image_masked, blkno); } /* Time to compare 
the primary and replay images. */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 77bc7aea7a0..3bdeda6b6a9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -94,7 +94,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; - RmgrId rmid; + RmgrData *rmgr; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -115,10 +115,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - rmid = XLogRecGetRmid(record); + rmgr = GetRmgr(XLogRecGetRmid(record)); - if (RmgrTable[rmid].rm_decode != NULL) - RmgrTable[rmid].rm_decode(ctx, &buf); + if (rmgr->rm_decode != NULL) + rmgr->rm_decode(ctx, &buf); else { /* just deal with xid, and done */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9e8ab1420d9..d3e7dbc3bf8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11775,8 +11775,11 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + { + RmgrData *rmgr = GetRmgr(rmid); + if (rmgr != NULL && rmgr->rm_mask != NULL) newwalconsistency[rmid] = true; + } found = true; } else @@ -11787,8 +11790,9 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + RmgrData *rmgr = GetRmgr(rmid); + if (rmgr != NULL && rmgr->rm_mask != NULL && + pg_strcasecmp(tok, rmgr->rm_name) == 0) { newwalconsistency[rmid] = true; found = true; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 49966e7b7fd..dfa836d1561 100644 --- a/src/bin/pg_rewind/parsexlog.c 
+++ b/src/bin/pg_rewind/parsexlog.c @@ -25,8 +25,8 @@ #include "pg_rewind.h" /* - * RmgrNames is an array of resource manager names, to make error messages - * a bit nicer. + * RmgrNames is an array of the built-in resource manager names, to make error + * messages a bit nicer. */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, @@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; +#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \ + RmgrNames[rmid] : "custom") + static void extractPageInfo(XLogReaderState *record); static int xlogreadfd = -1; @@ -436,9 +439,9 @@ extractPageInfo(XLogReaderState *record) * track that change. */ pg_fatal("WAL record modifies a relation, but record type is not recognized: " - "lsn: %X/%X, rmgr: %s, info: %02X", + "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X", LSN_FORMAT_ARGS(record->ReadRecPtr), - RmgrNames[rmid], info); + rmid, RmgrName(rmid), info); } for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 4cb40d068a9..afd786fe7bd 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -80,8 +80,8 @@ typedef struct XLogDumpStats uint64 count; XLogRecPtr startptr; XLogRecPtr endptr; - Stats rmgr_stats[RM_NEXT_ID]; - Stats record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES]; + Stats rmgr_stats[RM_MAX_ID + 1]; + Stats record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES]; } XLogDumpStats; #define fatal_error(...) 
do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0) @@ -104,9 +104,9 @@ print_rmgr_list(void) { int i; - for (i = 0; i <= RM_MAX_ID; i++) + for (i = 0; i <= RM_MAX_BUILTIN_ID; i++) { - printf("%s\n", RmgrDescTable[i].rm_name); + printf("%s\n", GetRmgrDesc(i)->rm_name); } } @@ -535,7 +535,7 @@ static void XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) { const char *id; - const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)]; + const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record)); uint32 rec_len; uint32 fpi_len; RelFileNode rnode; @@ -720,7 +720,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) * calculate column totals. */ - for (ri = 0; ri < RM_NEXT_ID; ri++) + for (ri = 0; ri < RM_MAX_ID; ri++) { total_count += stats->rmgr_stats[ri].count; total_rec_len += stats->rmgr_stats[ri].rec_len; @@ -741,13 +741,18 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)", "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---"); - for (ri = 0; ri < RM_NEXT_ID; ri++) + for (ri = 0; ri <= RM_MAX_ID; ri++) { uint64 count, rec_len, fpi_len, tot_len; - const RmgrDescData *desc = &RmgrDescTable[ri]; + const RmgrDescData *desc; + + if (!RM_IS_VALID(ri)) + continue; + + desc = GetRmgrDesc(ri); if (!config->stats_per_record) { @@ -756,6 +761,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) fpi_len = stats->rmgr_stats[ri].fpi_len; tot_len = rec_len + fpi_len; + if (RM_IS_CUSTOM(ri) && count == 0) + continue; + XLogDumpStatsRow(desc->rm_name, count, total_count, rec_len, total_rec_len, fpi_len, total_fpi_len, tot_len, total_len); @@ -1000,7 +1008,7 @@ main(int argc, char **argv) break; case 'r': { - int i; + int rmid; if (pg_strcasecmp(optarg, "list") == 0) { @@ -1008,20 +1016,41 @@ main(int argc, char **argv) exit(EXIT_SUCCESS); } - for (i = 0; i <= RM_MAX_ID; 
i++) + /* + * First look for the generated name of a custom rmgr, of + * the form "custom###". We accept this form, because the + * custom rmgr module is not loaded, so there's no way to + * compare with the real name. + */ + if (sscanf(optarg, "custom%03d", &rmid) == 1) { - if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0) + if (!RM_IS_CUSTOM(rmid)) { - config.filter_by_rmgr[i] = true; - config.filter_by_rmgr_enabled = true; - break; + pg_log_error("custom resource manager \"%s\" does not exist", + optarg); + goto bad_argument; } + config.filter_by_rmgr[rmid] = true; + config.filter_by_rmgr_enabled = true; } - if (i > RM_MAX_ID) + else { - pg_log_error("resource manager \"%s\" does not exist", - optarg); - goto bad_argument; + /* then look for builtin rmgrs */ + for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++) + { + if (pg_strcasecmp(optarg, GetRmgrDesc(rmid)->rm_name) == 0) + { + config.filter_by_rmgr[rmid] = true; + config.filter_by_rmgr_enabled = true; + break; + } + } + if (rmid > RM_MAX_BUILTIN_ID) + { + pg_log_error("resource manager \"%s\" does not exist", + optarg); + goto bad_argument; + } } } break; diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6a4ebd1310b..0f4a4b30827 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -35,6 +35,64 @@ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, -const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { +static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = { #include "access/rmgrlist.h" }; + +/* + * We are unable to get the real name of a custom rmgr because the module is + * not loaded. Generate a table of numeric names of the form "custom###" where + * "###" is the 3-digit resource manager ID. 
+ */ +#define CUSTOM_NUMERIC_NAME_LEN sizeof("custom###") + +static bool CustomNumericNamesInitialized = false; +static char CustomNumericNames[RM_N_CUSTOM_IDS][CUSTOM_NUMERIC_NAME_LEN] = {0}; + +/* + * No information on custom resource managers; just print the ID. + */ +static void +default_desc(StringInfo buf, XLogReaderState *record) +{ + appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record)); +} + +/* + * No information on custom resource managers; just return NULL and let the + * caller handle it. + */ +static const char * +default_identify(uint8 info) +{ + return NULL; +} + +const RmgrDescData * +GetRmgrDesc(RmgrId rmid) +{ + Assert(RM_IS_VALID(rmid)); + + if (RM_IS_BUILTIN(rmid)) + { + return &RmgrDescTable[rmid]; + } + else + { + if (!CustomNumericNamesInitialized) + { + for (int i = 0; i < RM_N_CUSTOM_IDS; i++) + { + snprintf(CustomNumericNames[i], CUSTOM_NUMERIC_NAME_LEN, + "custom%03d", i + RM_MIN_CUSTOM_ID); + } + CustomNumericNamesInitialized = true; + } + + return &(RmgrDescData) { + CustomNumericNames[rmid - RM_MIN_CUSTOM_ID], + default_desc, + default_identify + }; + } +} diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h index 42f8483b482..f733cd467d5 100644 --- a/src/bin/pg_waldump/rmgrdesc.h +++ b/src/bin/pg_waldump/rmgrdesc.h @@ -18,6 +18,6 @@ typedef struct RmgrDescData const char *(*rm_identify) (uint8 info); } RmgrDescData; -extern const RmgrDescData RmgrDescTable[]; +extern const RmgrDescData *GetRmgrDesc(RmgrId rmid); #endif /* RMGRDESC_H */ diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index d9b512630ca..6e407f5df66 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -30,6 +30,19 @@ typedef enum RmgrIds #undef PG_RMGR -#define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_MAX_ID UINT8_MAX +#define RM_MAX_BUILTIN_ID (RM_NEXT_ID - 1) +#define RM_MIN_CUSTOM_ID 128 +#define RM_N_CUSTOM_IDS (RM_MAX_ID - RM_MIN_CUSTOM_ID + 1) +#define RM_IS_BUILTIN(rmid) ((rmid) <= RM_MAX_BUILTIN_ID) 
+#define RM_IS_CUSTOM(rmid) ((rmid) >= RM_MIN_CUSTOM_ID && (rmid) < RM_MAX_ID) +#define RM_IS_VALID(rmid) (RM_IS_BUILTIN((rmid)) || RM_IS_CUSTOM((rmid))) + +/* + * RmgrId to use for extensions that require an RmgrId, but are still in + * development and have not reserved their own unique RmgrId yet. See: + * https://wiki.postgresql.org/wiki/ExtensibleRmgr + */ +#define RM_EXPERIMENTAL_ID 128 #endif /* RMGR_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 0e94833129a..ecbe0fa1343 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -319,7 +319,16 @@ typedef struct RmgrData struct XLogRecordBuffer *buf); } RmgrData; -extern const RmgrData RmgrTable[]; +extern RmgrData *RmgrTable[]; +extern void RmgrStartup(void); +extern void RmgrCleanup(void); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); + +static inline RmgrData * +GetRmgr(RmgrId rmid) +{ + return RmgrTable[rmid]; +} /* * Exported to support xlog switching from checkpointer -- 2.17.1