Hi,

The recently added tests for slotsync on two-phase enabled slots failed[1] due
to a broken assertion triggered while decoding the COMMIT PREPARED record on
the promoted standby.

-----
Analysis
-----

        if ((txn->final_lsn < two_phase_at) && is_commit)
        {
                /*
                 * txn must have been marked as a prepared transaction and skipped but
                 * not sent a prepare. Also, the prepare info must have been updated
                 * in txn even if we skip prepare.
                 */
                Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
                           (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));

I think the issue stems from a PREPARED transaction that was skipped on the
primary not being skipped on the promoted standby. The root cause appears to
be that the latest 'confirmed_flush' (0/6005118) of the source slot was not
synced to the standby. As a result, the confirmed_flush (0/6004F90) of the
synced slot is less than the synced two_phase_at field (0/6005118).
Consequently, the PREPARE record's LSN exceeds the synced confirmed_flush
during decoding, so the record is incorrectly decoded and sent to the
subscriber. Thus, when decoding COMMIT PREPARED, the PREPARE record is found
before two_phase_at but was not skipped, breaking the assertion.
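To make the inconsistency concrete, here is a minimal sketch (plain Python, not PostgreSQL code; the PREPARE record's LSN is a hypothetical value lying between the stale confirmed_flush and the synced two_phase_at) of the two comparisons that end up disagreeing:

```python
# Minimal model (not PostgreSQL code) of the LSN ordering described above.
# LSNs are modeled as plain integers.

TWO_PHASE_AT    = 0x6005118  # two_phase_at synced from the source slot
CONFIRMED_FLUSH = 0x6004F90  # stale confirmed_flush of the synced slot
PREPARE_LSN     = 0x6005000  # hypothetical LSN of the PREPARE record

# Decoding skips the PREPARE only if it lies before confirmed_flush;
# with the stale confirmed_flush it is decoded and sent instead.
skipped_prepare = PREPARE_LSN < CONFIRMED_FLUSH

# The assertion at COMMIT PREPARED expects every transaction prepared
# before two_phase_at to carry the "skipped prepare" flag.
expected_skipped = PREPARE_LSN < TWO_PHASE_AT

assert not skipped_prepare and expected_skipped  # the contradiction
```

With a correctly synced confirmed_flush (0/6005118), both comparisons would agree and the PREPARE would be skipped.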

-- The logs that prove the confirmed_flush was not synced:

decoding on original slot (primary):
        LOG:  0/6004F90 has been already streamed, forwarding to 0/6005118
        ...
        DETAIL:  Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.

decoding on synced slot (promoted standby) - failed run:
        DETAIL:  Streaming transactions committing after 0/6004F90, reading WAL from 0/60049C8.

decoding on synced slot (promoted standby) - success run:
        LOG:  0/6004F90 has been already streamed, forwarding to 0/6005118
        ...
        DETAIL:  Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.
-- 

The logs above clearly show that during the test failure, the starting
position (0/6004F90) was lower than in a successful run. It was also lower
than the starting position of the remote slot on the primary, indicating
that confirmed_flush failed to be synchronized.

The reason confirmed_flush isn't synced appears to be the following check,
which skips syncing the confirmed_flush value. I also reproduced the
assertion failure by creating a scenario in which this check causes the sync
to be omitted:

        /*
         * Don't overwrite if we already have a newer catalog_xmin and
         * restart_lsn.
         */
        if (remote_slot->restart_lsn < slot->data.restart_lsn ||
                TransactionIdPrecedes(remote_slot->catalog_xmin,
                                      slot->data.catalog_xmin))
        {


This check triggered because the synced catalog_xmin surpassed that of the
source slot (the log shows that restart_lsn was synced to the same value).
This situation arises due to several factors:

On the publisher (primary), the slot may retain an older catalog_xmin and
restart_lsn because it is being consumed by a walsender. In this scenario, if
the apply worker does not promptly confirm that changes have been flushed,
the walsender advances catalog_xmin/restart_lsn slowly: it sets old values
for candidate_catalog_xmin/candidate_restart_lsn and does not update them
until the apply worker confirms via LogicalConfirmReceivedLocation.

However, the slotsync worker has the opportunity to begin WAL reading from a
more recent point, potentially advancing catalog_xmin/restart_lsn to newer
values.

It is also possible to advance only catalog_xmin while keeping restart_lsn
unchanged, by starting a transaction before the running_xacts record. In this
case, builder->last_serialized_snapshot is passed as restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), but last_serialized_snapshot points
to the position of the previous running_xacts record rather than the current
one (or is NULL if no snapshot has been serialized before), so restart_lsn
does not advance. The advancement of catalog_xmin is not blocked by the
running transaction, as it uses either the XID of the running transaction or
oldestRunningXid as the candidate.
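The asymmetry described above can be sketched roughly as follows (illustrative Python, not the actual candidate-tracking code; the function, parameter names, and structure are simplified assumptions):

```python
# Sketch (not PostgreSQL code) of why catalog_xmin can advance on a
# running_xacts record while restart_lsn stays put.

def process_running_xacts(slot, record_lsn, oldest_running_xid,
                          txn_running, last_serialized_snapshot):
    # The catalog_xmin candidate comes from the oldest running XID, so it
    # advances regardless of any transaction still in progress.
    slot["catalog_xmin"] = max(slot["catalog_xmin"], oldest_running_xid)

    if not txn_running:
        # No transaction in progress: decoding could restart right here.
        slot["restart_lsn"] = max(slot["restart_lsn"], record_lsn)
    elif last_serialized_snapshot is not None:
        # A transaction is in progress: fall back to the last serialized
        # snapshot, which points at an *older* running_xacts position.
        slot["restart_lsn"] = max(slot["restart_lsn"], last_serialized_snapshot)
    # else: no snapshot serialized yet -> restart_lsn cannot move at all.
    return slot

# Standby during slot sync: a txn started before the record and no snapshot
# was serialized yet -> only catalog_xmin moves (values from the repro).
slot = process_running_xacts(
    {"catalog_xmin": 755, "restart_lsn": 0x301B880},
    record_lsn=0x301BA20, oldest_running_xid=756,
    txn_running=True, last_serialized_snapshot=None)
assert slot == {"catalog_xmin": 756, "restart_lsn": 0x301B880}
```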

-----
Reproduction
-----

Accompanying this analysis is a script to reproduce the issue. In the
script, two running_xacts records are logged after slot creation on the
publisher. Below is a detailed progression of restart_lsn/catalog_xmin
advancement observed on both the publisher and standby.

The sequence is: slot creation -> log running_xacts -> prepare a txn -> log
running_xacts

The process of advancing the catalog_xmin/restart_lsn on publisher:

slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestrunningxid = 755)

                got new catalog xmin 755 at 0/301B880
                LOG:  got new restart lsn 0/301B880 at 0/301B880
2. 0/301B8B8 - PREPARE TRANSACTION.
3. 0/301BA20 - running_xacts (no running txn, oldestrunningxid = 756)

                failed to increase restart lsn: proposed 0/301B880, after 0/301BA20,
                current candidate 0/301B880, current after 0/301B880, flushed up to
                0/301B810

final slot data:

catalog_xmin        | 755
restart_lsn         | 0/301B880
confirmed_flush_lsn | 0/301BAC8


The process of advancing the catalog_xmin/restart_lsn on standby (during slot
sync):

slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestrunningxid = 755)

        building consistent snapshot, so will skip advancing the catalog_xmin/restart_lsn
...
2. 0/301BA20 - running_xacts (no running txn, oldestrunningxid = 756)

        got new catalog xmin 756 at 0/301BA20

        cannot get a new restart_lsn, as a running txn exists and no
        snapshot has been serialized yet.

first synced slot data:

catalog_xmin        | 756
restart_lsn         | 0/301B880
confirmed_flush_lsn | 0/301BAC8

The second slot sync cycle then skips updating confirmed_flush because the
locally synced catalog_xmin is newer.
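Modeled on the guard quoted earlier, the failing sync cycle can be sketched like this (illustrative Python with simplified fields, not the actual slotsync code; the local confirmed_flush value is a hypothetical stale one):

```python
# Sketch of the "don't overwrite newer catalog_xmin/restart_lsn" guard:
# once the locally synced catalog_xmin gets ahead of the source slot's,
# the whole update, including confirmed_flush, is skipped.

def try_sync(local, remote):
    if (remote["restart_lsn"] < local["restart_lsn"]
            or remote["catalog_xmin"] < local["catalog_xmin"]):
        return False  # skip everything; confirmed_flush stays stale
    local.update(remote)
    return True

# Values from the reproduction: the synced slot advanced catalog_xmin to
# 756, while the source slot still reports 755.
remote = {"catalog_xmin": 755, "restart_lsn": 0x301B880,
          "confirmed_flush": 0x301BAC8}
local  = {"catalog_xmin": 756, "restart_lsn": 0x301B880,
          "confirmed_flush": 0x301B880}  # hypothetical stale value

synced = try_sync(local, remote)
assert synced is False and local["confirmed_flush"] == 0x301B880
```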

-----
Fix
-----

I think we should sync the confirmed_flush even if the previously synced
restart_lsn/catalog_xmin is newer. A patch for the same is attached.
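Sketched in illustrative Python (not the attached patch; the function and field names are simplified assumptions), the idea is to let the guard skip restart_lsn/catalog_xmin while still carrying over a newer confirmed_flush:

```python
# Sketch of the proposed behavior: when the remote slot's restart_lsn/
# catalog_xmin are older than the locally synced ones, skip those fields
# but still pick up a newer confirmed_flush from the source slot.

def sync_slot(local, remote):
    if (remote["restart_lsn"] < local["restart_lsn"]
            or remote["catalog_xmin"] < local["catalog_xmin"]):
        # Keep the newer local restart_lsn/catalog_xmin...
        if remote["confirmed_flush"] > local["confirmed_flush"]:
            # ...but do not let confirmed_flush fall behind the source.
            local["confirmed_flush"] = remote["confirmed_flush"]
        return local
    local.update(remote)
    return local

local = sync_slot(
    {"catalog_xmin": 756, "restart_lsn": 0x301B880,
     "confirmed_flush": 0x301B880},              # hypothetical stale value
    {"catalog_xmin": 755, "restart_lsn": 0x301B880,
     "confirmed_flush": 0x301BAC8})
assert local["confirmed_flush"] == 0x301BAC8    # confirmed_flush now synced
assert local["catalog_xmin"] == 756             # newer local value kept
```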


Best Regards,
Hou zj

#!/bin/bash

port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2

rm -rf data_* *log
rm -rf /home/houzj/archivedir/*

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical 
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB

max_wal_senders = 550
max_replication_slots = 550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

#log_min_messages = 'debug2'

#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'

max_prepared_transactions = 10

EOF

pg_ctl -D data_primary start -w -l primary.log 

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on 
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

cat << EOF >> data_secondary/standby.signal
EOF


pg_ctl -D data_secondary start -w 
psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &

sleep 1

psql -d postgres -p $port_primary -c "CHECKPOINT;"


echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


max_prepared_transactions = 10
EOF

pg_ctl start -D data_subscriber


psql -d postgres -p $port_primary -c "create table tab1 (a int); CREATE PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_primary -c "SELECT 'init' FROM pg_create_logical_replication_slot('sub', 'pgoutput', false, false, true);"

psql -d postgres -p $port_subscriber -c "create table tab1 (a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (create_slot=false, copy_data=false, two_phase=false, failover, enabled=false)"

sleep 1

psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(1);"
psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(2); PREPARE TRANSACTION 'test';"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(3);"

psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

sleep 1

psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

#psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(4);"

psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(11); PREPARE TRANSACTION 'bug_twophase';"

sleep 1

psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase=on); alter subscription sub enable;"

sleep 1

psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

sleep 1

pg_ctl promote -D data_secondary

sleep 1

psql -d postgres -p $port_secondary -c "COMMIT PREPARED 'bug_twophase';"



psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_secondary'"



#psql -d postgres -p $port_secondary -c "SELECT * FROM pg_replication_slots;"

#echo 'Moving lsns on primary so that slot-creation on secondary is successful'
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (2)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (3)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (4)"



Attachment: 0001-Fix-assertion-failure-when-decoding-synced-two-phase.patch