On Mon, Apr 28, 2025 at 7:33 PM Zhijie Hou (Fujitsu) wrote:
> 
> Thanks for reviewing. Here is the V3 patch that addresses it.
> 
> BTW, I also ran some tests to confirm that catalog_xmin can still be
> ahead in some cases; here is an example:
> 
> 1. Create a failover replication slot named 'logicalslot' on primary
>   and acquire it in the walsender.
> 
> 2. Log two standby snapshots on the primary. Before logging each one, call
>    txid_current() to assign an xid, so that each standby snapshot holds a
>    new xid in its oldestrunningXid field:
>    - txid_current();
>    - `0/3000420` - `running_xacts` (no running transactions,
>      oldestrunningXid = 755)
>    - txid_current();
>    - `0/3000488` - `running_xacts` (no running transactions,
>      oldestrunningXid = 756)
> 
> 3. The walsender sets `0/3000420` as the `candidate_restart_lsn`, 755 as
>    `candidate_catalog_xmin`, skipping the second `running_xacts` because
>    `candidate_restart_lsn`/`candidate_catalog_xmin` is already valid.
> 
> 4. After receiving a reply from the apply worker, the walsender assigns
>    `0/3000420` to `restart_lsn`, `755` to `catalog_xmin`. At this point, the
>    replication slot 'logicalslot' has `restart_lsn` set to `0/3000420`,
>    `catalog_xmin` set to `755`.
> 
> 5. On the standby, execute `pg_sync_replication_slots()` to synchronize
>    'logicalslot'.
> 
> 6. During synchronization, with the initial `restart_lsn` at `0/3000420`, the
>    sync slot reaches a consistent point at this position. As a result, it does
>    not update `candidate_restart_lsn` and `candidate_catalog_xmin` at
>    consistent point (refer to `SnapBuildProcessRunningXacts()`).
> 
> 7. The sync process identifies the second standby snapshot at `0/3000488`,
>    uses its LSN as `candidate_restart_lsn` and its oldestrunningXid `756`
>    as `candidate_catalog_xmin`, and eventually assigns them to `restart_lsn`
>    and `catalog_xmin`.
> 
> 8. Now, the synced slot holds `restart_lsn` at `0/3000488` and `catalog_xmin`
>    at `756`, both of which are ahead of the remote slot on the primary server.
> 
> Attaching a script to reproduce the same.
> 
> Note that, to reproduce this reliably, it is best to increase the value of
> LOG_SNAPSHOT_INTERVAL_MS in bgwriter.c to avoid
> unexpected xl_running_xacts logging.

In addition to the above steps, for those interested in reproducing the specific
scenario where two_phase_at advances past the synced confirmed_flush, I'm
attaching a new script. This script can reproduce the issue once the
injection point patch I provided has been applied.

Best Regards,
Hou zj
From a4a314b7041779d6e5725a9d66b0135715d393f4 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.f...@cn.fujitsu.com>
Date: Tue, 29 Apr 2025 13:56:25 +0800
Subject: [PATCH] injection point

---
 src/backend/replication/logical/logical.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a8d2e024d34..afe194fb398 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -41,6 +41,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -603,6 +604,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
        }
 
+       INJECTION_POINT("initialized-two-phase-at");
+
        ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
        ereport(LOG,
-- 
2.30.0.windows.2

#!/bin/bash

# TCP ports of the three clusters this script starts. All connections below
# are made by port only, so every cluster needs its own port number.
port_primary=5432      # primary server / publisher
port_secondary=5433    # physical standby of the primary
port_subscriber=5434   # logical replication subscriber

# Stop every cluster a previous run may have left behind, then remove the
# old data directories, log files and archived WAL.
printf '%s\n' '==========' '=Clean up=' '=========='

for datadir in data_primary data_secondary data_third_ data_secondary_2 \
               data_secondary_3 data_subscriber data_subscriber2
do
	pg_ctl stop -D "$datadir"
done

rm -rf data_* *log
rm -rf /home/houzj/archivedir/*

echo '======================='
echo '=Set up primary server='
echo '======================='

# Create the primary (publisher) cluster.
initdb -D data_primary

# Append the test settings to the generated postgresql.conf. The heredoc
# delimiter is unquoted, so $port_primary is expanded by the shell.
# Each parameter is set exactly once: when postgresql.conf repeats a
# parameter only the last occurrence takes effect, so duplicates would only
# obscure the value that is really in use.
cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#max_slot_wal_keep_size = 64kB

max_wal_senders = 550
max_replication_slots = 550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

#log_min_messages = 'debug2'

#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'

max_prepared_transactions = 10

EOF

# Start the primary, waiting (-w) for startup to complete; the server log
# goes to primary.log.
pg_ctl -D data_primary start -w -l primary.log

# Physical replication slot named 'physical'; the standby configuration in
# this script refers to it through primary_slot_name.
psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

# Checkpoint the primary, then clone it into data_secondary.
psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

# Standby settings. Every quoted value must stay on a single line: a quoted
# string in postgresql.conf cannot contain a raw newline, so a wrapped
# primary_conninfo makes the file unparsable and the standby fails to start.
# The heredoc delimiter is unquoted, so the port variables are expanded.
cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

# An empty standby.signal file makes the server start as a standby.
touch data_secondary/standby.signal

pg_ctl -D data_secondary start -w

# Create a logical slot on the standby in the background.
# NOTE(review): the slot is named 'stuck' and the command is backgrounded, so
# it presumably blocks until the primary logs a running-xacts record; the
# CHECKPOINT below is likely what lets it proceed -- confirm.
psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &

sleep 1

psql -d postgres -p $port_primary -c "CHECKPOINT;"

echo '==================='
echo '=Set up subscriber='
echo '==================='

# Create the subscriber cluster as an independent instance (fresh initdb,
# not a copy of the primary).
initdb -D data_subscriber

# Append the subscriber settings to the generated postgresql.conf. The
# heredoc delimiter is unquoted, so $port_subscriber is expanded by the shell.
cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


max_prepared_transactions = 10
EOF

# Start the subscriber; no -l is given, so the server log is not redirected
# to a file.
pg_ctl start -D data_subscriber


# --- Reproduction scenario ---
# Per the accompanying mail, this sequence is meant to reproduce two_phase_at
# advancing past the synced slot's confirmed_flush. It relies on a server
# built with the 'initialized-two-phase-at' injection point.
# The order of the statements and the sleeps between them matter.

# Publisher side: a table and a publication for it.
psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); CREATE 
PUBLICATION pub FOR TABLE tab1;"

# Subscriber side: the matching table, then a subscription that creates the
# failover-enabled slot 'logicalslot' on the primary. The subscription starts
# disabled, copies no initial data and has two_phase off.
psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 
'dbname=postgres port=$port_primary' PUBLICATION pub WITH 
(slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false, 
failover=true, enabled=false)"

# Print the current WAL position, then three times in a row: assign an xid
# with txid_current() and log a standby snapshot on the primary.
psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"

sleep 1

# Enable the subscription.
psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub ENABLE;"

# Show restart_lsn of every replication slot on the primary.
psql -d postgres -p $port_primary -c "SELECT restart_lsn FROM 
pg_replication_slots;"

sleep 1

# First synchronization of the failover slots to the standby.
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

# Emit a transactional logical message and leave the transaction prepared;
# nothing in this script commits or rolls it back.
psql -d postgres -p $port_primary -c "BEGIN; SELECT 
pg_logical_emit_message(true, 'test', 'test'); PREPARE TRANSACTION 
'slotsync_twophase_prepared';"

sleep 1

# Install the injection_points extension on the primary and attach a 'wait'
# action to the 'initialized-two-phase-at' injection point.
psql -d postgres -p $port_primary -c "create extension injection_points;SELECT 
injection_points_attach('initialized-two-phase-at', 'wait');"

# Restart the subscription with two_phase switched on.
# NOTE(review): presumably the new walsender then pauses at the injection
# point inside CreateDecodingContext -- confirm against the patched server.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase 
=on); alter subscription sub enable ;"

sleep 1

# Second synchronization of the failover slots, run after the two_phase change.
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

# End of the script; the clusters are left running for inspection.
exit

Reply via email to