On Sat, Mar 20, 2021 at 9:26 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Sat, Mar 20, 2021 at 12:22 AM Andres Freund <and...@anarazel.de> wrote:
> >
> > And then more generally about the feature:
> > - If a slot was used to stream out a large amount of changes (say an
> >   initial data load), but then replication is interrupted before the
> >   transaction is committed/aborted, stream_bytes will not reflect the
> >   many gigabytes of data we may have sent.
> >
>
> We can probably update the stats each time we spilled or streamed the
> transaction data but it was not clear at that stage whether or how
> much it will be useful.
>
I felt we could update the replication slot statistics each time we spill/stream the transaction data, instead of accumulating the statistics and updating them only at the end. I have tried this in the attached patch, and the statistics were getting updated. Thoughts?

Regards,
Vignesh
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 99f2afc73c..9905e2b8ad 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -748,7 +748,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * not clear that sending more or less frequently than this would be * better. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } /* @@ -830,7 +830,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * not clear that sending more or less frequently than this would be * better. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } @@ -887,7 +887,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* update the decoding stats */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 954dbb5554..00c481e1d2 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -219,6 +219,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + namestrcpy(&ctx->reorder->slotname, NameStr(ctx->slot->data.name)); /* * To support streaming, we require start/stop/abort/commit/change @@ -1791,15 +1792,14 @@ ResetLogicalStreamingState(void) * Report stats for a slot. */ void -UpdateDecodingStats(LogicalDecodingContext *ctx) +UpdateDecodingStats(ReorderBuffer *rb) { - ReorderBuffer *rb = ctx->reorder; - /* * Nothing to do if we haven't spilled or streamed anything since the last * time the stats has been sent. 
*/ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0) + if (rb->spillBytes <= 0 && rb->spillCount <=0 && + rb->streamBytes <= 0 && rb->streamCount <=0) return; elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", @@ -1811,7 +1811,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) (long long) rb->streamCount, (long long) rb->streamBytes); - pgstat_report_replslot(NameStr(ctx->slot->data.name), + pgstat_report_replslot(NameStr(rb->slotname), rb->spillTxns, rb->spillCount, rb->spillBytes, rb->streamTxns, rb->streamCount, rb->streamBytes); rb->spillTxns = 0; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 92c7fa7993..b60a864004 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3528,6 +3528,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* don't consider already serialized transactions */ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + UpdateDecodingStats(rb); } Assert(spilled == txn->nentries_mem); @@ -3897,6 +3898,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Don't consider already streamed transaction. */ rb->streamTxns += (txn_is_streamed) ? 
0 : 1; + UpdateDecodingStats(rb); + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 72f049b347..71d93058d2 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -139,6 +139,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); -extern void UpdateDecodingStats(LogicalDecodingContext *ctx); +extern void UpdateDecodingStats(ReorderBuffer *rb); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 6c9f2c6c77..bcc4788480 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -618,6 +618,8 @@ struct ReorderBuffer int64 streamTxns; /* number of transactions streamed */ int64 streamCount; /* streaming invocation counter */ int64 streamBytes; /* amount of data streamed */ + + NameData slotname; /* Slot name for updating statistics */ };