Hi All,

It is a spin-off thread from earlier discussions at [1] and [2].

While analyzing the slot-sync BF failure as stated in [1], it was
observed that there are chances that confirmed_flush_lsn may move
backward depending on the feedback messages received from the
downstream system. It was suspected that the backward movement of
confirmed_flush_lsn may result in data duplication issues. Earlier we
were able to successfully reproduce the issue with two_phase enabled
subscriptions (see[2]). Now on further analysing, it seems possible
that data duplication issues may happen without two-phase as well.

With the attached injection-point patch and test-script (provided by
Hou-San), the data duplication issue is reproduced without using the
twophase option. The problem arises when changes applied by the table
sync worker are duplicated by the apply worker if the
confirmed_flush_lsn moves backward after the table sync is completed.

The error expected on sub after executing the test script:
# ERROR:  conflict detected on relation "public.tab2": conflict=insert_exists
# DETAIL:  Key already exists in unique index "tab2_a_key", modified
in transaction ....
# Key (a)=(2); existing local tuple (2); remote tuple (2).

The general steps followed by attached script are:

1. After adding a new table, tab2, to the publication, refresh the
subscription to initiate a table sync for tab2. Before the state
reaches SYNCDONE, insert some data into tab2. This new insertion will
be replicated by the table sync worker.
2. Disable the subscription and stop the apply worker before changing
the state to READY.
3. Re-enable the subscription and wait for the table sync to finish.
Notice that the origin position should not have progressed since step
1.
4. Disable and re-enable the subscription. Given that the origin
position is less than the slot's confirmed_flush_lsn, control the
walsender to stop when the confirmed_flush_lsn moves backward.
5. Disable and re-enable the subscription once more, causing the slot
to retain the previous confirmed_flush_lsn, and the insertion from
step 2 will be replicated again.

The test script uses 3 injection points to control the race condition
mentioned in above steps. Also the LOG_SNAPSHOT_INTERVAL_MS is
increased to prevent the restart_lsn from increasing beyond the
insert. To fix this issue, we need to  prevent confirmed_flush_lsn
from moving backward. Attached the fix patch for the same.

With the given script, the problem reproduces on Head and PG17. We are
trying to reproduce the issue on PG16 and below where injection points
are not there.

[1]: 
https://www.postgresql.org/message-id/OS3PR01MB5718BC899AEE1D04755C6E7594832%40OS3PR01MB5718.jpnprd01.prod.outlook.com
[2]: 
https://www.postgresql.org/message-id/OS0PR01MB57164AB5716AF2E477D53F6F9489A%40OS0PR01MB5716.jpnprd01.prod.outlook.com

thanks
Shveta
#!/bin/bash

# The intent of this script is to reproduce the issue due to confirmed_lsn
# backward movement without the use of two_phase option.
# The problem arises when changes applied by the table sync worker are duplicated by
# the apply worker if the confirmed_flush_lsn moves backward after the table sync.
#
# This script reproduces this issue with the help of three injection points.
#
# After this script run, the error expected on sub due to above stated issue:
# ERROR:  conflict detected on relation "public.tab2": conflict=insert_exists
# DETAIL:  Key already exists in unique index "tab2_a_key", modified in transaction ....
#	Key (a)=(2); existing local tuple (2); remote tuple (2).

port_primary=5432
port_subscriber=5434

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_subscriber

rm -rf data_* *log

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical 
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB

max_wal_senders=550
max_worker_processes=1000 
max_replication_slots=550 
log_replication_commands = 'on' 
checkpoint_timeout = 1d 
shared_buffers = 6GB 
max_worker_processes = 32 
max_parallel_maintenance_workers = 24 
max_parallel_workers = 32 
synchronous_commit = on 
checkpoint_timeout = 1d 
max_wal_size = 24GB 
min_wal_size = 15GB 
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

max_prepared_transactions = 10
log_min_messages = 'debug1'

EOF


pg_ctl -D data_primary start -w -l primary.log 

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


wal_receiver_status_interval = 1
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

pg_ctl start -D data_subscriber -l sub.log


# Put injection point in tablesync worker after initial copy is done
# and before it can update state to SUBREL_STATE_SYNCWAIT.
# It is important to stop tablesync before updating the state to SUBREL_STATE_SYNCWAIT
# as we want the apply worker to keep on processing the changes instead of loop-waiting
# for table sync to finish.
psql -d postgres -p $port_subscriber -c "create extension injection_points;SELECT injection_points_attach('table-sync-wait-2', 'wait');"

psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE PUBLICATION pub FOR TABLE tab1;"

psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"

# Start the subscription with copy_data to false
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false, failover=true, enabled=true)"

sleep 2

psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Insert the data into tab1. After this insert:
# --apply worker's origin_lsn on sub will be advanced to this INSERT lsn (say lsn1)
# --confirmed_flush on pub will also be advanced to same lsn (lsn1)
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(2);"

sleep 1

# check both confirmed_flush and origin_lsn, they will be lsn1.
psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;"
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Now add another table to publication.
psql -d postgres -p $port_primary -c "CREATE TABLE tab2 (a int UNIQUE); ALTER PUBLICATION pub ADD TABLE tab2;"

psql -d postgres -p $port_subscriber -c "CREATE TABLE tab2 (a int UNIQUE);"

# Refresh the subscription.
# It will start tablesync for tab2. Tablesync worker will do initial copy
# but will wait before updating the state to SUBREL_STATE_SYNCWAIT due to 
# injection point table-sync-wait-2
psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub REFRESH PUBLICATION"

sleep 1

# Insert the data to tab2, it will not be consumed by tablesync worker on sub yet.
# Apply worker will see this change and will ignore it.
# Lets say the remote lsn for this change is lsn2.
psql -d postgres -p $port_primary -c "INSERT INTO tab2 VALUES(2);"
sleep 3

psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;"

# By this time confirmed_flush must have moved to lsn2 now (where lsn2 > lsn1 )
# due to keepalive message handling in apply worker
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Before we wake up table-sync worker to catchup, block apply worker
# before it does maybe_reread_subscription() and process_syncing_tables().
# This is to avoid moving the state to SUBREL_STATE_READY
psql -d postgres -p $port_subscriber -c "SELECT injection_points_attach('reread-sub', 'wait');"

# Wake up table sync worker, it will now catch-up and will consume
# the data inserted above in tab2. Table-sync worker's origin_lsn 
# will be lsn2 now. 
psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('table-sync-wait-2');SELECT injection_points_detach('table-sync-wait-2')"

psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

sleep 1

# Now wake up apply worker, it will re-read subscription and will exit due to sub
# being disabled before moving the state to SUBREL_STATE_READY
psql -d postgres -p $port_subscriber -c "SELECT injection_points_wakeup('reread-sub');SELECT injection_points_detach('reread-sub')"

sleep 1

psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

# Now let the apply worker start and move the state to SUBREL_STATE_READY 
# and let the table sync finish and exit now.
sleep 3

# Now the state is: 
# --table sync is finished on sub, changes are synced upton lsn2
# --apply worker has processed and ignored the changes upto lsn2 without updating origin_lsn
# --apply worker's origin_lsn at sub is still lsn1
# --confirmed_flush on pub is at lsn2
#
# Now disable sub and before we re-enable, stop walsender to process any more 
# replies or send any more keepalive.
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

sleep 1

psql -d postgres -p $port_primary -c "create extension injection_points;;SELECT injection_points_attach('process-replies', 'wait');"

psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

# Due to lack of any message from walsnder, let apply worker send feedback with 
# flush position as lsn1 (origin_lsn).
# But this will only be processed by walsender after we release injection point
# 'process-replies'.

sleep 1

# Check origin is still at lsn1 and confirmed_flush at lsn2
psql -d postgres -p $port_subscriber -c "select * from pg_replication_origin_status where local_id = 1;"
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Disable the subscription  and release walsender's wait before ProcessRepliesIfAny()
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

psql -d postgres -p $port_primary -c "; SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');"

# After injection point is released, let previous walsender process the reply from apply worker
# and move the confirmed_flush to lsn1. It will then exit
sleep 1

# Check confirmed_flush is moved to lsn1
psql -d postgres -p $port_primary -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Now let new walsnder start streaming from lsn1. This will result in replay of 
# 'INSERT to tab2 (lsn2)' and data duplication in tab2.
psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

exit

Attachment: v1-0001-Injection-points-to-reproduce-the-confirmed_flush.patch
Description: Binary data

Attachment: v1-0001-Fix-confirmed_flush-backward-movement-issue.patch
Description: Binary data

Reply via email to