Hi All,

This is a spin-off thread from the earlier discussions at [1] and [2].
While analyzing the slot-sync buildfarm failure reported in [1], we observed that there are chances that confirmed_flush_lsn may move backward, depending on the feedback messages received from the downstream. It was suspected that this backward movement may result in data duplication. Earlier we were able to reproduce the issue with two_phase enabled subscriptions (see [2]). On further analysis, it seems that data duplication can happen without two_phase as well. With the attached injection-point patch and test script (provided by Hou-San), the issue is reproduced without the two_phase option.

The problem arises when changes already applied by the table sync worker are applied again by the apply worker, because confirmed_flush_lsn moved backward after the table sync completed. The error expected on the subscriber after executing the test script:

# ERROR:  conflict detected on relation "public.tab2": conflict=insert_exists
# DETAIL:  Key already exists in unique index "tab2_a_key", modified in transaction ....
# Key (a)=(2); existing local tuple (2); remote tuple (2).

The general steps followed by the attached script are:

1. After adding a new table, tab2, to the publication, refresh the subscription to initiate a table sync for tab2. Before the sync state reaches SYNCDONE, insert some data into tab2. This new insert will be replicated by the table sync worker.

2. Disable the subscription and stop the apply worker before it changes the state to READY.

3. Re-enable the subscription and wait for the table sync to finish. Note that the origin position has not progressed since step 1.

4. Disable and re-enable the subscription. Since the origin position is behind the slot's confirmed_flush_lsn, the apply worker's feedback reports the older position; make the walsender stop (via an injection point) just before it processes this reply, i.e. before confirmed_flush_lsn can move backward.

5. Disable and re-enable the subscription once more. The released walsender processes the stale reply, the slot falls back to the previous confirmed_flush_lsn, and the earlier insert into tab2 is replicated again.

The test script uses three injection points to control the race conditions in the above steps. LOG_SNAPSHOT_INTERVAL_MS is also increased, to prevent restart_lsn from advancing beyond the insert.

To fix this issue, we need to prevent confirmed_flush_lsn from moving backward. Attached is a patch for the same.

With the given script, the problem reproduces on HEAD and PG17. We are trying to reproduce the issue on PG16 and below, where injection points are not available.

[1]: https://www.postgresql.org/message-id/OS3PR01MB5718BC899AEE1D04755C6E7594832%40OS3PR01MB5718.jpnprd01.prod.outlook.com
[2]: https://www.postgresql.org/message-id/OS0PR01MB57164AB5716AF2E477D53F6F9489A%40OS0PR01MB5716.jpnprd01.prod.outlook.com

thanks
Shveta
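PS: For anyone who would rather not open the patch, the core idea is a guard like the one below. This is a hand-written sketch, not the attached patch itself; it assumes the check sits near the top of LogicalConfirmReceivedLocation() in src/backend/replication/logical/logical.c, with locking elided for brevity:

    /*
     * Sketch: ignore feedback that would move confirmed_flush backward.
     * A stale flush position reported by the apply worker (lsn1 in the
     * test script) must not overwrite an already-advanced confirmed_flush
     * (lsn2); otherwise a restarting walsender re-streams changes that
     * the tablesync worker has already applied.
     */
    if (lsn <= MyReplicationSlot->data.confirmed_flush)
        return;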
#!/bin/bash

# The intent of this script is to reproduce the issue caused by confirmed_flush_lsn
# moving backward, without the use of the two_phase option.
# The problem arises when changes applied by the table sync worker are duplicated by
# the apply worker if the confirmed_flush_lsn moves backward after the table sync.
#
# This script reproduces the issue with the help of three injection points.
#
# After this script runs, the error expected on sub due to the above issue:
# ERROR:  conflict detected on relation "public.tab2": conflict=insert_exists
# DETAIL:  Key already exists in unique index "tab2_a_key", modified in transaction ....
# Key (a)=(2); existing local tuple (2); remote tuple (2).

port_primary=5432
port_subscriber=5434

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_subscriber
rm -rf data_* *log

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#max_slot_wal_keep_size = 64kB
max_wal_senders = 550
max_replication_slots = 550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

pg_ctl -D data_primary start -w -l primary.log

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber
checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off
wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s
wal_receiver_status_interval = 1
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

pg_ctl start -D data_subscriber -l sub.log

# Put an injection point in the tablesync worker after the initial copy is done
# and before it can update the state to SUBREL_STATE_SYNCWAIT.
# It is important to stop tablesync before the state is updated to SUBREL_STATE_SYNCWAIT,
# as we want the apply worker to keep processing changes instead of loop-waiting
# for the table sync to finish.
psql -d postgres -p $port_subscriber -c "CREATE EXTENSION injection_points; SELECT injection_points_attach('table-sync-wait-2', 'wait');"

psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"

# Start the subscription with copy_data set to false.
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false, failover=true, enabled=true)"

sleep 2
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Insert data into tab1.
# After this insert:
# --apply worker's origin_lsn on sub will be advanced to this INSERT's lsn (say lsn1)
# --confirmed_flush on pub will also be advanced to the same lsn (lsn1)
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(2);"

sleep 1

# Check both confirmed_flush and origin_lsn; they will be lsn1.
psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;"
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Now add another table to the publication.
psql -d postgres -p $port_primary -c "CREATE TABLE tab2 (a int UNIQUE); ALTER PUBLICATION pub ADD TABLE tab2;"
psql -d postgres -p $port_subscriber -c "CREATE TABLE tab2 (a int UNIQUE);"

# Refresh the subscription.
# It will start tablesync for tab2. The tablesync worker will do the initial copy
# but will wait before updating the state to SUBREL_STATE_SYNCWAIT, due to
# injection point 'table-sync-wait-2'.
psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub REFRESH PUBLICATION"

sleep 1

# Insert data into tab2; it will not be consumed by the tablesync worker on sub yet.
# The apply worker will see this change and will ignore it.
# Let's say the remote lsn for this change is lsn2.
psql -d postgres -p $port_primary -c "INSERT INTO tab2 VALUES(2);"

sleep 3
psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;"

# By this time confirmed_flush must have moved to lsn2 (where lsn2 > lsn1),
# due to keepalive message handling in the apply worker.
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Before we wake up the tablesync worker to catch up, block the apply worker
# before it does maybe_reread_subscription() and process_syncing_tables().
# This is to avoid moving the state to SUBREL_STATE_READY.
psql -d postgres -p $port_subscriber -c "SELECT injection_points_attach('reread-sub', 'wait');"

# Wake up the tablesync worker; it will now catch up and consume
# the data inserted above into tab2. The tablesync worker's origin_lsn
# will be lsn2 now.
psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('table-sync-wait-2'); SELECT injection_points_detach('table-sync-wait-2')"

psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1

# Now wake up the apply worker; it will re-read the subscription and exit (because
# the sub is disabled) before moving the state to SUBREL_STATE_READY.
psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('reread-sub'); SELECT injection_points_detach('reread-sub')"

sleep 1
psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

# Now let the apply worker start and move the state to SUBREL_STATE_READY,
# and let the table sync finish and exit.
sleep 3

# Now the state is:
# --table sync is finished on sub, changes are synced up to lsn2
# --apply worker has processed and ignored the changes up to lsn2 without updating origin_lsn
# --apply worker's origin_lsn at sub is still lsn1
# --confirmed_flush on pub is at lsn2
#
# Now disable sub and, before we re-enable, stop the walsender from processing any more
# replies or sending any more keepalives.
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'" psql -d postgres -p $port_subscriber -c "alter subscription sub disable;" sleep 1 psql -d postgres -p $port_primary -c "create extension injection_points;;SELECT injection_points_attach('process-replies', 'wait');" psql -d postgres -p $port_subscriber -c "alter subscription sub enable;" # Due to lack of any message from walsnder, let apply worker send feedback with # flush position as lsn1 (origin_lsn). # But this will only be processed by walsender after we release injection point # 'process-replies'. sleep 1 # Check origin is still at lsn1 and confirmed_flush at lsn2 psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;" psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'" # Disable the subscription and release walsender's wait before ProcessRepliesIfAny() psql -d postgres -p $port_subscriber -c "alter subscription sub disable;" psql -d postgres -p $port_primary -c "; SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');" # After injection point is released, let previous walsender process the reply from apply worker # and move the confirmed_flush to lsn1. It will then exit sleep 1 # Check confirmed_flush is moved to lsn1 psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'" # Now let new walsnder start streaming from lsn1. This will result in replay of # 'INSERT to tab2 (lsn2)' and data duplication in tab2. psql -d postgres -p $port_subscriber -c "alter subscription sub enable;" exit
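Since the attached injection-point patch is posted as binary data, here is roughly where the three points used by the script would sit. This is a sketch under assumptions only: it uses the INJECTION_POINT() macro from utils/injection_point.h as on HEAD/PG17, and the hook locations are inferred from the script's comments; the exact placement in the attached patch may differ:

    /* src/backend/replication/logical/tablesync.c: in the tablesync worker,
     * after the initial copy but before setting SUBREL_STATE_SYNCWAIT */
    INJECTION_POINT("table-sync-wait-2");

    /* src/backend/replication/logical/worker.c: in the apply worker's main
     * loop, before maybe_reread_subscription()/process_syncing_tables() */
    INJECTION_POINT("reread-sub");

    /* src/backend/replication/walsender.c: in the walsender loop, before
     * ProcessRepliesIfAny(), so a stale standby reply stays pending */
    INJECTION_POINT("process-replies");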
v1-0001-Injection-points-to-reproduce-the-confirmed_flush.patch
Description: Binary data
v1-0001-Fix-confirmed_flush-backward-movement-issue.patch
Description: Binary data