On 2017-12-23 21:06, Tomas Vondra wrote:
> On 12/23/2017 03:03 PM, Erikjan Rijkers wrote:
>> On 2017-12-23 05:57, Tomas Vondra wrote:
>>> Hi all,
>>>
>>> Attached is a patch series that implements two features to the logical
>>> replication - ability to define a memory limit for the reorderbuffer
>>> (responsible for building the decoded transactions), and ability to
>>> stream large in-progress transactions (exceeding the memory limit).
>>
>> logical replication of 2 instances is OK but 3 and up fail with:
>>
>> TRAP: FailedAssertion("!(last_lsn < change->lsn)", File:
>> "reorderbuffer.c", Line: 1773)
>>
>> I can cobble up a script but I hope you have enough from the assertion
>> to see what's going wrong...
>
> The assertion says that the iterator produces changes in order that does
> not correlate with LSN. But I have a hard time understanding how that
> could happen, particularly because according to the line number this
> happens in ReorderBufferCommit(), i.e. the current (non-streaming) case.
>
> So instructions to reproduce the issue would be very helpful.
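
For reference, the condition behind the assertion (each change must carry a strictly larger LSN than the previous one) can be sketched outside the server. This is only an illustration, assuming LSNs in the usual textual form of two hexadecimal halves separated by a slash; lsn_to_int and check_lsn_order are names made up here and do not exist in reorderbuffer.c.

```shell
# Convert a textual LSN such as "0/16B3748" to a single integer
# (high half shifted left by 32 bits, OR-ed with the low half).
lsn_to_int() {
    local hi="${1%%/*}" lo="${1##*/}"
    echo $(( (0x$hi << 32) | 0x$lo ))
}

# Return 0 if the LSNs given as arguments are strictly increasing;
# otherwise print the first offending pair and return 1.
check_lsn_order() {
    local last=-1 last_txt="" cur lsn
    for lsn in "$@"; do
        cur=$(lsn_to_int "$lsn")
        if [ "$last" -ge "$cur" ]; then
            echo "out of order: $last_txt then $lsn"
            return 1
        fi
        last=$cur
        last_txt=$lsn
    done
    return 0
}
```

Calling check_lsn_order 0/16B3748 0/16B3780 0/16B3760 prints "out of order: 0/16B3780 then 0/16B3760" and returns 1, which is the kind of sequence the assertion rejects.
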
Using:
0001-Introduce-logical_work_mem-to-limit-ReorderBuffer-v2.patch
0002-Issue-XLOG_XACT_ASSIGNMENT-with-wal_level-logical-v2.patch
0003-Issue-individual-invalidations-with-wal_level-log-v2.patch
0004-Extend-the-output-plugin-API-with-stream-methods-v2.patch
0005-Implement-streaming-mode-in-ReorderBuffer-v2.patch
0006-Add-support-for-streaming-to-built-in-replication-v2.patch
As you expected, the problem is the same with these new patches.
I have now tested more and seen that it does not always fail; here it
fails roughly 3 times out of 4. But the laptop I'm using at the moment
is old and slow, which may well be a factor, as we've seen before [1].
Attached is the bash script that I put together. I tested with
NUM_INSTANCES=2, which succeeds, and NUM_INSTANCES=3, which fails
often. The same script run against HEAD never seems to fail (I tried a
few dozen times).
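
To put a number on "3 times out of 4", the runs can be repeated under a time limit. The following is a rough sketch, not part of the attached script: repeat_runs is a made-up helper, it relies on timeout(1) from GNU coreutils, and the command it runs is assumed to be a wrapper that also clears the old data directories between runs. The time limit matters because the comparison loop in the script only ends once the checksums match.

```shell
# Run a command N times, each under a time limit, and report how many runs failed.
# Usage: repeat_runs N LIMIT_SECONDS command [args...]
repeat_runs() {
    local n="$1" limit="$2" i=1 failed=0
    shift 2
    while [ "$i" -le "$n" ]; do
        # timeout(1) exits non-zero if the command fails or exceeds the limit
        if ! timeout "$limit" "$@" >/dev/null 2>&1; then
            failed=$((failed + 1))
        fi
        i=$((i + 1))
    done
    echo "$failed of $n runs failed"
}
```

For example, repeat_runs 20 300 ./repro.sh would try twenty runs of five minutes each, where ./repro.sh is a hypothetical name for such a wrapper.
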
thanks,
Erik Rijkers
[1] https://www.postgresql.org/message-id/3897361c7010c4ac03f358173adbcd60%40xs4all.nl
#!/bin/bash
unset PGSERVICE PGSERVICEFILE PGDATA PGPORT PGDATABASE
# PGPASSFILE must be set and have the appropriate entries
env | grep ^PG
PROJECT=large_logical
# PROJECT=HEAD
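BIN_DIR=$HOME/pg_stuff/pg_installations/pgsql.$PROJECT/bin
export PATH=$BIN_DIR:$PATH # so that unqualified psql, pg_dump and pg_restore come from the same build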
POSTGRES=$BIN_DIR/postgres
INITDB=$BIN_DIR/initdb
TMP_DIR=$HOME'/tmp/'$PROJECT
devel_file=${TMP_DIR}'/.devel'
NUM_INSTANCES=3
BASE_PORT=6015 # ports 6015, 6016, 6017
port1=$(( $BASE_PORT + 0 ))
port2=$(( $port1 + 1 ))
port3=$(( $port1 + 2 ))
scale=1 dbname=postgres pubname=pub1 subname=sub1
if [[ ! -d $TMP_DIR ]]; then mkdir -p "$TMP_DIR"; fi
echo 's3kr1t' > $devel_file
max_wal_senders=10 # publication side
max_replication_slots=10 # publication side and subscription side
max_worker_processes=12 # subscription side
max_logical_replication_workers=10 # subscription side
max_sync_workers_per_subscription=4 # subscription side
for n in `seq 1 $NUM_INSTANCES`; do
  port=$(( $BASE_PORT + $n -1 ))
  data_dir=$TMP_DIR/pgsql.instance${n}/data
  server_dir=$TMP_DIR/pgsql.instance${n}
  $INITDB --pgdata=$data_dir --encoding=UTF8 --auth=scram-sha-256 --pwfile=$devel_file # --waldir=$xlog_dir
  ( $POSTGRES -D $data_dir -p $port \
      --wal_level=logical \
      --max_replication_slots=$max_replication_slots \
      --max_worker_processes=$max_worker_processes \
      --max_logical_replication_workers=$max_logical_replication_workers \
      --max_wal_senders=$max_wal_senders \
      --max_sync_workers_per_subscription=$max_sync_workers_per_subscription \
      --logging_collector=on \
      --log_directory=${server_dir} \
      --log_filename=logfile.${port} \
      --log_replication_commands=on \
      --autovacuum=off & )
  # --logical_work_mem=128MB & )
  # the server starts in the background; wait until it accepts connections
  until $BIN_DIR/pg_isready -q -d $dbname -p $port; do sleep 1; done
done
#sleep $NUM_INSTANCES
#pg_isready -d $dbname -qp 6015 --timeout=60
#pg_isready -d $dbname -qp 6016 --timeout=60
num_loop=$(( $NUM_INSTANCES - 1 ))
$BIN_DIR/pgbench --port=$BASE_PORT --quiet --initialize --scale=$scale $dbname
echo "alter table pgbench_history add column hid serial primary key" | $BIN_DIR/psql -d $dbname -p $BASE_PORT -X
#pg_isready -d $dbname -qp 6015 --timeout=60
#pg_isready -d $dbname -qp 6016 --timeout=60
for n in `seq 1 $num_loop`; do
  target_port=$(( $BASE_PORT + $n ))
  pg_dump -Fc -p $BASE_PORT \
    --exclude-table-data=pgbench_history --exclude-table-data=pgbench_accounts \
    --exclude-table-data=pgbench_branches --exclude-table-data=pgbench_tellers \
    -tpgbench_history -tpgbench_accounts \
    -tpgbench_branches -tpgbench_tellers \
    $dbname | pg_restore -1 -p $target_port -d $dbname
done
#echo "sleep 2 (after dump/restore)"; sleep 2
for n in `seq 1 $num_loop`; do
  pubport=$(( $BASE_PORT + $n - 1 ))
  subport=$(( $BASE_PORT + $n ))
  appname='casc:'${subport}'<'${pubport}
  echo "create publication $pubname for all tables" | psql -d $dbname -p $pubport -X
  echo "create subscription $subname
    connection 'port=${pubport} dbname=${dbname} application_name=${appname}'
    publication $pubname with (enabled=false, slot_name=${subname}_${subport});" | psql -d $dbname -p $subport -X
  echo "alter subscription $subname enable; " | psql -d $dbname -p $subport -X
done
c_a1=$( echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port1)
c_b1=$( echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port1)
c_t1=$( echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port1)
c_h1=$( echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port1)
mda1=$( echo "select aid,bid,abalance,filler from pgbench_accounts order by aid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
mdb1=$( echo "select bid,bbalance,filler from pgbench_branches order by bid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
mdt1=$( echo "select tid,bid,tbalance,filler from pgbench_tellers order by tid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
mdh1=$( echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history order by hid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
md5_1=$(echo "$mda1 $mdb1 $mdt1 $mdh1" | md5sum | cut -b 1-7 )
printf "a,b,t,h $port1 %6d %6d %6d %6d $mda1 $mdb1 $mdt1 $mdh1 $md5_1\n" $c_a1 $c_b1 $c_t1 $c_h1
ver1=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port1 )
ver2=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port2 )
if [[ $NUM_INSTANCES -gt 2 ]]; then
  ver3=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port3 )
fi
echo
rc=0
while [[ $rc -eq 0 ]]
do
  mda2=$(echo "select aid,bid,abalance,filler from pgbench_accounts order by aid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  mdb2=$(echo "select bid,bbalance,filler from pgbench_branches order by bid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  mdt2=$(echo "select tid,bid,tbalance,filler from pgbench_tellers order by tid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  mdh2=$(echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history order by hid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  c_a2=$(echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port2)
  c_b2=$(echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port2)
  c_t2=$(echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port2)
  c_h2=$(echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port2)
  md5_2=$(echo "$mda2 $mdb2 $mdt2 $mdh2" | md5sum | cut -b 1-7 )
  if [[ $NUM_INSTANCES -gt 2 ]]; then
    mda3=$(echo "select aid,bid,abalance,filler from pgbench_accounts order by aid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
    mdb3=$(echo "select bid,bbalance,filler from pgbench_branches order by bid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
    mdt3=$(echo "select tid,bid,tbalance,filler from pgbench_tellers order by tid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
    mdh3=$(echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history order by hid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
    c_a3=$(echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port3)
    c_b3=$(echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port3)
    c_t3=$(echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port3)
    c_h3=$(echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port3)
    md5_3=$(echo "$mda3 $mdb3 $mdt3 $mdh3" | md5sum | cut -b 1-7 )
  fi
  echo "-- $POSTGRES"
  printf "a,b,t,h $port1 %7d %6d %6d %6d $mda1 $mdb1 $mdt1 $mdh1 $md5_1 " $c_a1 $c_b1 $c_t1 $c_h1; echo "$ver1"
  printf "a,b,t,h $port2 %7d %6d %6d %6d $mda2 $mdb2 $mdt2 $mdh2 $md5_2 " $c_a2 $c_b2 $c_t2 $c_h2; echo "$ver2"
  if [[ $NUM_INSTANCES -gt 2 ]]; then
    printf "a,b,t,h $port3 %7d %6d %6d %6d $mda3 $mdb3 $mdt3 $mdh3 $md5_3 " $c_a3 $c_b3 $c_t3 $c_h3; echo "$ver3"
  fi
  if [[ $NUM_INSTANCES -eq 2 ]]; then
    if [[ "$md5_1" == "$md5_2" ]] ; then echo "OK - done."; break; fi
  elif [[ $NUM_INSTANCES -eq 3 ]]; then
    if [[ "$md5_1" == "$md5_3" ]] ; then echo "OK - done."; break; fi
  fi
  sleep 1
  rc=$?
done
for n in `seq 1 $NUM_INSTANCES`; do
  port=$(( $BASE_PORT + $n -1 ))
  data_dir=$TMP_DIR/pgsql.instance${n}/data
  $BIN_DIR/pg_ctl stop -w -D $data_dir
done
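
After a run, the per-instance logs written by the script (logfile.<port> under each pgsql.instance<n> directory) can be scanned for the assertion. A small sketch, where count_traps is a made-up helper and its argument is expected to be the same directory as TMP_DIR above:

```shell
# Print, for each instance log below the given directory, the number of
# lines that record a failed assertion.
count_traps() {
    local tmp_dir="$1" f
    for f in "$tmp_dir"/pgsql.instance*/logfile.*; do
        # skip the unexpanded pattern when no log files exist
        [ -f "$f" ] || continue
        printf '%s %d\n' "$f" "$(grep -c 'TRAP: FailedAssertion' "$f")"
    done
}
```

Running count_traps "$TMP_DIR" prints one line per log file with the file name followed by the count, so a clean run shows 0 for every instance.
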