On Mon, Apr 28, 2025 at 7:33 PM Zhijie Hou (Fujitsu) wrote:
>
> Thanks for reviewing. Here is V3 patch that addressed it.
>
> BTW, I also did some tests to confirm the catalog_xmin could still be
> ahead in some case, and here is an example:
>
> 1. Create a failover replication slot named 'logicalslot' on primary
> and acquire it in the walsender.
>
> 2. Log two standby snapshots on primary. Before logging, call
> txid_current() To assign a xid, so that each standby snapshot will
> hold a new xid in its oldestrunningXid field:
> - txid_current();
> - `0/3000420` - `running_xacts` (no running transactions,
> oldestrunningXid = 755)
> - txid_current();
> - `0/3000488` - `running_xacts` (no running transactions,
> oldestrunningXid = 756)
>
> 3. The walsender sets `0/3000420` as the `candidate_restart_lsn`, 755 as
> `candidate_catalog_xmin`, skipping the second `running_xacts` because
> `candidate_restart_lsn`/`candidate_catalog_xmin` is already valid.
>
> 4. After receiving a reply from the apply worker, the walsender assigns
> `0/3000420` to `restart_lsn`, `755` to `catalog_xmin`. At this point, the
> replication slot 'logicalslot' has `restart_lsn` set to `0/3000420`,
> `catalog_xmin` set to `755`.
>
> 5. On the standby, execute `pg_sync_replication_slots()` to synchronize
> 'logicalslot'.
>
> 6. During synchronization, with the initial `restart_lsn` at `0/3000420`, the
> sync slot reaches a consistent point at this position. As a result, it does
> not update `candidate_restart_lsn` and `candidate_catalog_xmin` at
> consistent point (refer to `SnapBuildProcessRunningXacts()`).
>
> 7. The sync process identifies the second standby snapshot at
> `0/3000488` and
> uses its LSN as `candidate_restart_lsn`, and use the
> oldestrunningXid `756`
> as `candidate_catalog_xmin`, eventually updating it to `restart_lsn` and
> `catalog_xmin`.
>
> 8. Now, the synced slot holds `restart_lsn` at `0/3000488`, `catalog_xmin` at
> `756`, which are all ahead of the remote slot on the primary server.
>
> Attaching a script to reproduce the same.
>
> Note that, to reproduce this stably, we'd better modify the value of
> LOG_SNAPSHOT_INTERVAL_MS in bgwriter.c to a bigger number to avoid
> unexpected xl_running_xacts logging.
In addition to above steps, for those interested in reproducing the specific
scenario where two_phase_at advances past the synced confirmed_flush, I'm
attaching a new script. This script can reproduce the issue after applying the
injection points I provided.
Best Regards,
Hou zj
From a4a314b7041779d6e5725a9d66b0135715d393f4 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Tue, 29 Apr 2025 13:56:25 +0800
Subject: [PATCH] injection point
---
src/backend/replication/logical/logical.c | 3 +++
1 file changed, 3 insertions(+)
diff --git a/src/backend/replication/logical/logical.c
b/src/backend/replication/logical/logical.c
index a8d2e024d34..afe194fb398 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -41,6 +41,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -603,6 +604,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
}
+ INJECTION_POINT("initialized-two-phase-at");
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
ereport(LOG,
--
2.30.0.windows.2
#!/bin/bash
port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435
echo '=========='
echo '=Clean up='
echo '=========='
pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third_
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2
rm -rf data_* *log
rm -rf /home/houzj/archivedir/*
echo '======================='
echo '=Set up primary server='
echo '======================='
initdb -D data_primary
cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB
max_wal_senders=550
max_worker_processes=1000
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
#log_min_messages = 'debug2'
#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'
max_prepared_transactions = 10
EOF
pg_ctl -D data_primary start -w -l primary.log
psql -d postgres -p $port_primary -c "SELECT * FROM
pg_create_physical_replication_slot('physical');"
echo '========================='
echo '=Set up secondary server='
echo '========================='
psql -d postgres -p $port_primary -c "CHECKPOINT;"
pg_basebackup -D data_secondary -p $port_primary
cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary
dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary
dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF
cat << EOF >> data_secondary/standby.signal
EOF
pg_ctl -D data_secondary start -w
psql -d postgres -p $port_secondary -c "SELECT 'init' FROM
pg_create_logical_replication_slot('stuck', 'pgoutput');" &
sleep 1
psql -d postgres -p $port_primary -c "CHECKPOINT;"
echo '==================='
echo '=Set up subscirber='
echo '==================='
initdb -D data_subscriber
cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber
checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
max_prepared_transactions = 10
EOF
pg_ctl start -D data_subscriber
psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); CREATE
PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION
'dbname=postgres port=$port_primary' PUBLICATION pub WITH
(slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false,
failover=true, enabled=false)"
psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
sleep 1
psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub ENABLE;"
psql -d postgres -p $port_primary -c "SELECT restart_lsn FROM
pg_replication_slots;"
sleep 1
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"
psql -d postgres -p $port_primary -c "BEGIN; SELECT
pg_logical_emit_message(true, 'test', 'test'); PREPARE TRANSACTION
'slotsync_twophase_prepared';"
sleep 1
psql -d postgres -p $port_primary -c "create extension injection_points;SELECT
injection_points_attach('initialized-two-phase-at', 'wait');"
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase
=on); alter subscription sub enable ;"
sleep 1
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"
exit