On Sat, Mar 20, 2021 at 9:26 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Sat, Mar 20, 2021 at 12:22 AM Andres Freund <and...@anarazel.de> wrote:
> >
> > And then more generally about the feature:
> > - If a slot was used to stream out a large amount of changes (say an
> >   initial data load), but then replication is interrupted before the
> >   transaction is committed/aborted, stream_bytes will not reflect the
> >   many gigabytes of data we may have sent.
> >
>
> We can probably update the stats each time we spilled or streamed the
> transaction data but it was not clear at that stage whether or how
> much it will be useful.
>

I think we can update the replication slot statistics each time we
spill or stream transaction data, instead of accumulating the
statistics and reporting them only when the transaction is committed,
prepared, or aborted. I have tried this in the attached patch, and with
it the statistics were getting updated.
Thoughts?

Regards,
Vignesh
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 99f2afc73c..9905e2b8ad 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -748,7 +748,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 /*
@@ -830,7 +830,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 
@@ -887,7 +887,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* update the decoding stats */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 954dbb5554..00c481e1d2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -219,6 +219,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	namestrcpy(&ctx->reorder->slotname, NameStr(ctx->slot->data.name));
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -1791,15 +1792,14 @@ ResetLogicalStreamingState(void)
  * Report stats for a slot.
  */
 void
-UpdateDecodingStats(LogicalDecodingContext *ctx)
+UpdateDecodingStats(ReorderBuffer *rb)
 {
-	ReorderBuffer *rb = ctx->reorder;
-
 	/*
 	 * Nothing to do if we haven't spilled or streamed anything since the last
 	 * time the stats has been sent.
 	 */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->spillCount <=0 &&
+		rb->streamBytes <= 0 && rb->streamCount <=0)
 		return;
 
 	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
@@ -1811,7 +1811,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 (long long) rb->streamCount,
 		 (long long) rb->streamBytes);
 
-	pgstat_report_replslot(NameStr(ctx->slot->data.name),
+	pgstat_report_replslot(NameStr(rb->slotname),
 						   rb->spillTxns, rb->spillCount, rb->spillBytes,
 						   rb->streamTxns, rb->streamCount, rb->streamBytes);
 	rb->spillTxns = 0;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 92c7fa7993..b60a864004 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3528,6 +3528,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		/* don't consider already serialized transactions */
 		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+		UpdateDecodingStats(rb);
 	}
 
 	Assert(spilled == txn->nentries_mem);
@@ -3897,6 +3898,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* Don't consider already streamed transaction. */
 	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
 
+	UpdateDecodingStats(rb);
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 72f049b347..71d93058d2 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -139,6 +139,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
-extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingStats(ReorderBuffer *rb);
 
 #endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 6c9f2c6c77..bcc4788480 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,8 @@ struct ReorderBuffer
 	int64		streamTxns;		/* number of transactions streamed */
 	int64		streamCount;	/* streaming invocation counter */
 	int64		streamBytes;	/* amount of data streamed */
+
+	NameData	slotname;		/* Slot name for updating statistics */
 };
 
 

Reply via email to