On Thu, Apr 15, 2021 at 2:46 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Thu, Apr 15, 2021 at 1:13 PM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > > > On Thu, Apr 15, 2021 at 3:22 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > > Thank you for updating the patch. > > > > I have one question on the doc change: > > > > + so the counter is not incremented for subtransactions. Note that > > this > > + includes the transactions streamed and or spilled. > > + </para></entry> > > > > The patch uses the sentence "streamed and or spilled" in two places. > > You meant “streamed and spilled”? Even if it actually means “and or”, > > using "and or” (i.g., connecting “and” to “or” by a space) is general? > > I could not find we use it other places in the doc but found we're > > using "and/or" instead. > > > > I changed it to 'and/or' and made another minor change.
I have rebased the remaining patches on top of HEAD. The rebased patches are attached. Thoughts? Regards, Vignesh
From e66015cc7ce9c67280f6779bca56a257652a4342 Mon Sep 17 00:00:00 2001 From: vignesh <vignes...@gmail.com> Date: Fri, 16 Apr 2021 08:33:40 +0530 Subject: [PATCH v13 1/2] Use HTAB for replication slot statistics. Previously, we used an array to store stats for replication slots. But this had two problems in the case where a drop-slot-stats message is lost: 1) the stats for a new slot are not recorded, and 2) after restarting, we write beyond the end of the array when the number of slots whose stats are stored in the stats file exceeds max_replication_slots. This commit changes to use an HTAB for replication slot statistics, resolving both problems. In addition, pgstat_vacuum_stat() now checks whether the slot for each stats entry in the stats collector still exists, and sends a drop-slot-stats message for any slot that no longer does. --- src/backend/catalog/system_views.sql | 30 +-- src/backend/postmaster/pgstat.c | 268 +++++++++++----------- src/backend/replication/logical/logical.c | 2 +- src/backend/replication/slot.c | 23 +- src/backend/utils/adt/pgstatfuncs.c | 135 ++++++----- src/include/catalog/pg_proc.dat | 14 +- src/include/pgstat.h | 8 +- src/include/replication/slot.h | 2 +- src/test/regress/expected/rules.out | 4 +- 9 files changed, 258 insertions(+), 228 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6d78b33590..fba45473c1 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); -CREATE VIEW pg_stat_replication_slots AS - SELECT - s.slot_name, - s.spill_txns, - s.spill_count, - s.spill_bytes, - s.stream_txns, - s.stream_count, - s.stream_bytes, - s.total_txns, - s.total_bytes, - s.stats_reset - FROM pg_stat_get_replication_slots() AS s; - CREATE VIEW pg_stat_slru AS SELECT s.name, @@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS FROM 
pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.slot_name, + s.spill_txns, + s.spill_count, + s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, + s.total_txns, + s.total_bytes, + s.stats_reset + FROM pg_replication_slots as r, + LATERAL pg_stat_get_replication_slot(slot_name) as s + WHERE r.datoid IS NOT NULL; -- excluding physical slots + CREATE VIEW pg_stat_database AS SELECT D.oid AS datid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e1ec7d8b7d..9ded0a75e9 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_REPLSLOT_HASH_SIZE 32 /* ---------- @@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; -static PgStat_ReplSlotStats *replSlotStats; -static int nReplSlotStats; +static HTAB *replSlotStats = NULL; static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* @@ -319,8 +319,8 @@ static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); -static int pgstat_replslot_index(const char *name, bool create_it); -static void pgstat_reset_replslot(int i, TimestampTz ts); +static PgStat_ReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); +static void pgstat_reset_replslot(PgStat_ReplSlotEntry *slotstats, TimestampTz ts); static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); @@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void) /* Clean up */ hash_destroy(htab); + /* + * Check for all replication slots in stats hash table. 
We do this check + * when replSlotStats has more than max_replication_slots entries, i.e, + * when there are stats for the already-dropped slot, to avoid frequent + * call SearchNamedReplicationSlot() which acquires LWLock. + */ + if (replSlotStats && hash_get_num_entries(replSlotStats) > max_replication_slots) + { + PgStat_ReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStats); + while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL) + pgstat_report_replslot_drop(NameStr(slotentry->slotname)); + } + } + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name) if (name) { - ReplicationSlot *slot; - - /* - * Check if the slot exists with the given name. It is possible that by - * the time this message is executed the slot is dropped but at least - * this check will ensure that the given name is for a valid slot. - */ - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - slot = SearchNamedReplicationSlot(name); - LWLockRelease(ReplicationSlotControlLock); - - if (!slot) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" does not exist", - name))); - - /* - * Nothing to do for physical slots as we collect stats only for - * logical slots. - */ - if (SlotIsPhysical(slot)) - return; - namestrcpy(&msg.m_slotname, name); msg.clearall = false; } @@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize) * ---------- */ void -pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) +pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat) { PgStat_MsgReplSlot msg; @@ -2872,17 +2866,19 @@ pgstat_fetch_slru(void) * pgstat_fetch_replslot() - * * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the replication slot statistics struct and sets the - * number of entries in nslots_p. 
+ * a pointer to the replication slot statistics struct. * --------- */ -PgStat_ReplSlotStats * -pgstat_fetch_replslot(int *nslots_p) +PgStat_ReplSlotEntry * +pgstat_fetch_replslot(NameData slotname) { + PgStat_ReplSlotEntry *slotent = NULL; + backend_read_statsfile(); - *nslots_p = nReplSlotStats; - return replSlotStats; + slotent = pgstat_get_replslot_entry(slotname, false); + + return slotent; } /* @@ -3654,7 +3650,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; - int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -3744,11 +3739,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) /* * Write replication slot stats struct */ - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStats) { - fputc('R', fpout); - rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + PgStat_ReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStats); + while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('R', fpout); + rc = fwrite(slotentry, sizeof(PgStat_ReplSlotEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } } /* @@ -3975,12 +3976,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* Allocate the space for replication slot statistics */ - replSlotStats = MemoryContextAllocZero(pgStatLocalContext, - max_replication_slots - * sizeof(PgStat_ReplSlotStats)); - nReplSlotStats = 0; - /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. 
@@ -4005,12 +4000,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) for (i = 0; i < SLRU_NUM_ELEMENTS; i++) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* - * Set the same reset timestamp for all replication slots too. - */ - for (i = 0; i < max_replication_slots; i++) - replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch @@ -4197,21 +4186,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication + * 'R' A PgStat_ReplSlotEntry struct describing a replication * slot follows. */ case 'R': - if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + { + PgStat_ReplSlotEntry slotstats; + PgStat_ReplSlotEntry *slotent; + + if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotEntry), fpin) + != sizeof(PgStat_ReplSlotEntry)) { ereport(pgStatRunningInCollector ? 
LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); goto done; } - nReplSlotStats++; + + slotent = pgstat_get_replslot_entry(slotstats.slotname, true); + memcpy(slotent, &slotstats, sizeof(PgStat_ReplSlotEntry)); break; + } case 'E': goto done; @@ -4424,7 +4419,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; - PgStat_ReplSlotStats myReplSlotStats; + PgStat_ReplSlotEntry myReplSlotStats; PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; @@ -4553,12 +4548,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication + * 'R' A PgStat_ReplSlotEntry struct describing a replication * slot follows. */ case 'R': - if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotEntry), fpin) + != sizeof(PgStat_ReplSlotEntry)) { ereport(pgStatRunningInCollector ? 
LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", @@ -4765,7 +4760,6 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStats = NULL; - nReplSlotStats = 0; /* * Historically the backend_status.c facilities lived in this file, and @@ -5189,20 +5183,26 @@ static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len) { - int i; - int idx = -1; + PgStat_ReplSlotEntry *slotent; TimestampTz ts; + /* Return if we don't have replication slot statistics */ + if (replSlotStats == NULL) + return; + ts = GetCurrentTimestamp(); if (msg->clearall) { - for (i = 0; i < nReplSlotStats; i++) - pgstat_reset_replslot(i, ts); + HASH_SEQ_STATUS sstat; + + hash_seq_init(&sstat, replSlotStats); + while ((slotent = (PgStat_ReplSlotEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_replslot(slotent, ts); } else { /* Get the index of replication slot statistics to reset */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), false); + slotent = pgstat_get_replslot_entry(msg->m_slotname, false); /* * Nothing to do if the given slot entry is not found. This could @@ -5210,11 +5210,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, * corresponding statistics entry is also removed before receiving the * reset message. */ - if (idx < 0) + if (!slotent) return; /* Reset the stats for the requested replication slot */ - pgstat_reset_replslot(idx, ts); + pgstat_reset_replslot(slotent, ts); } } @@ -5532,46 +5532,29 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) { - int idx; - - /* - * Get the index of replication slot statistics. On dropping, we don't - * create the new statistics. - */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop); - - /* - * The slot entry is not found or there is no space to accommodate the new - * entry. 
This could happen when the message for the creation of a slot - * reached before the drop message even though the actual operations - * happen in reverse order. In such a case, the next update of the - * statistics for the same slot will create the required entry. - */ - if (idx < 0) - return; - - /* it must be a valid replication slot index */ - Assert(idx < nReplSlotStats); - if (msg->m_drop) { /* Remove the replication slot statistics with the given name */ - if (idx < nReplSlotStats - 1) - memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], - sizeof(PgStat_ReplSlotStats)); - nReplSlotStats--; + if (replSlotStats != NULL) + (void) hash_search(replSlotStats, (void *) NameStr(msg->m_slotname), + HASH_REMOVE, NULL); } else { + PgStat_ReplSlotEntry *slotent; + + slotent = pgstat_get_replslot_entry(msg->m_slotname, true); + Assert(slotent); + /* Update the replication slot statistics */ - replSlotStats[idx].spill_txns += msg->m_spill_txns; - replSlotStats[idx].spill_count += msg->m_spill_count; - replSlotStats[idx].spill_bytes += msg->m_spill_bytes; - replSlotStats[idx].stream_txns += msg->m_stream_txns; - replSlotStats[idx].stream_count += msg->m_stream_count; - replSlotStats[idx].stream_bytes += msg->m_stream_bytes; - replSlotStats[idx].total_txns += msg->m_total_txns; - replSlotStats[idx].total_bytes += msg->m_total_bytes; + slotent->spill_txns += msg->m_spill_txns; + slotent->spill_count += msg->m_spill_count; + slotent->spill_bytes += msg->m_spill_bytes; + slotent->stream_txns += msg->m_stream_txns; + slotent->stream_count += msg->m_stream_count; + slotent->stream_bytes += msg->m_stream_bytes; + slotent->total_txns += msg->m_total_txns; + slotent->total_bytes += msg->m_total_bytes; } } @@ -5749,59 +5732,78 @@ pgstat_db_requested(Oid databaseid) } /* ---------- - * pgstat_replslot_index + * pgstat_get_replslot_entry * - * Return the index of entry of a replication slot with the given name, or - * -1 if the slot is not found. 
+ * Return the entry of replication slot stats with the given name. Return + * NULL if not found and the caller didn't request to create it. * * create_it tells whether to create the new slot entry if it is not found. * ---------- */ -static int -pgstat_replslot_index(const char *name, bool create_it) +static PgStat_ReplSlotEntry * +pgstat_get_replslot_entry(NameData name, bool create_it) { - int i; + PgStat_ReplSlotEntry *slotent; + bool found; - Assert(nReplSlotStats <= max_replication_slots); - for (i = 0; i < nReplSlotStats; i++) + /* + * Create the replication slot stats hash table if we don't have + * it already. + */ + if (replSlotStats == NULL) { - if (namestrcmp(&replSlotStats[i].slotname, name) == 0) - return i; /* found */ + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_ReplSlotEntry); + hash_ctl.hcxt = pgStatLocalContext; + + replSlotStats = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } - /* - * The slot is not found. We don't want to register the new statistics if - * the list is already full or the caller didn't request. - */ - if (i == max_replication_slots || !create_it) - return -1; + slotent = (PgStat_ReplSlotEntry *) hash_search(replSlotStats, + (void *) &name, + create_it ? HASH_ENTER : HASH_FIND, + &found); + if (!slotent) + { + /* not found */ + Assert(!create_it && !found); + return NULL; + } - /* Register new slot */ - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - namestrcpy(&replSlotStats[nReplSlotStats].slotname, name); + /* initialize the entry */ + if (create_it && !found) + { + memset(slotent, 0, sizeof(PgStat_ReplSlotEntry)); + namestrcpy(&(slotent->slotname), NameStr(name)); + } - return nReplSlotStats++; + return slotent; } /* ---------- * pgstat_reset_replslot * - * Reset the replication slot stats at index 'i'. + * Reset the given replication slot stats. 
* ---------- */ static void -pgstat_reset_replslot(int i, TimestampTz ts) +pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts) { /* reset only counters. Don't clear slot name */ - replSlotStats[i].spill_txns = 0; - replSlotStats[i].spill_count = 0; - replSlotStats[i].spill_bytes = 0; - replSlotStats[i].stream_txns = 0; - replSlotStats[i].stream_count = 0; - replSlotStats[i].stream_bytes = 0; - replSlotStats[i].total_txns = 0; - replSlotStats[i].total_bytes = 0; - replSlotStats[i].stat_reset_timestamp = ts; + slotent->spill_txns = 0; + slotent->spill_count = 0; + slotent->spill_bytes = 0; + slotent->stream_txns = 0; + slotent->stream_count = 0; + slotent->stream_bytes = 0; + slotent->total_txns = 0; + slotent->total_bytes = 0; + slotent->stat_reset_timestamp = ts; } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 35b0c67641..93b31a46f6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1773,7 +1773,7 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; - PgStat_ReplSlotStats repSlotStat; + PgStat_ReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. 
*/ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f61b163f78..f75e7e95f9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -329,8 +329,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, */ if (SlotIsLogical(slot)) { - PgStat_ReplSlotStats repSlotStat; - MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats)); + PgStat_ReplSlotEntry repSlotStat; + MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotEntry)); namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); pgstat_report_replslot(&repSlotStat); } @@ -349,17 +349,15 @@ ReplicationSlotCreate(const char *name, bool db_specific, * Search for the named replication slot. * * Return the replication slot if found, otherwise NULL. - * - * The caller must hold ReplicationSlotControlLock in shared mode. */ ReplicationSlot * -SearchNamedReplicationSlot(const char *name) +SearchNamedReplicationSlot(const char *name, bool need_lock) { int i; ReplicationSlot *slot = NULL; - Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, - LW_SHARED)); + if (need_lock) + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -372,6 +370,9 @@ SearchNamedReplicationSlot(const char *name) } } + if (need_lock) + LWLockRelease(ReplicationSlotControlLock); + return slot; } @@ -416,7 +417,7 @@ retry: * Search for the slot with the specified name if the slot to acquire is * not given. If the slot is not found, we either return -1 or error out. */ - s = slot ? slot : SearchNamedReplicationSlot(name); + s = slot ? slot : SearchNamedReplicationSlot(name, false); if (s == NULL || !s->in_use) { LWLockRelease(ReplicationSlotControlLock); @@ -712,7 +713,11 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * and create messages while holding ReplicationSlotAllocationLock to * reduce that possibility. 
If the messages reached in reverse, we would * lose one statistics update message. But the next update message will - * create the statistics for the replication slot. + * create the statistics for the replication slot. In the case where the + * message for dropping the old slot gets lost and a slot with the same + * name is created, the stats will be accumulated into the old slot's + * stats since we use the slot name as the key. In that case, the user can + * reset the particular stats by pg_stat_reset_replication_slot(). */ if (SlotIsLogical(slot)) pgstat_report_replslot_drop(NameStr(slot->data.name)); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2680190a40..b1899d8c68 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -24,6 +24,7 @@ #include "pgstat.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/acl.h" @@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) char *target = NULL; if (!PG_ARGISNULL(0)) + { + ReplicationSlot *slot; + target = text_to_cstring(PG_GETARG_TEXT_PP(0)); + /* + * Check if the slot exists with the given name. It is possible that by + * the time this message is executed the slot is dropped but at least + * this check will ensure that the given name is for a valid slot. + */ + slot = SearchNamedReplicationSlot(target, true); + + if (!slot) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" does not exist", + target))); + + /* + * Nothing to do for physical slots as we collect stats only for + * logical slots.
+ */ + if (SlotIsPhysical(slot)) + PG_RETURN_VOID(); + } + pgstat_reset_replslot_counter(target); PG_RETURN_VOID(); @@ -2280,73 +2305,67 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } -/* Get the statistics for the replication slots */ +/* Get the statistics for the replication slot */ Datum -pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { #define PG_STAT_GET_REPLICATION_SLOT_COLS 10 - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + text *slotname_text = PG_GETARG_TEXT_P(0); + NameData slotname; TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - PgStat_ReplSlotStats *slotstats; - int nstats; - int i; - - /* check to see if caller supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context that cannot accept a set"))); - if (!(rsinfo->allowedModes & SFRM_Materialize)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not allowed in this context"))); - - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); - - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; + Datum values[10]; + bool nulls[10]; + PgStat_ReplSlotEntry *slotent; - MemoryContextSwitchTo(oldcontext); + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); - slotstats = pgstat_fetch_replslot(&nstats); - for (i = 0; i 
< nstats; i++) - { - Datum values[PG_STAT_GET_REPLICATION_SLOT_COLS]; - bool nulls[PG_STAT_GET_REPLICATION_SLOT_COLS]; - PgStat_ReplSlotStats *s = &(slotstats[i]); + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); - MemSet(values, 0, sizeof(values)); - MemSet(nulls, 0, sizeof(nulls)); + namestrcpy(&slotname, text_to_cstring(slotname_text)); + slotent = pgstat_fetch_replslot(slotname); - values[0] = CStringGetTextDatum(NameStr(s->slotname)); - values[1] = Int64GetDatum(s->spill_txns); - values[2] = Int64GetDatum(s->spill_count); - values[3] = Int64GetDatum(s->spill_bytes); - values[4] = Int64GetDatum(s->stream_txns); - values[5] = Int64GetDatum(s->stream_count); - values[6] = Int64GetDatum(s->stream_bytes); - values[7] = Int64GetDatum(s->total_txns); - values[8] = Int64GetDatum(s->total_bytes); - - if (s->stat_reset_timestamp == 0) - nulls[9] = true; - else - values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); + if (!slotent) + PG_RETURN_NULL(); - tuplestore_putvalues(tupstore, tupdesc, values, nulls); - } + values[0] = 
CStringGetTextDatum(NameStr(slotent->slotname)); + values[1] = Int64GetDatum(slotent->spill_txns); + values[2] = Int64GetDatum(slotent->spill_count); + values[3] = Int64GetDatum(slotent->spill_bytes); + values[4] = Int64GetDatum(slotent->stream_txns); + values[5] = Int64GetDatum(slotent->stream_count); + values[6] = Int64GetDatum(slotent->stream_bytes); + values[7] = Int64GetDatum(slotent->total_txns); + values[8] = Int64GetDatum(slotent->total_bytes); - tuplestore_donestoring(tupstore); + if (slotent->stat_reset_timestamp == 0) + nulls[9] = true; + else + values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); - return (Datum) 0; + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 591753fe81..a37cb8a426 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5311,14 +5311,14 @@ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}', prosrc => 'pg_stat_get_wal_receiver' }, -{ oid => '8595', descr => 'statistics: information about replication slots', - proname => 'pg_stat_get_replication_slots', prorows => '10', +{ oid => '8595', descr => 'statistics: information about replication slot', + proname => 'pg_stat_get_replication_slot', prorows => '1', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', - prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', - prosrc => 'pg_stat_get_replication_slots' 
}, + prorettype => 'record', proargtypes => 'text', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', + prosrc => 'pg_stat_get_replication_slot' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2aeb3cded4..e9e16e5e2d 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -917,7 +917,7 @@ typedef struct PgStat_SLRUStats /* * Replication slot statistics kept in the stats collector */ -typedef struct PgStat_ReplSlotStats +typedef struct PgStat_ReplSlotEntry { NameData slotname; PgStat_Counter spill_txns; @@ -929,7 +929,7 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter total_txns; PgStat_Counter total_bytes; TimestampTz stat_reset_timestamp; -} PgStat_ReplSlotStats; +} PgStat_ReplSlotEntry; /* @@ -1031,7 +1031,7 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); -extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat); +extern void pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); @@ -1129,7 +1129,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); -extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); +extern 
PgStat_ReplSlotEntry *pgstat_fetch_replslot(NameData slotname); extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void); extern void pgstat_count_slru_page_zeroed(int slru_idx); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c50d..357068403a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); -extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6399f3feef..ce2ecb914b 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name, s.total_txns, s.total_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset); + FROM pg_replication_slots r, + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset) + WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, -- 2.25.1
From 410d14bd6aee29e29e75cdc8a49ea0d9fc0c0214 Mon Sep 17 00:00:00 2001 From: vignesh <vignes...@gmail.com> Date: Mon, 12 Apr 2021 16:10:11 +0530 Subject: [PATCH v13 2/2] Test where there are more replication slot statistics than max_replication_slots at startup. There is a rare scenario where one of the replication slots is dropped but the drop-slot statistics message is not received by the statistics collector process. If max_replication_slots is then reduced to the number of replication slots actually in use and the server is restarted, the statistics collector process is not aware of the dropped slot and writes beyond the available slots. Add a test for this. --- contrib/test_decoding/t/001_repl_stats.pl | 30 +++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 11b6cd9b9c..5ca4d3d7a7 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -2,9 +2,10 @@ # drop replication slot and restart. use strict; use warnings; +use File::Path qw(rmtree); use PostgresNode; use TestLib; -use Test::More tests => 1; +use Test::More tests => 2; # Test set-up my $node = get_new_node('test'); @@ -66,11 +67,36 @@ is($result, qq(regression_slot1|t|t regression_slot2|t|t regression_slot3|t|t), 'check replication statistics are updated'); +# Test to remove one of the replication slots and adjust +# max_replication_slots according to the number of slots. This leads +# to a mismatch between the number of slots present in the stats file and the +# number of slots present in shared memory, simulating the scenario where the +# drop-slot message is lost by the statistics collector process. We verify +# that the replication statistics data is fine after restart.
+ +$node->stop; +my $datadir = $node->data_dir; +my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3"; + +rmtree($slot3_replslotdir); + +$node->append_conf('postgresql.conf', 'max_replication_slots = 2'); +$node->start; + +# Verify statistics data present in pg_stat_replication_slots are sane after +# restart. +$result = $node->safe_psql('postgres', + "SELECT slot_name, total_txns > 0 AS total_txn, + total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots + ORDER BY slot_name" +); +is($result, qq(regression_slot1|t|t +regression_slot2|t|t), 'check replication statistics are updated'); + # cleanup $node->safe_psql('postgres', "DROP TABLE test_repl_stat"); $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')"); $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')"); -$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')"); # shutdown $node->stop; -- 2.25.1