Hi,

The recently added tests for slotsync of two-phase enabled slots failed[1] due to an assertion failure triggered while decoding the COMMIT PREPARED record on the promoted standby.
----- Analysis -----

The assertion that failed is:

	if ((txn->final_lsn < two_phase_at) && is_commit)
	{
		/*
		 * txn must have been marked as a prepared transaction and skipped but
		 * not sent a prepare. Also, the prepare info must have been updated
		 * in txn even if we skip prepare.
		 */
		Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
			   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));

I think the issue stems from the prepared transaction, which was skipped on the
primary, not being skipped on the promoted standby. The root cause appears to be
that the latest confirmed_flush (0/6005118) of the source slot was not synced to
the standby, so the confirmed_flush (0/6004F90) of the synced slot is less than
the synced two_phase_at field (0/6005118). Consequently, the PREPARE record's
LSN exceeds the synced confirmed_flush during decoding, causing the PREPARE to
be incorrectly decoded and sent to the subscriber. Thus, when decoding COMMIT
PREPARED, the PREPARE record is found to be before two_phase_at but was not
skipped.

--
The logs that prove confirmed_flush was not synced:

decoding on the original slot (primary):
LOG: 0/6004F90 has been already streamed, forwarding to 0/6005118
...
DETAIL: Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.

decoding on the synced slot (promoted standby) - failed run:
DETAIL: Streaming transactions committing after 0/6004F90, reading WAL from 0/60049C8.

decoding on the synced slot (promoted standby) - successful run:
LOG: 0/6004F90 has been already streamed, forwarding to 0/6005118
...
DETAIL: Streaming transactions committing after 0/6005118, reading WAL from 0/60049C8.
--

The logs above clearly show that during the test failure, the starting position
(0/6004F90) was less than that of a successful run. It was also lower than the
starting position of the remote slot on the primary, indicating a failure to
synchronize confirmed_flush.

The reason confirmed_flush is not synced appears to be the following check,
which skips syncing the confirmed_flush value. I also reproduced the assertion
failure by creating a scenario in which this check causes the sync to be
skipped:

	/*
	 * Don't overwrite if we already have a newer catalog_xmin and
	 * restart_lsn.
	 */
	if (remote_slot->restart_lsn < slot->data.restart_lsn ||
		TransactionIdPrecedes(remote_slot->catalog_xmin,
							  slot->data.catalog_xmin))
	{

This check triggered because the synced catalog_xmin had surpassed that of the
source slot (as the logs show, the restart_lsn was synced to the same value).
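To make the failing combination concrete, here is a small standalone program
(not PostgreSQL code) that plugs in the confirmed_flush and two_phase_at values
of the synced slot from the failed run; the PREPARE LSN is an assumed value
that merely falls into the gap between them:

/*
 * Standalone illustration only, not PostgreSQL code.  confirmed_flush and
 * two_phase_at are the synced slot's values from the failed run above;
 * prepare_lsn is an assumed value lying in the gap between them.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

int
main(void)
{
	XLogRecPtr	confirmed_flush = 0x6004F90;	/* 0/6004F90, not synced */
	XLogRecPtr	two_phase_at = 0x6005118;		/* 0/6005118, synced */
	XLogRecPtr	prepare_lsn = 0x6005000;		/* assumed, in the gap */

	/* The PREPARE is skipped by decoding only if it precedes confirmed_flush. */
	bool		skipped = prepare_lsn < confirmed_flush;

	/*
	 * When COMMIT PREPARED is decoded, the Assert quoted above expects any
	 * PREPARE before two_phase_at to have been skipped.
	 */
	bool		expected_skipped = prepare_lsn < two_phase_at;

	/* Prints skipped=0 expected_skipped=1 for any LSN in the gap. */
	printf("skipped=%d expected_skipped=%d\n", skipped, expected_skipped);

	return 0;
}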
The situation where the synced slot's catalog_xmin gets ahead of the source
slot's can arise due to several factors:

On the publisher (primary), the slot may retain an older catalog_xmin and
restart_lsn because it is being consumed by a walsender. In this scenario, if
the apply worker does not immediately confirm that changes have been flushed,
the walsender advances catalog_xmin/restart_lsn slowly: it sets an old value
for candidate_catalog_xmin/candidate_restart_lsn and does not update them
until the apply worker confirms via LogicalConfirmReceivedLocation(). The
slotsync worker, however, has the opportunity to begin WAL reading from a more
recent point, potentially advancing catalog_xmin/restart_lsn to newer values.

It is also possible to advance only catalog_xmin while keeping restart_lsn
unchanged, by starting a transaction before the running_xacts record. In that
case, builder->last_serialized_snapshot is passed as restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), but last_serialized_snapshot points to
the position of the previous running_xacts record instead of the current one
(or is NULL if no snapshot has been serialized before), so restart_lsn is not
advanced. The advancement of catalog_xmin is not blocked by the running
transaction, as either the XID of the running transaction or oldestRunningXid
is used as the candidate.

----- Reproduction -----

Accompanying this analysis is a script to reproduce the issue. In the script,
two running_xacts records are logged after slot creation on the publisher.
Below is a detailed progression of restart_lsn/catalog_xmin advancement
observed on both the publisher and the standby.

The sequence is:
slot creation -> log running_xacts -> prepare a txn -> log running_xacts

The progression of catalog_xmin/restart_lsn advancement on the publisher:

slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestRunningXid = 755)
   got new catalog xmin 755 at 0/301B880
   LOG: got new restart lsn 0/301B880 at 0/301B880
2. 0/301B8B8 - PREPARE a TRANSACTION
3. 0/301BA20 - running_xacts (no running txn, oldestRunningXid = 756)
   failed to increase restart lsn: proposed 0/301B880, after 0/301BA20,
   current candidate 0/301B880, current after 0/301B880, flushed up to 0/301B810

final slot data:
catalog_xmin        | 755
restart_lsn         | 0/301B880
confirmed_flush_lsn | 0/301BAC8

The progression of catalog_xmin/restart_lsn advancement on the standby (during
slot sync):

slot creation
...
1. 0/301B880 - running_xacts (no running txn, oldestRunningXid = 755)
   building a consistent snapshot, so advancing catalog_xmin/restart_lsn is
   skipped
...
2. 0/301BA20 - running_xacts (no running txn, oldestRunningXid = 756)
   got new catalog xmin 756 at 0/301BA20
   cannot get a new restart_lsn because a running txn exists and no snapshot
   has been serialized yet

first synced slot data:
catalog_xmin        | 756
restart_lsn         | 0/301B880
confirmed_flush_lsn | 0/301BAC8

The second slot sync cycle then skips updating confirmed_flush because the
previously synced catalog_xmin is newer.

----- Fix -----

I think we should keep syncing confirmed_flush even if the previously synced
restart_lsn/catalog_xmin is newer. A patch for the same is attached.
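In case it helps review, here is a rough standalone sketch of the idea (the
attached patch is the actual proposal; the function, struct, and the sample
values in main() are simplified stand-ins rather than the real slotsync.c code,
and the xid comparison ignores wraparound):

/*
 * Rough standalone sketch of the proposed behaviour only; the attached patch
 * is the actual proposal.  Names, types, and sample values are simplified
 * stand-ins, and the xid comparison ignores wraparound.
 */
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

typedef struct SlotValues
{
	XLogRecPtr	restart_lsn;
	XLogRecPtr	confirmed_flush;
	XLogRecPtr	two_phase_at;
	TransactionId catalog_xmin;
} SlotValues;

static void
sync_slot_values(const SlotValues *remote, SlotValues *local)
{
	/* Existing behaviour: never move restart_lsn/catalog_xmin backwards. */
	bool		local_is_newer = remote->restart_lsn < local->restart_lsn ||
		remote->catalog_xmin < local->catalog_xmin;

	if (!local_is_newer)
	{
		local->restart_lsn = remote->restart_lsn;
		local->catalog_xmin = remote->catalog_xmin;
	}

	/*
	 * Proposed change: still take the remote confirmed_flush (and keep
	 * two_phase_at consistent with it) even when the branch above is
	 * skipped, so the synced confirmed_flush cannot fall behind the synced
	 * two_phase_at.
	 */
	if (remote->confirmed_flush > local->confirmed_flush)
		local->confirmed_flush = remote->confirmed_flush;
	local->two_phase_at = remote->two_phase_at;
}

int
main(void)
{
	/*
	 * The local values are the "first synced slot data" above; the remote
	 * values model the second sync cycle, with an assumed newer
	 * confirmed_flush/two_phase_at.
	 */
	SlotValues	remote = {0x301B880, 0x301C000, 0x301C000, 755};
	SlotValues	local = {0x301B880, 0x301BAC8, 0, 756};

	sync_slot_values(&remote, &local);

	/* catalog_xmin stays at 756, but confirmed_flush/two_phase_at advance. */
	printf("catalog_xmin=%" PRIu32 " confirmed_flush=%" PRIX64
		   " two_phase_at=%" PRIX64 "\n",
		   local.catalog_xmin, local.confirmed_flush, local.two_phase_at);

	return 0;
}

The synced restart_lsn/catalog_xmin are still never moved backwards; only
confirmed_flush (and two_phase_at) keep following the source slot, which closes
the window described in the analysis.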
Best Regards,
Hou zj

#!/bin/bash

port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third_
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2

rm -rf data_* *log
rm -rf /home/houzj/archivedir/*

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB
max_wal_senders=550
max_worker_processes=1000
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
#log_min_messages = 'debug2'
#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'
max_prepared_transactions = 10
EOF

pg_ctl -D data_primary start -w -l primary.log

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

cat << EOF >> data_secondary/standby.signal
EOF

pg_ctl -D data_secondary start -w

psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &
sleep 1
psql -d postgres -p $port_primary -c "CHECKPOINT;"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber
checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
max_prepared_transactions = 10
EOF

pg_ctl start -D data_subscriber

psql -d postgres -p $port_primary -c "create table tab1 (a int);CREATE PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_primary -c "SELECT 'init' FROM pg_create_logical_replication_slot('sub', 'pgoutput', false, false, true);"
psql -d postgres -p $port_subscriber -c "create table tab1 (a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (create_slot=false, copy_data = false, two_phase=false, failover, enabled=false)"

sleep 1

psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(1);"
psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(2); PREPARE TRANSACTION 'test';"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(3);"

psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

sleep 1

psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

#psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(4);"
psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(11); PREPARE TRANSACTION 'bug_twophase';"

sleep 1

psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

sleep 1

psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase = on); alter subscription sub enable;"

sleep 1

psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

sleep 1

pg_ctl promote -D data_secondary

sleep 1

psql -d postgres -p $port_secondary -c "COMMIT PREPARED 'bug_twophase';"

psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_secondary'"

#psql -d postgres -p $port_secondary -c "SELECT * FROM pg_replication_slots;"

#echo 'Moving lsns on primary so that slot-creation on secondary is successful'
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (2)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (3)"
#psql -d postgres -p $port_primary -c "INSERT INTO tbl VALUES (4)"
0001-Fix-assertion-failure-when-decoding-synced-two-phase.patch