Hi, The recently added tests for slotsync on two-phase enabled slots failed[1] due to a broken assertion triggered while decoding the COMMIT PREPARED record on the promoted standby.
-----
Analysis
-----
if ((txn->final_lsn < two_phase_at) && is_commit)
{
    /*
     * txn must have been marked as a prepared transaction and skipped but
     * not sent a prepare. Also, the prepare info must have been updated
     * in txn even if we skip prepare.
     */
    Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
           (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
I think the issue stems from the PREPARED transaction, which was skipped on the
primary, not being skipped on the promoted standby. The root cause appears to
be that the latest confirmed_flush (0/6005118) of the source slot was not synced
to the standby. As a result, the confirmed_flush (0/6004F90) of the synced
slot is less than the synced two_phase_at field (0/6005118). Consequently,
the PREPARE record's LSN exceeds the synced confirmed_flush during
decoding, so the PREPARE is incorrectly decoded and sent to the subscriber.
Thus, when decoding COMMIT PREPARED, the PREPARE record is found to be before
two_phase_at but was not skipped.
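To make the mismatch concrete, here is a minimal standalone C model of the two decisions involved. It is a simplified sketch, not the actual PostgreSQL code: the function names are made up for illustration, the skip rule is reduced to a single LSN comparison, and the PREPARE LSN used in the note below (0/6005000) is a hypothetical value chosen only because it lies between the two LSNs from the failed run.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Build an LSN from the "hi/lo" notation used in the logs. */
#define LSN(hi, lo) ((((XLogRecPtr) (hi)) << 32) | (XLogRecPtr) (lo))

/*
 * Simplified assumption: a PREPARE is skipped during decoding only when it
 * lies before the point decoding starts from (the slot's confirmed_flush).
 */
static inline bool
prepare_is_skipped(XLogRecPtr prepare_lsn, XLogRecPtr confirmed_flush)
{
	return prepare_lsn < confirmed_flush;
}

/*
 * Models the guarded assertion at COMMIT PREPARED time: a PREPARE located
 * before two_phase_at is expected to have been skipped earlier.
 * Returns true when the assertion would hold.
 */
static inline bool
commit_prepared_assert_holds(XLogRecPtr prepare_lsn,
							 XLogRecPtr two_phase_at,
							 bool was_skipped)
{
	if (prepare_lsn < two_phase_at)
		return was_skipped;
	return true;
}
```

With confirmed_flush = 0/6004F90 and two_phase_at = 0/6005118, a PREPARE at the hypothetical 0/6005000 is not skipped by the first function, yet the second function expects it to have been skipped, which is the failing combination. With confirmed_flush = 0/6005118 both agree.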
-- The logs that show the confirmed_flush is not synced:
decoding on original slot (primary):
LOG: 0/6004F90 has been already streamed, forwarding to 0/6005118
...
DETAIL: Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.
decoding on synced slot (promoted standby) - failed run:
DETAIL: Streaming transactions committing after 0/6004F90, reading WAL from 0/60049C8.
decoding on synced slot (promoted standby) - success run:
LOG: 0/6004F90 has been already streamed, forwarding to 0/6005118
...
DETAIL: Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.
--
The logs above clearly show that during the test failure, the starting position
(0/6004F90) was less than that of a successful run. Additionally, it was lower
than the starting position of the remote slot on the primary, indicating a
failure to synchronize confirmed_flush.
The reason confirmed_flush isn't synced appears to be the following check,
which skips the syncing of the confirmed_flush value. I also reproduced the
assertion failure by creating a scenario that leads to sync omission due to
this check:
/*
* Don't overwrite if we already have a newer catalog_xmin and
* restart_lsn.
*/
if (remote_slot->restart_lsn < slot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
slot->data.catalog_xmin))
{
This check was triggered because the synced catalog_xmin had surpassed that of
the source slot (the log shows that the restart_lsn was synced to the same
value). This situation arises due to several factors:
On the publisher (primary), the slot may retain an older catalog_xmin and
restart_lsn because it is being consumed by a walsender. In this scenario, if
the apply worker does not immediately confirm that changes have been flushed,
the walsender advances the catalog_xmin/restart_lsn slowly. It sets an old
value for candidate_catalog_xmin/candidate_restart_lsn and would not update
them until the apply worker confirms via LogicalConfirmReceivedLocation.
However, the slotsync worker has the opportunity to begin WAL reading from a
more recent point, potentially advancing catalog_xmin/restart_lsn to newer
values.
It's also possible to advance only catalog_xmin while keeping restart_lsn
unchanged, by starting a transaction before the running_xact record. In this
case, it would pass builder->last_serialized_snapshot as restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), but last_serialized_snapshot would
point to the position of the last running_xact record instead of the current
one, or would be invalid if no snapshot has been serialized before, so
it would not advance restart_lsn. The advancement of catalog_xmin is not
blocked by a running transaction, as it would use either the XID of the running
transaction or the oldestRunningXid as the candidate.
-----
Reproduction
-----
Accompanying this analysis is a script to reproduce the issue. In this
script, two running_xacts records are logged after slot creation on the
publisher. Below is a detailed progression of restart_lsn/catalog_xmin
advancement observed on both the publisher and standby:
The process is: slot creation -> log running_xact -> prepare a txn -> log
running_xact
The process of advancing the catalog_xmin/restart_lsn on publisher:
slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestrunningxid = 755)
got new catalog xmin 755 at 0/301B880
LOG: got new restart lsn 0/301B880 at 0/301B880
2. 0/301B8B8 - PREPARE a TRANSACTION.
3. 0/301BA20 - running_xacts (no running txn, oldestrunningxid = 756)
failed to increase restart lsn: proposed 0/301B880, after 0/301BA20,
current candidate 0/301B880, current after 0/301B880, flushed up to
0/301B810
final slot data:
catalog_xmin | 755
restart_lsn | 0/301B880
confirmed_flush_lsn | 0/301BAC8
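The publisher-side progression above can be replayed with a small standalone C model of the candidate restart_lsn handling. This is only a sketch of the behaviour as I understand it from the log messages, not the real LogicalIncreaseRestartDecodingForSlot(): the struct, enum, and function names are invented for illustration, and the initial restart_lsn (0/301B7D8) is a hypothetical value, since the logs do not show it.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidLSN ((XLogRecPtr) 0)

/* Hypothetical, reduced view of the slot fields involved. */
typedef struct SlotModel
{
	XLogRecPtr	restart_lsn;
	XLogRecPtr	confirmed_flush;
	XLogRecPtr	candidate_restart_lsn;
	XLogRecPtr	candidate_restart_valid;
} SlotModel;

typedef enum
{
	PROPOSAL_IGNORED,			/* not newer than the current restart_lsn */
	PROPOSAL_ACCEPTED,			/* "got new restart lsn ..." */
	PROPOSAL_REJECTED			/* "failed to increase restart lsn ..." */
} ProposalResult;

/* A new candidate is taken only if no earlier candidate is still pending. */
static inline ProposalResult
propose_restart_lsn(SlotModel *slot, XLogRecPtr current_lsn,
					XLogRecPtr restart_lsn)
{
	if (restart_lsn <= slot->restart_lsn)
		return PROPOSAL_IGNORED;

	if (current_lsn <= slot->confirmed_flush ||
		slot->candidate_restart_valid == InvalidLSN)
	{
		slot->candidate_restart_valid = current_lsn;
		slot->candidate_restart_lsn = restart_lsn;
		return PROPOSAL_ACCEPTED;
	}
	return PROPOSAL_REJECTED;
}

/* The pending candidate is applied once the flush position passes it. */
static inline void
confirm_flush(SlotModel *slot, XLogRecPtr lsn)
{
	slot->confirmed_flush = lsn;
	if (slot->candidate_restart_valid != InvalidLSN &&
		slot->candidate_restart_valid <= lsn)
	{
		slot->restart_lsn = slot->candidate_restart_lsn;
		slot->candidate_restart_lsn = InvalidLSN;
		slot->candidate_restart_valid = InvalidLSN;
	}
}
```

Starting from confirmed_flush = 0/301B810, the proposal at 0/301B880 is accepted, the one at 0/301BA20 is rejected because the first candidate is still pending, and confirming 0/301BAC8 leaves restart_lsn at 0/301B880, which matches the final slot data on the publisher.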
The process of advancing the catalog_xmin/restart_lsn on standby (during slot
sync):
slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestrunningxid = 755)
building consistent snapshot, so will skip advancing the
catalog_xmin/restart_lsn
...
2. 0/301BA20 - running_xacts (no running txn, oldestrunningxid = 756)
got new catalog xmin 756 at 0/301BA20
cannot get new restart_lsn as running txn exists and didn't serialize
snapshot yet.
first synced slot data:
catalog_xmin | 756
restart_lsn | 0/301B880
confirmed_flush_lsn | 0/301BAC8
The second slot sync cycle would skip updating confirmed_flush because
the synced catalog_xmin (756) is newer than the remote one (755).
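The effect of the check on the second sync cycle can be shown with a small standalone C model. It is a sketch under stated assumptions rather than the slotsync code itself: SlotState, xid_precedes() and sync_slot() are illustrative names, the XID comparison mimics the modulo-2^32 rule for normal XIDs, and the newer remote confirmed_flush (0/301BC00) is a hypothetical value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

/* Hypothetical, reduced view of the fields compared during slot sync. */
typedef struct SlotState
{
	XLogRecPtr	restart_lsn;
	TransactionId catalog_xmin;
	XLogRecPtr	confirmed_flush;
} SlotState;

/* Wraparound-aware comparison for normal XIDs. */
static inline bool
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Models the current behaviour: if the local slot already has a newer
 * restart_lsn or catalog_xmin, the whole update is skipped, so the newer
 * remote confirmed_flush is not copied either. Returns true if synced.
 */
static inline bool
sync_slot(const SlotState *remote, SlotState *local)
{
	if (remote->restart_lsn < local->restart_lsn ||
		xid_precedes(remote->catalog_xmin, local->catalog_xmin))
		return false;			/* nothing copied, confirmed_flush included */

	*local = *remote;
	return true;
}
```

With the remote slot at catalog_xmin 755 and the synced slot at 756 (both with restart_lsn 0/301B880), sync_slot() returns false and the synced confirmed_flush stays at 0/301BAC8 even though the remote value has moved on.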
-----
Fix
-----
I think we should keep the confirmed_flush even if the previous synced
restart_lsn/catalog_xmin is newer. Attachments include a patch for the same.
Best Regards,
Hou zj
#!/bin/bash
port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435
echo '=========='
echo '=Clean up='
echo '=========='
pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third_
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2
rm -rf data_* *log
rm -rf /home/houzj/archivedir/*
echo '======================='
echo '=Set up primary server='
echo '======================='
initdb -D data_primary
cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB
max_wal_senders=550
max_worker_processes=1000
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
#log_min_messages = 'debug2'
#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'
max_prepared_transactions = 10
EOF
pg_ctl -D data_primary start -w -l primary.log
psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"
echo '========================='
echo '=Set up secondary server='
echo '========================='
psql -d postgres -p $port_primary -c "CHECKPOINT;"
pg_basebackup -D data_secondary -p $port_primary
cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF
cat << EOF >> data_secondary/standby.signal
EOF
pg_ctl -D data_secondary start -w
psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &
sleep 1
psql -d postgres -p $port_primary -c "CHECKPOINT;"
echo '==================='
echo '=Set up subscriber='
echo '==================='
initdb -D data_subscriber
cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber
checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
max_prepared_transactions = 10
EOF
pg_ctl start -D data_subscriber
psql -d postgres -p $port_primary -c "create table tab1 (a int);CREATE PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_primary -c "SELECT 'init' FROM pg_create_logical_replication_slot('sub', 'pgoutput', false, false, true);"
psql -d postgres -p $port_subscriber -c "create table tab1 (a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (create_slot=false, copy_data = false, two_phase=false, failover, enabled=false)"
sleep 1
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(1);"
psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(2); PREPARE TRANSACTION 'test';"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(3);"
psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"
sleep 1
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"
#psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(4);"
psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(11); PREPARE TRANSACTION 'bug_twophase';"
sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub disable ; ;"
sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase = on); alter subscription sub enable ;"
sleep 1
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"
sleep 1
pg_ctl promote -D data_secondary
sleep 1
psql -d postgres -p $port_secondary -c "COMMIT PREPARED 'bug_twophase';"
psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_secondary'"
#psql -d postgres -p $port_secondary -c "SELECT * FROM pg_replication_slots;"
#echo 'Moving lsns on primary so that slot-creation on secondary is successful'
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (2)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (3)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (4)"
0001-Fix-assertion-failure-when-decoding-synced-two-phase.patch
