From ee9e1dc7472c8ff30c31357290114e80c6aa4e8d Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 23 Apr 2021 14:23:28 +0530
Subject: [PATCH v2] Update decoding stats during replication slot release.

Currently, replication slot statistics are updated at prepare, commit, and
rollback. Now, if the transaction is interrupted the stats might not get
updated. Fixed this by updating statistics during replication slot
release.

Moved the statistics related variables from ReorderBuffer to
ReplicationSlot structure to make them accessible at
ReplicationSlotRelease.

Reported-by: Andres Freund
Author: Vignesh C
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
---
 src/backend/replication/logical/decode.c      | 12 ++--
 src/backend/replication/logical/logical.c     | 55 +++++++++----------
 .../replication/logical/reorderbuffer.c       | 39 +++++++------
 src/backend/replication/slot.c                | 26 +++++++++
 src/include/replication/logical.h             |  2 +-
 src/include/replication/reorderbuffer.h       | 23 --------
 src/include/replication/slot.h                | 24 ++++++++
 7 files changed, 104 insertions(+), 77 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdcd..312fab07985 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -748,9 +748,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	/*
 	 * Update the decoding stats at transaction prepare/commit/abort. It is
 	 * not clear that sending more or less frequently than this would be
-	 * better.
+	 * better. We do send the stats in ReplicationSlotRelease to avoid losing
+	 * in case the decoding is interrupted.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 /*
@@ -830,9 +831,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	/*
 	 * Update the decoding stats at transaction prepare/commit/abort. It is
 	 * not clear that sending more or less frequently than this would be
-	 * better.
+	 * better. We do send the stats in ReplicationSlotRelease to avoid losing
+	 * in case the decoding is interrupted.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 
@@ -889,7 +891,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* update the decoding stats */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 35b0c676412..156a9780f20 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1770,44 +1770,39 @@ ResetLogicalStreamingState(void)
  * Report stats for a slot.
  */
 void
-UpdateDecodingStats(LogicalDecodingContext *ctx)
+UpdateDecodingStats()
 {
-	ReorderBuffer *rb = ctx->reorder;
 	PgStat_ReplSlotStats repSlotStat;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+	if (slot->spillBytes <= 0 && slot->streamBytes <= 0 && slot->totalBytes <= 0)
 		return;
 
 	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
-		 rb,
-		 (long long) rb->spillTxns,
-		 (long long) rb->spillCount,
-		 (long long) rb->spillBytes,
-		 (long long) rb->streamTxns,
-		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes,
-		 (long long) rb->totalTxns,
-		 (long long) rb->totalBytes);
-
-	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
-	repSlotStat.spill_txns = rb->spillTxns;
-	repSlotStat.spill_count = rb->spillCount;
-	repSlotStat.spill_bytes = rb->spillBytes;
-	repSlotStat.stream_txns = rb->streamTxns;
-	repSlotStat.stream_count = rb->streamCount;
-	repSlotStat.stream_bytes = rb->streamBytes;
-	repSlotStat.total_txns = rb->totalTxns;
-	repSlotStat.total_bytes = rb->totalBytes;
+		 slot,
+		 (long long) slot->spillTxns,
+		 (long long) slot->spillCount,
+		 (long long) slot->spillBytes,
+		 (long long) slot->streamTxns,
+		 (long long) slot->streamCount,
+		 (long long) slot->streamBytes,
+		 (long long) slot->totalTxns,
+		 (long long) slot->totalBytes);
+
+	namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
+	repSlotStat.spill_txns = slot->spillTxns;
+	repSlotStat.spill_count = slot->spillCount;
+	repSlotStat.spill_bytes = slot->spillBytes;
+	repSlotStat.stream_txns = slot->streamTxns;
+	repSlotStat.stream_count = slot->streamCount;
+	repSlotStat.stream_bytes = slot->streamBytes;
+	repSlotStat.total_txns = slot->totalTxns;
+	repSlotStat.total_bytes = slot->totalBytes;
 
 	pgstat_report_replslot(&repSlotStat);
 
-	rb->spillTxns = 0;
-	rb->spillCount = 0;
-	rb->spillBytes = 0;
-	rb->streamTxns = 0;
-	rb->streamCount = 0;
-	rb->streamBytes = 0;
-	rb->totalTxns = 0;
-	rb->totalBytes = 0;
+	InitializeSlotStats(slot);
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5cb484f0323..700ccf542c9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -344,15 +344,6 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
-	buffer->spillTxns = 0;
-	buffer->spillCount = 0;
-	buffer->spillBytes = 0;
-	buffer->streamTxns = 0;
-	buffer->streamCount = 0;
-	buffer->streamBytes = 0;
-	buffer->totalTxns = 0;
-	buffer->totalBytes = 0;
-
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -1316,6 +1307,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	ReorderBufferChange *change;
 	ReorderBufferIterTXNEntry *entry;
 	int32		off;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* nothing there anymore */
 	if (state->heap->bh_size == 0)
@@ -1369,7 +1363,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 * Update the total bytes processed before releasing the current set
 		 * of changes and restoring the new set of changes.
 		 */
-		rb->totalBytes += rb->size;
+		slot->totalBytes += rb->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
@@ -2018,6 +2012,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	ReorderBufferChange *volatile specinsert = NULL;
 	volatile bool stream_started = false;
 	ReorderBufferTXN *volatile curtxn = NULL;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* build data to be able to lookup the CommandIds of catalog tuples */
 	ReorderBufferBuildTupleCidHash(rb, txn);
@@ -2380,9 +2377,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * which we have already accounted in ReorderBufferIterTXNNext.
 		 */
 		if (!rbtxn_is_streamed(txn))
-			rb->totalTxns++;
+			slot->totalTxns++;
 
-		rb->totalBytes += rb->size;
+		slot->totalBytes += rb->size;
 
 		/*
 		 * Done with current changes, send the last message for this set of
@@ -3482,6 +3479,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3543,11 +3543,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
-		rb->spillCount += 1;
-		rb->spillBytes += size;
+		slot->spillCount += 1;
+		slot->spillBytes += size;
 
 		/* don't consider already serialized transactions */
-		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+		slot->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
 	}
 
 	Assert(spilled == txn->nentries_mem);
@@ -3818,6 +3818,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	CommandId	command_id;
 	Size		stream_bytes;
 	bool		txn_is_streamed;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* We can never reach here for a subtransaction. */
 	Assert(txn->toptxn == NULL);
@@ -3911,11 +3914,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
-	rb->streamCount += 1;
-	rb->streamBytes += stream_bytes;
+	slot->streamCount += 1;
+	slot->streamBytes += stream_bytes;
 
 	/* Don't consider already streamed transaction. */
-	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+	slot->streamTxns += (txn_is_streamed) ? 0 : 1;
 
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78d..6fde2bc8676 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -44,6 +44,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
@@ -295,6 +296,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	InitializeSlotStats(slot);
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -500,6 +502,15 @@ ReplicationSlotRelease(void)
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
+	if (SlotIsLogical(slot))
+	{
+		/*
+		 * Update the decoding stats. This avoids losing them if the decoding
+		 * is interrupted.
+		 */
+		UpdateDecodingStats();
+	}
+
 	if (slot->data.persistency == RS_EPHEMERAL)
 	{
 		/*
@@ -1782,6 +1793,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
+		InitializeSlotStats(slot);
 
 		slot->in_use = true;
 		slot->active_pid = 0;
@@ -1795,3 +1807,17 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/* Initialize Replication Slot stats */
+void
+InitializeSlotStats(ReplicationSlot *slot)
+{
+	slot->spillTxns = 0;
+	slot->spillCount = 0;
+	slot->spillBytes = 0;
+	slot->streamTxns = 0;
+	slot->streamCount = 0;
+	slot->streamBytes = 0;
+	slot->totalTxns = 0;
+	slot->totalBytes = 0;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7dfcb7be187..44baec63114 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -134,6 +134,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
-extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingStats(void);
 
 #endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bfab8303ee7..c8295fc9edf 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -602,29 +602,6 @@ struct ReorderBuffer
 
 	/* memory accounting */
 	Size		size;
-
-	/*
-	 * Statistics about transactions spilled to disk.
-	 *
-	 * A single transaction may be spilled repeatedly, which is why we keep
-	 * two different counters. For spilling, the transaction counter includes
-	 * both toplevel transactions and subtransactions.
-	 */
-	int64		spillTxns;		/* number of transactions spilled to disk */
-	int64		spillCount;		/* spill-to-disk invocation counter */
-	int64		spillBytes;		/* amount of data spilled to disk */
-
-	/* Statistics about transactions streamed to the decoding output plugin */
-	int64		streamTxns;		/* number of transactions streamed */
-	int64		streamCount;	/* streaming invocation counter */
-	int64		streamBytes;	/* amount of data streamed */
-
-	/*
-	 * Statistics about all the transactions sent to the decoding output
-	 * plugin
-	 */
-	int64		totalTxns;		/* total number of transactions sent */
-	int64		totalBytes;		/* total amount of data sent */
 };
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50df..267eed60aef 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -172,6 +172,29 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/*
+	 * Statistics about transactions spilled to disk.
+	 *
+	 * A single transaction may be spilled repeatedly, which is why we keep
+	 * two different counters. For spilling, the transaction counter includes
+	 * both toplevel transactions and subtransactions.
+	 */
+	int64		spillTxns;		/* number of transactions spilled to disk */
+	int64		spillCount;		/* spill-to-disk invocation counter */
+	int64		spillBytes;		/* amount of data spilled to disk */
+
+	/* Statistics about transactions streamed to the decoding output plugin */
+	int64		streamTxns;		/* number of transactions streamed */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamBytes;	/* amount of data streamed */
+
+	/*
+	 * Statistics about all the transactions sent to the decoding output
+	 * plugin
+	 */
+	int64		totalTxns;		/* total number of transactions sent */
+	int64		totalBytes;		/* total amount of data sent */
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -231,5 +254,6 @@ extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
+extern void InitializeSlotStats(ReplicationSlot *slot);
 
 #endif							/* SLOT_H */
-- 
2.28.0.windows.1

