On 2017-12-23 21:06, Tomas Vondra wrote:
On 12/23/2017 03:03 PM, Erikjan Rijkers wrote:
On 2017-12-23 05:57, Tomas Vondra wrote:
Hi all,

Attached is a patch series that adds two features to logical
replication: the ability to define a memory limit for the reorderbuffer
(responsible for building the decoded transactions), and the ability to
stream large in-progress transactions (those exceeding the memory limit).


Logical replication across 2 instances is OK, but 3 or more fail with:

TRAP: FailedAssertion("!(last_lsn < change->lsn)", File:
"reorderbuffer.c", Line: 1773)

I can cobble up a script but I hope you have enough from the assertion
to see what's going wrong...

The assertion says that the iterator produces changes in an order that
does not follow the LSN. But I have a hard time understanding how that
could happen, particularly because according to the line number this
happens in ReorderBufferCommit(), i.e. the current (non-streaming) case.
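
To make the invariant behind the quoted assertion concrete, here is a minimal bash sketch that checks whether a list of LSNs is strictly increasing. It only illustrates the condition last_lsn < change->lsn; it is not how reorderbuffer.c is written. The function names lsn_to_int and lsns_strictly_increasing are hypothetical, and the sample LSN values are made up.

```shell
#!/bin/bash
# Convert a textual LSN such as "16/B374D848" (two hex halves, high/low)
# into one integer. Bash arithmetic is signed 64-bit, so this sketch is
# only valid for LSNs below 80000000/0.
lsn_to_int() {
  local hi=${1%%/*} lo=${1##*/}
  echo $(( (16#$hi << 32) | 16#$lo ))
}

# Succeed only if every LSN is strictly greater than the previous one,
# mirroring the condition last_lsn < change->lsn.
lsns_strictly_increasing() {
  local last=-1 cur lsn
  for lsn in "$@"; do
    cur=$(lsn_to_int "$lsn")
    if (( last >= cur )); then
      echo "out of order: $lsn" >&2
      return 1
    fi
    last=$cur
  done
  return 0
}

# Made-up sample values, for illustration only.
lsns_strictly_increasing 0/16B3748 0/16B3780 1/0 && echo "in order"
```

A sequence such as 0/20 followed by 0/10 makes the function return non-zero, which is the situation the failed assertion reports.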

So instructions to reproduce the issue would be very helpful.

Using:

0001-Introduce-logical_work_mem-to-limit-ReorderBuffer-v2.patch
0002-Issue-XLOG_XACT_ASSIGNMENT-with-wal_level-logical-v2.patch
0003-Issue-individual-invalidations-with-wal_level-log-v2.patch
0004-Extend-the-output-plugin-API-with-stream-methods-v2.patch
0005-Implement-streaming-mode-in-ReorderBuffer-v2.patch
0006-Add-support-for-streaming-to-built-in-replication-v2.patch

As you expected, the problem is the same with these new patches.

I have now tested more, and it does not always fail; here it fails roughly 3 times out of 4. But the laptop I'm using at the moment is old and slow, which may well be a factor, as we've seen before [1].

Attached is the bash script that I put together. I tested with NUM_INSTANCES=2, which succeeds, and NUM_INSTANCES=3, which often fails. The same script run against HEAD never seems to fail (I tried a few dozen times).
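
For readers skimming the script, this is roughly how it decides success: each table is dumped in a fixed order, reduced to a 7-character md5 prefix, and the per-table digests are combined into one digest per instance, which is then compared between the first and the last instance. A minimal sketch of that idea follows. The helper names short_md5 and instance_digest are hypothetical (the script inlines these pipelines), and the rows are stand-ins for psql -qtAX output.

```shell
#!/bin/bash
# Reduce whatever arrives on stdin to the first 7 hex digits of its md5sum.
short_md5() {
  md5sum | cut -b 1-7
}

# Combine four per-table digests into a single digest for one instance.
instance_digest() {
  echo "$1  $2  $3  $4" | short_md5
}

# Stand-ins for ordered query output from two instances (hypothetical rows).
rows_pub=$(printf '1|1|0\n2|1|0\n')
rows_sub=$(printf '1|1|0\n2|1|0\n')

d_pub=$(echo "$rows_pub" | short_md5)
d_sub=$(echo "$rows_sub" | short_md5)

if [[ "$d_pub" == "$d_sub" ]]; then
  echo "tables match"
else
  echo "tables differ"
fi
```

Because only a short digest prefix is compared, a match is strong evidence that replication has caught up, not a proof.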

thanks,

Erik Rijkers


[1] https://www.postgresql.org/message-id/3897361c7010c4ac03f358173adbcd60%40xs4all.nl

#!/bin/bash
unset PGSERVICE PGSERVICEFILE PGDATA PGPORT PGDATABASE
# PGPASSFILE must be set and have the appropriate entries

env | grep ^PG

  PROJECT=large_logical
# PROJECT=HEAD

BIN_DIR=$HOME/pg_stuff/pg_installations/pgsql.$PROJECT/bin
# make the bare psql/pg_dump/pg_restore calls below use the same build
export PATH=$BIN_DIR:$PATH
POSTGRES=$BIN_DIR/postgres
INITDB=$BIN_DIR/initdb
TMP_DIR=$HOME'/tmp/'$PROJECT
devel_file=${TMP_DIR}'/.devel'
NUM_INSTANCES=3
BASE_PORT=6015  #   ports 6015, 6016, 6017 
port1=$(( $BASE_PORT + 0 ))
port2=$(( $port1 + 1 ))
port3=$(( $port1 + 2 ))
scale=1  dbname=postgres  pubname=pub1  subname=sub1
if [[ ! -d $TMP_DIR ]]; then mkdir -p $TMP_DIR; fi
echo 's3kr1t' > $devel_file
                  max_wal_senders=10  # publication side
            max_replication_slots=10  # publication side and subscription side
             max_worker_processes=12  # subscription side
  max_logical_replication_workers=10  # subscription side
max_sync_workers_per_subscription=4   # subscription side
for n in `seq 1 $NUM_INSTANCES`; do
  port=$(( $BASE_PORT + $n -1 ))
    data_dir=$TMP_DIR/pgsql.instance${n}/data
  server_dir=$TMP_DIR/pgsql.instance${n}
  $INITDB --pgdata=$data_dir --encoding=UTF8 --auth=scram-sha-256 --pwfile=$devel_file  # --waldir=$xlog_dir
 ( $POSTGRES -D $data_dir -p $port \
    --wal_level=logical \
    --max_replication_slots=$max_replication_slots \
    --max_worker_processes=$max_worker_processes \
    --max_logical_replication_workers=$max_logical_replication_workers \
    --max_wal_senders=$max_wal_senders \
    --max_sync_workers_per_subscription=$max_sync_workers_per_subscription \
    --logging_collector=on \
    --log_directory=${server_dir} \
    --log_filename=logfile.${port} \
    --log_replication_commands=on \
    --autovacuum=off & )
#   --logical_work_mem=128MB & )
#   pg_isready -d $dbname --timeout=60 -p $port
done
#sleep $NUM_INSTANCES
#pg_isready -d $dbname -qp 6015 --timeout=60
#pg_isready -d $dbname -qp 6016 --timeout=60
num_loop=$(( $NUM_INSTANCES - 1 ))
$BIN_DIR/pgbench --port=$BASE_PORT --quiet --initialize --scale=$scale $dbname
echo "alter table pgbench_history add column hid serial primary key" | $BIN_DIR/psql -d $dbname -p $BASE_PORT -X
#pg_isready -d $dbname -qp 6015 --timeout=60
#pg_isready -d $dbname -qp 6016 --timeout=60
for n in `seq 1 $num_loop`; do
  target_port=$(( $BASE_PORT + $n ))
  pg_dump -Fc -p $BASE_PORT \
    --exclude-table-data=pgbench_history  --exclude-table-data=pgbench_accounts \
    --exclude-table-data=pgbench_branches --exclude-table-data=pgbench_tellers \
    -tpgbench_history -tpgbench_accounts \
    -tpgbench_branches -tpgbench_tellers \
        $dbname | pg_restore -1 -p $target_port -d $dbname
done

#echo "sleep 2 (after dump/restore)"; sleep 2

for n in `seq 1 $num_loop`; do
  pubport=$(( $BASE_PORT + $n - 1 ))
  subport=$(( $BASE_PORT + $n     ))
  appname='casc:'${subport}'<'${pubport}
  echo "create publication  $pubname for all tables" | psql -d $dbname -p $pubport -X
  echo "create subscription $subname
        connection 'port=${pubport} dbname=${dbname} application_name=${appname}'
        publication $pubname with (enabled=false, slot_name=${subname}_${subport});" | psql -d $dbname -p $subport -X
  echo "alter subscription $subname enable; " | psql -d $dbname -p $subport -X
done
c_a1=$( echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port1) 
c_b1=$( echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port1) 
c_t1=$( echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port1) 
c_h1=$( echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port1) 
mda1=$( echo "select aid,bid,abalance,filler            from pgbench_accounts order by aid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 ) 
mdb1=$( echo "select bid,bbalance,filler                from pgbench_branches order by bid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
mdt1=$( echo "select tid,bid,tbalance,filler            from pgbench_tellers  order by tid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
mdh1=$( echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history  order by hid"|psql -d $dbname -qtAX -p$port1|md5sum|cut -b 1-7 )
md5_1=$(echo "$mda1  $mdb1  $mdt1  $mdh1" | md5sum | cut -b 1-7 )
printf "a,b,t,h  $port1    %6d  %6d  %6d  %6d   $mda1  $mdb1  $mdt1  $mdh1     $md5_1\n" $c_a1  $c_b1  $c_t1  $c_h1
ver1=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port1 )
ver2=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port2 )
if [[ $NUM_INSTANCES -gt 2 ]]; then
ver3=$( echo "select substring(version(),1,70)" | psql -d $dbname -qtAXp $port3 )
fi
echo 
rc=0
while [[ $rc -eq 0 ]]
do
  mda2=$(echo "select aid,bid,abalance,filler            from pgbench_accounts order by aid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7) 
  mdb2=$(echo "select bid,bbalance,filler                from pgbench_branches order by bid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  mdt2=$(echo "select tid,bid,tbalance,filler            from pgbench_tellers  order by tid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  mdh2=$(echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history  order by hid"|psql -d$dbname -qtAXp$port2|md5sum|cut -b 1-7)
  c_a2=$(echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port2) 
  c_b2=$(echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port2) 
  c_t2=$(echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port2) 
  c_h2=$(echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port2) 
  md5_2=$(echo "$mda2  $mdb2  $mdt2  $mdh2" | md5sum | cut -b 1-7 )
  if [[ $NUM_INSTANCES -gt 2 ]]; then
  mda3=$(echo "select aid,bid,abalance,filler            from pgbench_accounts order by aid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7) 
  mdb3=$(echo "select bid,bbalance,filler                from pgbench_branches order by bid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
  mdt3=$(echo "select tid,bid,tbalance,filler            from pgbench_tellers  order by tid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
  mdh3=$(echo "select hid,bid,aid,delta,mtime,filler,hid from pgbench_history  order by hid"|psql -d$dbname -qtAXp$port3|md5sum|cut -b 1-7)
  c_a3=$(echo "select count(*) from pgbench_accounts"|psql -d$dbname -qtAX -p$port3) 
  c_b3=$(echo "select count(*) from pgbench_branches"|psql -d$dbname -qtAX -p$port3) 
  c_t3=$(echo "select count(*) from pgbench_tellers "|psql -d$dbname -qtAX -p$port3) 
  c_h3=$(echo "select count(*) from pgbench_history "|psql -d$dbname -qtAX -p$port3) 
  md5_3=$(echo "$mda3  $mdb3  $mdt3  $mdh3" | md5sum | cut -b 1-7 )
  fi
  echo "-- $POSTGRES"
  printf "a,b,t,h  $port1    %7d %6d %6d %6d    $mda1 $mdb1 $mdt1 $mdh1   $md5_1   " $c_a1 $c_b1 $c_t1 $c_h1; echo "$ver1"
  printf "a,b,t,h  $port2    %7d %6d %6d %6d    $mda2 $mdb2 $mdt2 $mdh2   $md5_2   " $c_a2 $c_b2 $c_t2 $c_h2; echo "$ver2"
  if [[ $NUM_INSTANCES -gt 2 ]]; then
  printf "a,b,t,h  $port3    %7d %6d %6d %6d    $mda3 $mdb3 $mdt3 $mdh3   $md5_3   " $c_a3 $c_b3 $c_t3 $c_h3; echo "$ver3"
  fi
  if   [[ $NUM_INSTANCES -eq 2 ]]; then
       if [[ "$md5_1" == "$md5_2" ]] ; then echo "OK - done."; break; fi
  elif [[ $NUM_INSTANCES -eq 3 ]]; then
       if [[ "$md5_1" == "$md5_3" ]] ; then echo "OK - done."; break; fi
  fi
  sleep 1
  rc=$?
done
for n in `seq 1 $NUM_INSTANCES`; do 
  port=$(( $BASE_PORT + $n -1 ))
  data_dir=$TMP_DIR/pgsql.instance${n}/data
  $BIN_DIR/pg_ctl stop -w -D $data_dir
done
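
One observation on the script above: the servers are started in the background and pgbench runs right after, with the pg_isready calls commented out. On a slow machine the instances may not be accepting connections yet. Below is a minimal sketch of a retry helper that could guard that step. The name wait_until is hypothetical, and whether this has any bearing on the assertion failure is unknown.

```shell
#!/bin/bash
# Retry a command once per second until it succeeds or the timeout expires.
# Usage: wait_until <timeout_seconds> <command> [args...]
wait_until() {
  local timeout=$1; shift
  local waited=0
  until "$@"; do
    if (( waited >= timeout )); then
      return 1
    fi
    sleep 1
    waited=$(( waited + 1 ))
  done
  return 0
}

# Possible use inside the startup loop (assumes pg_isready from $BIN_DIR):
#   wait_until 60 "$BIN_DIR/pg_isready" -q -d "$dbname" -p "$port" || exit 1
```

The helper takes the check as an arbitrary command, so the same function can also poll for the digest comparison instead of an open-ended while loop.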
