On Thu, Apr 29, 2021 at 3:06 PM Amit Kapila <[email protected]> wrote:
>
> On Wed, Apr 28, 2021 at 5:01 PM Amit Kapila <[email protected]>
wrote:
> >
> > On Wed, Apr 28, 2021 at 4:51 PM Masahiko Sawada <[email protected]>
wrote:
> > >
> > > On Wed, Apr 28, 2021 at 6:39 PM Amit Kapila <[email protected]>
wrote:
> >
> > @@ -1369,7 +1369,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
> > ReorderBufferIterTXNState *state)
> > * Update the total bytes processed before releasing the current set
> > * of changes and restoring the new set of changes.
> > */
> > - rb->totalBytes += rb->size;
> > + rb->totalBytes += entry->txn->total_size;
> > if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
> > &state->entries[off].segno))
> >
> > I have not tested this but won't in the above change you need to check
> > txn->toptxn for subtxns?
> >
>
> Now, I am able to reproduce this issue:
> Create table t1(c1 int);
> select pg_create_logical_replication_slot('s', 'test_decoding');
> Begin;
> insert into t1 values(1);
> savepoint s1;
> insert into t1 select generate_series(1, 100000);
> commit;
>
> postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL,
NULL);
> count
> --------
> 100005
> (1 row)
>
> postgres=# select * from pg_stat_replication_slots;
> slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
> stream_count | stream_bytes | total_txns | total_bytes |
> stats_reset
>
-----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> s1 | 0 | 0 | 0 | 0 |
> 0 | 0 | 2 | 13200672 | 2021-04-29
> 14:33:55.156566+05:30
> (1 row)
>
> select * from pg_stat_reset_replication_slot('s1');
>
> Now reduce the logical decoding work mem to allow spilling.
> postgres=# set logical_decoding_work_mem='64kB';
> SET
> postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL,
NULL);
> count
> --------
> 100005
> (1 row)
>
> postgres=# select * from pg_stat_replication_slots;
> slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
> stream_count | stream_bytes | total_txns | total_bytes |
> stats_reset
>
-----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> s1 | 1 | 202 | 13200000 | 0 |
> 0 | 0 | 2 | 672 | 2021-04-29
> 14:35:21.836613+05:30
> (1 row)
>
> You can notice that after we have allowed spilling the 'total_bytes'
> stats is showing a different value. The attached patch fixes the issue
> for me. Let me know what do you think about this?
I found one issue with the following scenario when testing with
logical_decoding_work_mem as 64kB:
BEGIN;
INSERT INTO t1 values(generate_series(1,10000));
SAVEPOINT s1;
INSERT INTO t1 values(generate_series(1,10000));
COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
select * from pg_stat_replication_slots;
slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
stream_count | stream_bytes | total_txns | total_bytes |
stats_reset
------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
regression_slot1 | 6 | 154 | 9130176 | 0 |
0 | 0 | 1 | *4262016* | 2021-04-29
17:50:00.080663+05:30
(1 row)
Same thing works fine with logical_decoding_work_mem as 64MB:
select * from pg_stat_replication_slots;
slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
stream_count | stream_bytes | total_txns | total_bytes |
stats_reset
------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
regression_slot1 | 6 | 154 | 9130176 | 0 |
0 | 0 | 1 | *2640000* | 2021-04-29
17:50:00.080663+05:30
(1 row)
The patch required one change:
- rb->totalBytes += rb->size;
+ if (entry->txn->toptxn)
+ rb->totalBytes += entry->txn->toptxn->total_size;
+ else
+ rb->totalBytes += entry->txn->*total_size*;
The above should be changed to:
- rb->totalBytes += rb->size;
+ if (entry->txn->toptxn)
+ rb->totalBytes += entry->txn->toptxn->total_size;
+ else
+ rb->totalBytes += entry->txn->*size*;
Attached patch fixes the issue.
Thoughts?
Regards,
Vignesh
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c27f710053..cdf46a36af 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1369,7 +1369,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
* Update the total bytes processed before releasing the current set
* of changes and restoring the new set of changes.
*/
- rb->totalBytes += rb->size;
+ if (entry->txn->toptxn)
+ rb->totalBytes += entry->txn->toptxn->total_size;
+ else
+ rb->totalBytes += entry->txn->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
@@ -2382,7 +2385,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (!rbtxn_is_streamed(txn))
rb->totalTxns++;
- rb->totalBytes += rb->size;
+ rb->totalBytes += txn->total_size;
/*
* Done with current changes, send the last message for this set of
@@ -3073,7 +3076,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
{
Size sz;
ReorderBufferTXN *txn;
- ReorderBufferTXN *toptxn = NULL;
+ ReorderBufferTXN *toptxn;
Assert(change->txn);
@@ -3087,14 +3090,14 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
txn = change->txn;
- /* If streaming supported, update the total size in top level as well. */
- if (ReorderBufferCanStream(rb))
- {
- if (txn->toptxn != NULL)
- toptxn = txn->toptxn;
- else
- toptxn = txn;
- }
+ /*
+ * Update the total size in top level as well. This is later used to
+ * compute the decoding stats.
+ */
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
sz = ReorderBufferChangeSize(change);
@@ -3104,8 +3107,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
rb->size += sz;
/* Update the total size in the top transaction. */
- if (toptxn)
- toptxn->total_size += sz;
+ toptxn->total_size += sz;
}
else
{
@@ -3114,8 +3116,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
rb->size -= sz;
/* Update the total size in the top transaction. */
- if (toptxn)
- toptxn->total_size -= sz;
+ toptxn->total_size -= sz;
}
Assert(txn->size <= rb->size);