On Tue, May 6, 2025 at 7:22 PM Zhijie Hou (Fujitsu) wrote:
>
> On Mon, May 5, 2025 at 6:59 PM Amit Kapila wrote:
> >
> > On Sun, May 4, 2025 at 2:33 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > >
> > > While I cannot be entirely certain of my analysis, I believe the
> > > root cause might be related to the backward movement of the
> > > confirmed_flush LSN. The following scenario seems possible:
> > >
> > > 1. The walsender enables the two_phase and sets two_phase_at
> > > (which should be the same as confirmed_flush).
> > > 2. The slot's confirmed_flush regresses for some reason.
> > > 3. The slotsync worker retrieves the remote slot information and
> > > enables two_phase for the local slot.
> > >
> >
> > Yes, this is possible. Here is my theory as to how it can happen in
> > the current case. In the failed test, after the primary has prepared
> > a transaction, the transaction won't be replicated to the subscriber
> > as two_phase was not enabled for the slot. However, subsequent
> > keepalive messages can send the latest WAL location to the
> > subscriber and get the confirmation of the same from the subscriber
> > without its origin being moved. Now, after we restart the apply
> > worker (due to disable/enable for a subscription), it will use the
> > previous origin_lsn to temporarily move back the confirmed flush LSN
> > as explained in one of the previous emails in another thread [1].
> > During this temporary movement of confirm flush LSN, the slotsync
> > worker fetches the two_phase_at and confirm_flush_lsn values,
> > leading to the assertion failure. We see this issue intermittently
> > because it depends on the timing of slotsync worker's request to
> > fetch the slot's value.
>
> Based on this theory, I can reproduce the BF failure in the 040
> tap-test on HEAD after applying the 0001 patch. This is achieved by
> using the injection point to stop the walsender from sending a
> keepalive before receiving the old origin position from the apply
> worker, ensuring the confirmed_flush consistently moves backward before
> slotsync.
>
> Additionally, I've reproduced the duplicate data issue on HEAD without
> slotsync using the attached script (after applying the injection point patch).
> This issue arises if we immediately disable the subscription after the
> confirm_flush_lsn moves backward, preventing the walsender from
> advancing the confirm_flush_lsn.
>
> In this case, if a prepared transaction exists before two_phase_at,
> then after re-enabling the subscription, it will replicate that
> prepared transaction when decoding the PREPARE record and replicate
> that again when decoding the COMMIT PREPARED record. In such cases,
> the apply worker keeps reporting the error:
>
> ERROR: transaction identifier "pg_gid_16387_755" is already in use.
>
> Apart from above, we're investigating whether the same issue can occur
> in back-branches and will share the results once ready.

I reproduced the duplicate data issue on PG17 as well, using the attached
shell script. Since PG17 doesn't allow altering the two_phase option, I
created a subscription with two_phase=on and copy_data=on. I prepared a
transaction before the table synchronization was ready, at a time when the
slot's two_phase hadn't yet been set to true. With this setup, the prepared
transaction can be replicated twice after the apply worker restarts and the
confirmed_flush_lsn moves backward.

To ensure the origin position is initialized during table sync, I inserted
some data before the prepared transaction. I added injection points (0001)
to control the table sync worker's progress, allowing the apply worker to
replicate some changes and update the origin position while table sync was
ongoing.

Best Regards,
Hou zj

#!/bin/bash

port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third_
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2

rm -rf data_* *log
rm -rf /home/houzj/archivedir/*

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB
max_wal_senders=550
max_worker_processes=1000
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
#log_min_messages = 'debug2'
#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'
max_prepared_transactions = 10
EOF

pg_ctl -D data_primary start -w -l primary.log

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

# Create an empty standby.signal so the node starts as a standby.
cat << EOF >> data_secondary/standby.signal
EOF

pg_ctl -D data_secondary start -w

psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &

sleep 1

psql -d postgres -p $port_primary -c "CHECKPOINT;"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber
checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
wal_receiver_status_interval = 1
max_prepared_transactions = 10
EOF

pg_ctl start -D data_subscriber

# Attach the table sync injection points before creating the subscription.
psql -d postgres -p $port_subscriber -c "create extension injection_points;SELECT injection_points_attach('table-sync-done-1', 'wait');SELECT injection_points_attach('table-sync-wait-2', 'wait');"

psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE TABLE tab2(a int); INSERT INTO tab2 VALUES(1);CREATE PUBLICATION pub FOR TABLE tab1,tab2;"

psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int); CREATE TABLE tab2(a int);"

psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = true, two_phase=true, failover=true, enabled=true)"

sleep 2

psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Insert some data first so that the origin position is initialized during
# table sync, then prepare a transaction while table sync is still ongoing.
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(2);INSERT INTO tab2 VALUES(2)"

psql -d postgres -p $port_primary -c "BEGIN; INSERT INTO tab1 VALUES(3);INSERT INTO tab2 VALUES(3); PREPARE TRANSACTION 'slotsync_twophase_prepared';"

sleep 1

psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('table-sync-done-1');SELECT injection_points_detach('table-sync-done-1')"

sleep 2

psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

psql -d postgres -p $port_subscriber -c "SELECT injection_points_attach('connecting-walsender', 'wait');"

psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('table-sync-wait-2');SELECT injection_points_detach('table-sync-wait-2')"

sleep 2

psql -d postgres -p $port_primary -c "create extension injection_points;SELECT injection_points_attach('process-replies', 'wait');"

psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('connecting-walsender');SELECT injection_points_detach('connecting-walsender')"

sleep 2

# Disable the subscription, release the walsender, commit the prepared
# transaction, and then re-enable the subscription.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

psql -d postgres -p $port_primary -c "SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');"

sleep 1

psql -d postgres -p $port_primary -c "COMMIT PREPARED 'slotsync_twophase_prepared';"

psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

exit

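The script prints confirmed_flush_lsn twice, and checking by eye whether the value went backward is error-prone. Below is a minimal sketch of a pure-bash comparison that could help when reading the output. The function names lsn_to_int and lsn_moved_backward and the sample LSN values are made up for illustration and are not part of the attached script or of PostgreSQL. It assumes the usual textual LSN form of two hexadecimal halves separated by a slash, and that the high half stays below 0x80000000 so the result fits in bash's signed 64-bit arithmetic.

```shell
#!/bin/bash

# Convert a textual LSN such as "0/1570AB8" into a single integer.
# Both halves are hexadecimal; the high half is shifted left by 32 bits.
lsn_to_int() {
    local hi=${1%%/*} lo=${1##*/}
    echo $(( (16#$hi << 32) | 16#$lo ))
}

# Return success (exit status 0) when the second LSN is lower than the
# first, i.e. the value moved backward between the two samples.
lsn_moved_backward() {
    (( $(lsn_to_int "$2") < $(lsn_to_int "$1") ))
}

# Hypothetical sample values, not taken from a real run.
before="0/1570AB8"
after="0/156F000"

if lsn_moved_backward "$before" "$after"; then
    echo "confirmed_flush_lsn moved backward: $before -> $after"
fi
```

In practice the two values could be captured from the existing pg_replication_slots queries, for example with psql's -A and -t options to get a bare value, and then passed to lsn_moved_backward.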
0001-table-sync-injection-points.patch